0


Kafka快速入门

讲一下什么是Kafka

首先引入这样一个场景:A服务可以发送200qps(Queries Per Second,是指每秒查询率),而B服务可以处理100qps。很显然,B服务很可能会被A服务压垮掉。怎么为了保证B不被压垮的同时还能处理A消息,没有什么是不能通过一层中间件解决的,如果有,那就再加一层。

开始很容易想到,可以在B服务中增加一个队列,其实就是个链表,B服务根据自己的消费能力,消费链表中的消息。每个节点用Offset记录,消费过程不断更新自己的Offset值。但就会出现来不及处理的消息都堆积到内存中,如果B重启,就会丢失消息。

所以可以把队列挪出来,变成一个单独的进程。也就成了简陋的消息队列。像A这样生产消息的为生产者,B消费消息的为消费者。但这太简陋了,高性能高可用高扩展,他都不沾边。

高性能

对于高性能,由于B消费能力有限,消息队列会堆积消息,就可以通过扩张多个消费者来加快消费,同理可以扩展多个生产者加快生产消息。但他们都同时竞争同一个消息队列,就会导致其他人等待浪费时间。

就可以分为多个消息队列,引出topic概念,将消息进行分类。生产者和消费者都去对应其中的topic。

但消息队列可能还是有点长,就可以分段,进行分区partition的概念诞生,每个消费者负责一个partition。

高扩展

对于高扩展,由于分区变多,都放在同一台机器,就会导致内存和cpu过高,影响整体系统性能,就可以引入多台机器,每个机器代表一个Broker。将分区分散部署到各个Broker中,就可以缓解单台服务器压力。

高可用

对于高可用,由于分区可能挂了,就会导致数据丢失,所以可以进行数据备份,主从复制,Leader负责和消费者生产者读写请求,follower负责同步Leader数据。这样就算Leader挂了,也可以顶替上去。

持久化和过期策略

也有可能所有Broker都挂了,就需要将数据放入内存,持久化到磁盘中。即使全部Broker都挂了,数据也不会丢失。但磁盘内存有限,就需要加个保留策略,retention policy(比如磁盘超过一定数据,或消息放置超过一定时间,就会被清理掉)

consumer group

到这里,这个消息队列好像就挺完美了。但其实还有个问题,按现在的消费方式,每次新增的消费者只能跟着最新的消费 Offset 接着消费。 可以给消息队列加入消费者组(consumer group)的概念,B 和 C 服务各自是一个独立的消费者组,不同消费者组维护自己的消费进度也就是Offset值,互不打搅。

基础架构

Producer(生产者):产生消息的一方

Consumer(消费者):消费消息的一方

Consumer Group(消费者组):一组存在多个消费者,消费者组间互不影响,所有消费者都可能属于某个消费者组,算一个逻辑订阅者。分区中的消息只能被一个消费者消费

Broker(代理,经纪人):一台Kafka服务器就算一个Broker,多个Broker组成Kafka Cluster(集群)

Topic(主题):生产者消费者都面向一个主题

Partition(分区):为了实现扩展性,一个Topic可能分布到多个Broker,一个Topic可以分为多个Partition,每个Partition相当于有序队列。

  • Replica:副本Replication,为保证集群中某个节点发生故障,节点上的Partition数据不丢失,Kafka可以正常的工作,Kafka提供了副本机制,一个Topic的每个分区有若干个副本,一个Leader和多个Follower
  • Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
  • Follower:每个分区多个副本的从角色,实时的从Leader中同步数据,保持和Leader数据的同步,Leader发生故障的时候,某个Follower会成为新的Leader。

也可以看看FGuide哥的

代码实现

1、引入kafka的依赖

         <dependency>
             <groupId>org.springframework.cloud</groupId>
             <artifactId>spring-cloud-starter-stream-kafka</artifactId>
         </dependency>

2、配置kafka

 spring:
   kafka:
     bootstrap-servers: 156.65.20.76:9092,156.65.20.77:9092,156.65.20.78:9092 #指定Kafka集群的地址,这里有三个地址,用逗号分隔。
     listener:
       ack-mode: manual_immediate #设置消费者的确认模式为manual_immediate,表示消费者在接收到消息后立即手动确认。
       concurrency: 3  #设置消费者的并发数为5
       missing-topics-fatal: false  #设置为false,表示如果消费者订阅的主题不存在,不会抛出异常。
     producer:
       key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 设置消息键的序列化器
       value-serializer: org.apache.kafka.common.serialization.StringSerializer #设置消息值的序列化器
       acks: 1  #一般就是选择1,兼顾可靠性和吞吐量 ,如果想要更高的吞吐量设置为0,如果要求更高的可靠性就设置为-1
     consumer:
       auto-offset-reset: earliest #设置为"earliest"表示将从最早的可用消息开始消费,即从分区的起始位置开始读取消息。
       enable-auto-commit: false #禁用了自动提交偏移量的功能,为了避免出现重复数据和数据丢失,一般都是手动提交
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置消息键的反序列化器
       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #设置消息值的反序列化器

