0


KafKa学习笔记

KafKa学习笔记

一、安装和使用

1.安装和使用kafka

  1. Docker安装启动Kafka1、下载KafkaZookeeper镜像文件
  2. docker pull wurstmeister/kafka docker pull wurstmeister/zookeeper

2、先运行zk

  1. docker run -dit --name zookeeper -p 2181:2181 -dit wurstmeister/zookeeper

3、再运行kafka,记得改为你的zookeeper ip

  1. docker run -dit --name kafka --publish 9092:9092 --link zookeeper:zookeeper -e KAFKA_BROKER_ID=1 -e HOST_IP=8.134.148.68 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://8.134.148.68:9092 -e KAFKA_ADVERTISED_HOST_NAME=8.134.148.68 -e KAFKA_ADVERTISED_PORT=9082 --restart=always -dit wurstmeister/kafka
  2. docker run -dit --name kafka --publish 9092:9092 --link zookeeper:zookeeper -e KAFKA_BROKER_ID=1 -e HOST_IP=127.0.0.1 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -e KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 -e KAFKA_ADVERTISED_PORT=9082 --restart=always -dit wurstmeister/kafka

4、查看容器是否正常运行

  1. docker ps

5、进行生产消费测试

  1. docker exec -it kafka bash cd /opt/kafka/bin/

6、运行kafka生产者发送消息

  1. ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

7、运行kafka消费者接收消息

  1. kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

二.kafka的一些概念

1、生产者消费者

Broker:消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群。Topic:Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic。Producer:消息生产者,向Broker发送消息的客户端。Consumer:消息消费者,从Broker读取消息的客户端。ConsumerGroup:每个Consumer属于一个特定的ConsumerGroup,一条消息可以被多个不同的Consumer Group消费,但是一个ConsumerGroup中只能有一个Consumer能够消费该消息。Partition:物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的。

在这里插入图片描述

2.主题和分区的使用、

主题

主题-topic在kafka中是⼀个逻辑的概念,kafka通过topic将消息进⾏分类。不同的topic会被

订阅该topic的消费者消费。

通过kafka命令向zk中创建⼀个主题

  1. kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test3

查看主题

  1. ./kafka-topics.sh --list --bootstrap-server localhost:9092 test

分区****Partition

1*)分区的概念

通过partition将⼀个topic中的消息分区来存储。这样的好处有多个:

分区存储,可以解决统⼀存储⽂件过⼤的问题

提供了读写的吞吐量:读和写可以同时在多个分区中进⾏

创建多分区的主题

  1. kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test3

4.消息的单播和多播

单播消息在一个kafka的topic中,启动两个消费者,一个生产者,问:生产者发送消息,这条消息是否同时会被两个消费者消费?

如果多个消费者在同一个消费组,那么只有一个消费者可以收到订阅的topic中的消息。换言之,同一个消费组中只能有一个消费者收到一个topic中的消息。

  1. ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test

多播不同的消费组订阅同一个topic,那么不同的消费组中只有一个消费者能收到消息。实际上也是多个消费组中的多个消费者收到了同一个消息。

  1. ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 --topic test ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup2 --topic test

在这里插入图片描述

3.Kafka集群

先省略

三、Java客户端消费者的实现kafka

同步生产者

  1. package org.example;
  2. import org.apache.kafka.clients.producer.*;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import java.util.Properties;
  5. import java.util.concurrent.ExecutionException;
  6. /**
  7. * @Author yuhj
  8. * @Date 2022 11 01 16 55
  9. **/
  10. public class MySimpleProducer {
  11. private final static String TOPIC_NAME = "my-replicated-topic";
  12. public static void main(String[] args) throws ExecutionException,
  13. InterruptedException {
  14. //1.设置参数
  15. Properties props = new Properties();
  16. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
  17. "127.0.0.1:9092");
  18. //把发送的key从字符串序列化为字节数组
  19. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  20. StringSerializer.class.getName());
  21. //把发送消息value从字符串序列化为字节数组
  22. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  23. StringSerializer.class.getName());
  24. //2.创建⽣产消息的客户端,传⼊参数
  25. Producer<String, String> producer = new KafkaProducer<String,
  26. String>(props);
  27. //3.创建消息
  28. //key:作⽤是决定了往哪个分区上发,value:具体要发送的消息内容
  29. ProducerRecord<String, String> producerRecord = new ProducerRecord<>
  30. (TOPIC_NAME, "mykeyvalue", "hellokafka");
  31. //4.发送消息,得到消息发送的元数据并输出
  32. RecordMetadata metadata = producer.send(producerRecord).get();
  33. System.out.println("同步⽅式发送消息结果:" + "topic-" +
  34. metadata.topic() + "|partition-"
  35. + metadata.partition() + "|offset-" + metadata.offset());
  36. }
  37. }

