KafKa学习笔记
一、安装和使用
1.安装和使用kafka
Docker安装启动Kafka1、下载Kafka和Zookeeper镜像文件
docker pull wurstmeister/kafka docker pull wurstmeister/zookeeper
2、先运行zk
docker run -dit --name zookeeper -p 2181:2181 -dit wurstmeister/zookeeper
3、再运行kafka,记得改为你的zookeeper ip
docker run -dit --name kafka --publish 9092:9092 --link zookeeper:zookeeper -e KAFKA_BROKER_ID=1 -e HOST_IP=8.134.148.68 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://8.134.148.68:9092 -e KAFKA_ADVERTISED_HOST_NAME=8.134.148.68 -e KAFKA_ADVERTISED_PORT=9082 --restart=always -dit wurstmeister/kafka
docker run -dit --name kafka --publish 9092:9092 --link zookeeper:zookeeper -e KAFKA_BROKER_ID=1 -e HOST_IP=127.0.0.1 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -e KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 -e KAFKA_ADVERTISED_PORT=9082 --restart=always -dit wurstmeister/kafka
4、查看容器是否正常运行
docker ps
5、进行生产消费测试
docker exec -it kafka bash cd /opt/kafka/bin/
6、运行kafka生产者发送消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
7、运行kafka消费者接收消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
二.kafka的一些概念
1、生产者消费者
Broker:消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群。Topic:Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic。Producer:消息生产者,向Broker发送消息的客户端。Consumer:消息消费者,从Broker读取消息的客户端。ConsumerGroup:每个Consumer属于一个特定的ConsumerGroup,一条消息可以被多个不同的Consumer Group消费,但是一个ConsumerGroup中只能有一个Consumer能够消费该消息。Partition:物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的。
2.主题和分区的使用、
主题
主题-topic在kafka中是⼀个逻辑的概念,kafka通过topic将消息进⾏分类。不同的topic会被
订阅该topic的消费者消费。
通过kafka命令向zk中创建⼀个主题
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test3
查看主题
./kafka-topics.sh --list --bootstrap-server localhost:9092 test
分区****Partition
1*)分区的概念
通过partition将⼀个topic中的消息分区来存储。这样的好处有多个:
分区存储,可以解决统⼀存储⽂件过⼤的问题
提供了读写的吞吐量:读和写可以同时在多个分区中进⾏
创建多分区的主题
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test3
4.消息的单播和多播
单播消息在一个kafka的topic中,启动两个消费者,一个生产者,问:生产者发送消息,这条消息是否同时会被两个消费者消费?
如果多个消费者在同一个消费组,那么只有一个消费者可以收到订阅的topic中的消息。换言之,同一个消费组中只能有一个消费者收到一个topic中的消息。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test
多播不同的消费组订阅同一个topic,那么不同的消费组中只有一个消费者能收到消息。实际上也是多个消费组中的多个消费者收到了同一个消息。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 --topic test ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup2 --topic test
3.Kafka集群
先省略
三、Java客户端消费者的实现kafka
同步生产者
package org.example;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* @Author yuhj
* @Date 2022 11 01 16 55
**/
public class MySimpleProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws ExecutionException,
InterruptedException {
//1.设置参数
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"127.0.0.1:9092");
//把发送的key从字符串序列化为字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
//2.创建⽣产消息的客户端,传⼊参数
Producer<String, String> producer = new KafkaProducer<String,
String>(props);
//3.创建消息
//key:作⽤是决定了往哪个分区上发,value:具体要发送的消息内容
ProducerRecord<String, String> producerRecord = new ProducerRecord<>
(TOPIC_NAME, "mykeyvalue", "hellokafka");
//4.发送消息,得到消息发送的元数据并输出
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步⽅式发送消息结果:" + "topic-" +
metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
}
消费者
package org.example;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* @Author yuhj
* @Date 2022 11 01 17 15
**/
public class MySimpleConsumer {
private final static String TOPIC_NAME = "my-replicated-topic";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"127.0.0.1:9092");
// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
//1.创建⼀个消费者的客户端
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
String>(props);
//2. 消费者订阅主题列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
/*
* 3.poll() API 是拉取消息的⻓轮询
*/
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
//4.打印消息
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
}
}
}
2.关于消费者⾃动提交和⼿动提交
1)提交的内容**
消费者⽆论是⾃动提交还是⼿动提交,都需要把所属的消费组+消费的某个主题+消费的某个
分区及消费的偏移量,这样的信息提交到集群的_consumer_offsets主题⾥⾯。
2⾃动提交**
消费者poll消息下来以后就会⾃动提交offset
// 是否⾃动提交offset,默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// ⾃动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
注意:⾃动提交会丢消息。因为消费者在消费前提交offset,有可能提交完后还没消费时消费
者挂了。
3)⼿动提交
需要把⾃动提交的配置改成false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
⼿动提交⼜分成了两种:⼿动同步提交在消费完消息后调⽤同步提交的⽅法,当集群返回ack前⼀直阻塞,返回ack后表示提交成功,执⾏之后的逻辑
while (true) {
/*
* poll() API 是拉取消息的⻓轮询
*/
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key
= %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
//所有的消息已消费完
if (records.count() > 0) {//有消息
// ⼿动同步提交offset,当前线程会阻塞直到offset提交成功
// ⼀般使⽤同步提交,因为提交之后⼀般也没有什么逻辑代码了
consumer.commitSync();//=======阻塞=== 提交成功
}
}
}
⼿动异步提交
在消息消费完后提交,不需要等到集群ack,直接执⾏之后的逻辑,可以设置⼀个回调⽅
法,供集群调⽤
while (true) {
/*
* poll() API 是拉取消息的⻓轮询
*/
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key
= %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
//所有的消息已消费完
if (records.count() > 0) {
// ⼿动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后⾯
的程序逻辑
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition,
OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " +
exception.getStackTrace());
}
}
});
}
}
}
⻓轮询poll消息
默认情况下,消费者⼀次会poll500条消息。
//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
代码中设置了⻓轮询的时间是1000毫秒
while (true) {
/*
* poll() API 是拉取消息的⻓轮询
*/
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s,
value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
意味着:
如果⼀次poll到500条,就直接执⾏for循环
如果这⼀次没有poll到500条。且时间在1秒内,那么⻓轮询继续poll,要么到500
条,要么到1s
如果多次poll都没达到500条,且1秒时间到了,那么直接执⾏for循环
如果两次poll的间隔超过30s,集群会认为该消费者的消费能⼒过弱,该消费者被踢出消
费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,
让⼀次poll的消息条数少⼀点
//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能⼒过弱,将其踢
出消费组。将分区分配给其他消费者。-rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
消费者的健康状态检查
//consumer给broker发送⼼跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
//kafka如果超过10秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进⾏
rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
指定分区和偏移量、时间消费
指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
从头消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
指定时间消费
根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该
offset之后的消息开始消费。
List<PartitionInfo> topicPartitions =
consumer.partitionsFor(TOPIC_NAME);
//从1⼩时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(TOPIC_NAME, par.partition()),
fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap =
consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
parMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) continue;
Long offset = value.offset();
System.out.println("partition-" + key.partition() +
"|offset-" + offset);
System.out.println();
//根据消费⾥的timestamp确定offset
if (value != null) {
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);
}
}
四.Springboot中使⽤Kafka
kafka教程新
在Spring Boot中集成Apache Kafka,你需要完成以下几个步骤。这里我假设你正在使用Maven作为构建工具。以下是集成的基本步骤:
添加依赖: 在你的pom.xml文件中,添加Spring Kafka的依赖。确保选择与你的Spring Boot版本兼容的Spring Kafka版本。例如,如果你使用的是Spring Boot 2.3.x,一个合适的Spring Kafka版本可能是2.5.x或2.6.x:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置Kafka: 在application.yml或application.properties中添加Kafka的连接配置。例如:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-consumer-group
auto-offset-reset: earliest
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
请根据实际情况替换bootstrap-servers为你自己的Kafka集群地址。
创建生产者: 创建一个@Component类,使用KafkaTemplate来发送消息到Kafka主题:
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
创建消费者: 创建一个@KafkaListener注解的类来监听Kafka主题:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics = "my-topic")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
测试: 可以编写单元测试来验证生产者和消费者是否正常工作。
kafkaTemplate.send(topic, message);
}
}
创建消费者: 创建一个@KafkaListener注解的类来监听Kafka主题:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics = "my-topic")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
测试: 可以编写单元测试来验证生产者和消费者是否正常工作。
运行应用: 启动你的Spring Boot应用,观察控制台输出以确认消息是否成功发送和接收。
版权归原作者 weixin_46242847 所有, 如有侵权,请联系我们删除。