3、创建主题

  • 自动创建(不推荐) 在kafka的安装目录conf目录下找到该配置文件server.properties,添加如下配置: num.partitions=3 #默认3个分区 auto.create.topics.enable=true #开启自动创建主题 default.replication.factor=3 #默认3个副本
  • 手动创建
 在kafka的安装目录bin目录下,执行如下命令:
 //创建一个有三个分区和三个副本,名为zhuoye的主题
 ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic zhuoye

4、生产者代码

 @Slf4j
 @Component
 public class ALiYunServiceImpl implents IALiYunService {
     @Autowired
     private KafkaTemplate kafkaTemplate;
     @Autowired
     private ExecutorService executorService;
     String topicName = "zhuoye";
     @Override
     public void queryECSMetricInfo() {
         //发送到kafka的消息集合,因为使用了多线程,并且在多线程中往该集合进行添加操作,所以需要线程安全的
         List<Message> messages = Collections.synchronizedList(new ArrayList<>());
         boolean flag = true;
         //获取上次查询时间
         Long startTime = Long.valueOf(queryTimeRecordMapper.selectTimeByBelongId(3)) * 1000;
         Long endTime = System.currentTimeMillis();
         try {
             //查询出所有的运行中的实例
             List<CloudInstanceAssetDto> cloudInstances = cloudInstanceAssetMapper.queryAllRunningInstance(1, "Running");
             if (CollectionUtils.isEmpty(cloudInstances)) {
                 return;
             }
             //定义计数器
             CountDownLatch latch = new CountDownLatch(cloudInstances.size());
             //遍历查询
             for (CloudInstanceAssetDto instance : cloudInstances) {
                 executorService.submit(() -> {
                     try {
                         //获取内网流出带宽,并将结果封装到消息集合中
                         dealMetricDataToMessage(ALiYunConstant.ECS_INTRANET_OUT_RATE, ALiYunConstant.INTRANET_OUT_RATE_NAME, ALiYunConstant.LW_INTRANET_OUT_RATE_CODE,
                                 startTime, endTime, instance, messages);
                     } catch (Exception e) {
                         log.error("获取ECS的指标数据-多线程处理任务异常!", e);
                     } finally {
                         latch.countDown();
                     }
 ​
                 });
             }
             //等待任务执行完毕
             latch.await();
             //将最终的消息集合发送到kafka
             if (CollectionUtils.isNotEmpty(messages)) {
                 for (int i = 0; i < messages.size(); i++) {
                     if (StringUtils.isNotBlank(messages.get(i).getValue())
                             && "noSuchInstance".equals(messages.get(i).getValue())) {
                         continue;
                     }
                   kafkaTemplate.send(topicName,  messages.get(i));
                 }
             }
         } catch (Exception e) {
             flag = false;
             log.error("获取ECS的指标数据失败", e);
         }
         //更新记录上次查询时间
         if (flag) {
             QueryTimeRecord queryTimeRecord = new QueryTimeRecord();
             queryTimeRecord.setBelongId(3).setLastQueryTime(String.valueOf((endTime - 1000 * 60 * 1) / 1000)); //开始时间往前推1分钟
             queryTimeRecordMapper.updateByBelongId(queryTimeRecord);
         }
     }

这个时候,如果你想看有没有把消息发送到kafka的指定主题可以使用如下命令:

 kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye

5、消费者代码

 @Slf4j
 @Component
 public class KafkaConsumer {
     // 消费监听
     @KafkaListener(topics = "zhuoye",groupId ="zhuoye-aliyunmetric")
     public void consumeExtractorChangeMessage(ConsumerRecord<String, String> record, Acknowledgment ack){
         try {
             String value = record.value();
             //处理数据,存入openTsDb
             .................
             ................
             ack.acknowledge();//手动提交
         }catch (Exception e){
             log.error("kafa-topic【zhuoye】消费阿里云指标源消息【失败】");
             log.error(e.getMessage());
         }
 ​
     }
 }

6、常用Kafka的命令

 //创建主题
 ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic zhuoye
 //查看kafka是否接收对应的消息
  kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye
 // 修改kafka-topic分区数
 ./kafka-topics.sh --zookeeper localhost:2181 -alter --partitions 6 --topic zhuoye
 // 查看topic分区数
 ./kafka-topics.sh --zookeeper localhost:2181 --describe --topic zhuoye
 // 查看用户组消费情况
 ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group zhuoye-aliyunmetric --describe

借鉴B站:Mic,小白debug,拙野

标签: kafka linq 分布式

本文转载自: https://blog.csdn.net/qq_62097431/article/details/141947734
版权归原作者 G丶AEOM 所有, 如有侵权,请联系我们删除。

“Kafka快速入门”的评论:

还没有评论