消费者

  1. package org.example;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import org.apache.kafka.common.serialization.StringDeserializer;
  7. import java.time.Duration;
  8. import java.util.Arrays;
  9. import java.util.Properties;
  10. /**
  11. * @Author yuhj
  12. * @Date 2022 11 01 17 15
  13. **/
  14. public class MySimpleConsumer {
  15. private final static String TOPIC_NAME = "my-replicated-topic";
  16. private final static String CONSUMER_GROUP_NAME = "testGroup";
  17. public static void main(String[] args) {
  18. Properties props = new Properties();
  19. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
  20. "127.0.0.1:9092");
  21. // 消费分组名
  22. props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
  23. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  24. StringDeserializer.class.getName());
  25. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  26. StringDeserializer.class.getName());
  27. //1.创建⼀个消费者的客户端
  28. KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
  29. String>(props);
  30. //2. 消费者订阅主题列表
  31. consumer.subscribe(Arrays.asList(TOPIC_NAME));
  32. while (true) {
  33. /*
  34. * 3.poll() API 是拉取消息的⻓轮询
  35. */
  36. ConsumerRecords<String, String> records =
  37. consumer.poll(Duration.ofMillis(1000));
  38. for (ConsumerRecord<String, String> record : records) {
  39. //4.打印消息
  40. System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
  41. record.offset(), record.key(), record.value());
  42. }
  43. }
  44. }
  45. }

2.关于消费者⾃动提交和⼿动提交

1)提交的内容**

消费者⽆论是⾃动提交还是⼿动提交,都需要把所属的消费组+消费的某个主题+消费的某个

分区及消费的偏移量,这样的信息提交到集群的_consumer_offsets主题⾥⾯。

2⾃动提交**

消费者poll消息下来以后就会⾃动提交offset

  1. // 是否⾃动提交offset,默认就是true
  2. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  3. // ⾃动提交offset的间隔时间
  4. props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

注意:⾃动提交会丢消息。因为消费者在消费前提交offset,有可能提交完后还没消费时消费

者挂了。

3)⼿动提交

需要把⾃动提交的配置改成false

  1. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

⼿动提交⼜分成了两种:⼿动同步提交在消费完消息后调⽤同步提交的⽅法,当集群返回ack前⼀直阻塞,返回ack后表示提交成功,执⾏之后的逻辑

  1. while (true) {
  2. /*
  3. * poll() API 是拉取消息的⻓轮询
  4. */
  5. ConsumerRecords<String, String> records =
  6. consumer.poll(Duration.ofMillis(1000));
  7. for (ConsumerRecord<String, String> record : records) {
  8. System.out.printf("收到消息:partition = %d,offset = %d, key
  9. = %s, value = %s%n", record.partition(),
  10. record.offset(), record.key(), record.value());
  11. }
  12. //所有的消息已消费完
  13. if (records.count() > 0) {//有消息
  14. // ⼿动同步提交offset,当前线程会阻塞直到offset提交成功
  15. // ⼀般使⽤同步提交,因为提交之后⼀般也没有什么逻辑代码了
  16. consumer.commitSync();//=======阻塞=== 提交成功
  17. }
  18. }
  19. }

⼿动异步提交

在消息消费完后提交,不需要等到集群ack,直接执⾏之后的逻辑,可以设置⼀个回调⽅

法,供集群调⽤

  1. while (true) {
  2. /*
  3. * poll() API 是拉取消息的⻓轮询
  4. */
  5. ConsumerRecords<String, String> records =
  6. consumer.poll(Duration.ofMillis(1000));
  7. for (ConsumerRecord<String, String> record : records) {
  8. System.out.printf("收到消息:partition = %d,offset = %d, key
  9. = %s, value = %s%n", record.partition(),
  10. record.offset(), record.key(), record.value());
  11. }
  12. //所有的消息已消费完
  13. if (records.count() > 0) {
  14. // ⼿动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后⾯
  15. 的程序逻辑
  16. consumer.commitAsync(new OffsetCommitCallback() {
  17. @Override
  18. public void onComplete(Map<TopicPartition,
  19. OffsetAndMetadata> offsets, Exception exception) {
  20. if (exception != null) {
  21. System.err.println("Commit failed for " + offsets);
  22. System.err.println("Commit failed exception: " +
  23. exception.getStackTrace());
  24. }
  25. }
  26. });
  27. }
  28. }
  29. }

⻓轮询poll消息

默认情况下,消费者⼀次会poll500条消息。

  1. //⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
  2. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

