0


kafka学习笔记

介绍

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等。

使用场景

日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。

消息系统:解耦和生产者和消费者、缓存消息等。

用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。

运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

重要术语

名称解释broker
消息中间件处理节点,一个Kafka节点就是

一个broker,一个或者多个Broker可以组

成一个Kafka集群

Topic

Kafka根据topic对消息进行归类,发布到

Kafka集群的每条消息都需要指定一个topic

Producer

消息生产者,向Broker发送消息的客户端

Consumer
消息生产者,向Broker发送消息的客户端ConsumerGroup

每个Consumer属于一个特定的ConsumerGroup,一条消息可以被多个不同的ConsumerGroup消费,但是一个ConsumerGroup中只能有一个Consumer能够消费该消息

Partition

物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的

整体上,producer通过网络发送消息到Kafka集群,然后consumer来进行消费。

Topic

Topic是一个类别的名称,同类消息发送到同一个Topic下面。对于每一个Topic,下面可以有多个分区(Partition)日志文件

Producer

生产者将消息发送到topic中去,同时负责选择将message发送到topic的哪一个partition中。通过round­robin做简单的负载均衡。也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多。

Consumer

传统的消息传递模式有2种:队列(queue)和(publish-subscribe)

queue模式:多个consumer从服务器中读取数据,消息只会到达一个consumer。

publish-subscribe模式:消息会被广播给所有的consumer。

Kafka基于这2种模式提供了一种consumer的抽象概念:consumergroup。

queue模式:所有的consumer都位于同一个consumergroup下。

publish-subscribe模式:所有的consumer都有着自己唯一的consumergroup。

上图是由2个broker组成的kafka集群,总共有4个partition(P0-P3)。这个集群由2个ConsumerGroup,A有2个consumerinstances,B有四个。通常一个topic会有几个consumergroup,每个consumergroup都是一个逻辑上的订阅者(logicalsubscriber)。每个consumergroup由多个consumerinstance组成,从而达到可扩展和容灾的功能。

Partition

Partition是一个有序的message序列,这些message按顺序添加到一个叫做commitlog的文件中。每个partition中的消息都有一个唯一的编号,称之为offset,用来唯一标示某个分区中的message。

每个partition,都对应一个commitlog****文件。一个partition中的message的offset都是唯一的,但是不同的partition中的message的offset可能是相同的。

一个topic,代表逻辑上的一个业务数据集,比如按数据库里不同表的数据操作消息区分放入不同topic,订单相关操作消息放入订单topic,用户相关操作消息放入用户topic,对于大型网站来说,后端数据都是海量的,订单消息很可能是非常巨量的,比如有几百个G甚至达到TB级别,如果把这么多数据都放在一台机器上可定会有容量限制问题,那么就可以在topic内部划分多个partition来分片存储数据,不同的partition可以位于不同的机器上,每台机器上都运行一个Kafka的进程Broker。

通常一个topic会有几个consumergroup,每个consumergroup都是一个逻辑上的订阅者(logicalsubscriber)。每个consumergroup由多个consumerinstance组成,从而达到可扩展和容灾的功能。

消费顺序

Kafka比传统的消息系统有着更强的顺序保证。一个partition同一个时刻在一个consumergroup中只有一个consumerinstance在消费,从而保证顺序

consumergroup中的consumerinstance的数量不能比一个Topic中的partition的数量多,否则,多出来的consumer消费不到消息。

Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费顺序性。

如果有在总体上保证消费顺序的需求,那么我们可以通过将topic的partition数量设置为1,将consumergroup中的consumerinstance数量也设置为1。

从较高的层面上来说的话,Kafka提供了以下的保证:

发送到一个Topic中的message会按照发送的顺序添加到commitlog中。意思是,如果消息M1,M2由同一个producer发送,M1比M2发送的早的话,那么在commitlog中,M1的offset就会比commit2的offset小。一个consumer在commitlog中可以按照发送顺序来消费message。如果一个topic的备份因子设置为N,那么Kafka可以容忍N-1个服务器的失败,而存储在commitlog中的消息不会丢失。

Springboot整合Kafka

配置文件:

server:
  port: 8080
spring:
  kafka:
    bootstrap-servers: 192.168.16.140:9092,192.168.16.140:9093,192.168.16.140:9094
    producer: # 生产者
      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384
      buffer-memory: 33554432
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: testGroup
      enable-auto-commit: true
logging:
  level:
    root: info
@Component
public class MyConsumer {

    /**
     * @KafkaListener(groupId = "testGroup", topicPartitions = {
     *             @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
     *             @TopicPartition(topic = "topic2", partitions = "0",
     *                     partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
     *     },concurrency = "6")
     *  //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数
     * @param record
     */
    @KafkaListener(topics = "mytopic", groupId = "testGroup")
    public void listen(ConsumerRecord<String, String> record) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
    }
}
@RestController
public class KafkaController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public void send() {
        kafkaTemplate.send("mytopic", 0, "key", "this is a msg");
    }

}
标签: 笔记

本文转载自: https://blog.csdn.net/IrinaCao/article/details/136201013
版权归原作者 keroroP 所有, 如有侵权,请联系我们删除。

“kafka学习笔记”的评论:

还没有评论