代码中设置了⻓轮询的时间是1000毫秒

  1. while (true) {
  2. /*
  3. * poll() API 是拉取消息的⻓轮询
  4. */
  5. ConsumerRecords<String, String> records =
  6. consumer.poll(Duration.ofMillis(1000));
  7. for (ConsumerRecord<String, String> record : records) {
  8. System.out.printf("收到消息:partition = %d,offset = %d, key = %s,
  9. value = %s%n", record.partition(),
  10. record.offset(), record.key(), record.value());
  11. }

意味着:

如果⼀次poll到500条,就直接执⾏for循环

如果这⼀次没有poll到500条。且时间在1秒内,那么⻓轮询继续poll,要么到500

条,要么到1s

如果多次poll都没达到500条,且1秒时间到了,那么直接执⾏for循环

如果两次poll的间隔超过30s,集群会认为该消费者的消费能⼒过弱,该消费者被踢出消

费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,

让⼀次poll的消息条数少⼀点

  1. //⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
  2. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
  3. //如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能⼒过弱,将其踢
  4. 出消费组。将分区分配给其他消费者。-rebalance
  5. props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

消费者的健康状态检查

  1. //consumer给broker发送⼼跳的间隔时间
  2. props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
  3. //kafka如果超过10秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进⾏
  4. rebalance,把分区分配给其他消费者。
  5. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

指定分区和偏移量、时间消费

指定分区消费

  1. consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

从头消费

  1. consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

指定offset消费

  1. consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);

指定时间消费

根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该

offset之后的消息开始消费。

  1. List<PartitionInfo> topicPartitions =
  2. consumer.partitionsFor(TOPIC_NAME);
  3. //从1⼩时前开始消费
  4. long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
  5. Map<TopicPartition, Long> map = new HashMap<>();
  6. for (PartitionInfo par : topicPartitions) {
  7. map.put(new TopicPartition(TOPIC_NAME, par.partition()),
  8. fetchDataTime);
  9. }
  10. Map<TopicPartition, OffsetAndTimestamp> parMap =
  11. consumer.offsetsForTimes(map);
  12. for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
  13. parMap.entrySet()) {
  14. TopicPartition key = entry.getKey();
  15. OffsetAndTimestamp value = entry.getValue();
  16. if (key == null || value == null) continue;
  17. Long offset = value.offset();
  18. System.out.println("partition-" + key.partition() +
  19. "|offset-" + offset);
  20. System.out.println();
  21. //根据消费⾥的timestamp确定offset
  22. if (value != null) {
  23. consumer.assign(Arrays.asList(key));
  24. consumer.seek(key, offset);
  25. }
  26. }

四.Springboot中使⽤Kafka

kafka教程新

在Spring Boot中集成Apache Kafka,你需要完成以下几个步骤。这里我假设你正在使用Maven作为构建工具。以下是集成的基本步骤:

添加依赖: 在你的pom.xml文件中,添加Spring Kafka的依赖。确保选择与你的Spring Boot版本兼容的Spring Kafka版本。例如,如果你使用的是Spring Boot 2.3.x,一个合适的Spring Kafka版本可能是2.5.x或2.6.x:

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. </dependency>

配置Kafka: 在application.yml或application.properties中添加Kafka的连接配置。例如:

  1. spring:
  2. kafka:
  3. bootstrap-servers: localhost:9092
  4. consumer:
  5. group-id: my-consumer-group
  6. auto-offset-reset: earliest
  7. producer:
  8. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  9. value-serializer: org.apache.kafka.common.serialization.StringSerializer

请根据实际情况替换bootstrap-servers为你自己的Kafka集群地址。

创建生产者: 创建一个@Component类,使用KafkaTemplate来发送消息到Kafka主题:

  1. import org.springframework.kafka.core.KafkaTemplate;
  2. import org.springframework.stereotype.Component;
  3. @Component
  4. public class KafkaProducer {
  5. private final KafkaTemplate<String, String> kafkaTemplate;
  6. public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
  7. this.kafkaTemplate = kafkaTemplate;
  8. }
  9. public void sendMessage(String topic, String message) {
  10. kafkaTemplate.send(topic, message);
  11. }
  12. }

创建消费者: 创建一个@KafkaListener注解的类来监听Kafka主题:

  1. import org.springframework.kafka.annotation.KafkaListener;
  2. import org.springframework.stereotype.Component;
  3. @Component
  4. public class KafkaConsumer {
  5. @KafkaListener(topics = "my-topic")
  6. public void listen(String message) {
  7. System.out.println("Received message: " + message);
  8. }
  9. }

测试: 可以编写单元测试来验证生产者和消费者是否正常工作。

  1. kafkaTemplate.send(topic, message);
  2. }

}

  1. 创建消费者: 创建一个@KafkaListener注解的类来监听Kafka主题:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

  1. @KafkaListener(topics = "my-topic")
  2. public void listen(String message) {
  3. System.out.println("Received message: " + message);
  4. }

}

  1. 测试: 可以编写单元测试来验证生产者和消费者是否正常工作。
  2. 运行应用: 启动你的Spring Boot应用,观察控制台输出以确认消息是否成功发送和接收。
标签: kafka 学习 笔记

本文转载自: https://blog.csdn.net/weixin_46242847/article/details/140141314
版权归原作者 weixin_46242847 所有, 如有侵权,请联系我们删除。

“KafKa学习笔记”的评论:

还没有评论