一.版本兼容的问题
因为某个功能需要对接的kafka是一个上古版本0.10.0.0,公司项目又是springcloud项目,导致版本兼容性的问题很头大
1.kafka的版本号
下载的windows版kafka如:kafka_2.10-0.10.0.0
2.10标识编译kafka集群的scala版本号,kafka的服务端编码语言为scala
0.10.0.0标识kafka真正的版本号
kafka的版本号从1.0开始由四位版本号改为了三位,既类似0.9.0.0–>1.0.0。
2.java对接kafka一般有以下的方式
- spring-cloud-stream/spring-cloud-stream-binder-kafka各个版本的官方文档:spring-could-stream**scs中也引入了 spring kafka,kafka client也有对应关系在官网中可以看到
- kafka-clients
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.0.0</version></dependency>
第二这种会引入两个依赖jar,不使用 scala api可以用第一种
kafka-clients-0.10.2.0.jar
kafka_2.11-0.10.2.0.jar
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>0.10.2.0</version></dependency>
- spring-kafka官方文档:spring-kafkaspring kafka的版本和spring-boot-starter-parent要匹配spring-kafka中引入了kafka-client的版本对照关系如下 此处有个坑就是他强制要求springboot的版本和spring-kafka对应
//https://blog.csdn.net/lzx1991610/article/details/100777040<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
二.实现订阅和发布消息代码
Kafka消费者通过groupId消费指定topic的,
以groupId区分不同的消费者,即不同的groupId消费相同的topic,对于topic而言,就是不同的消费者,
同时,消费者需要记录消费到的offset,以便下次启动时定位到具体的位置,消费消息。
这里,配置的offset策略为:latest,即每次重启消费者时,从最新的offset开始消费(上次记录的offset之后的一个,如果上次消费没有记录,则从当前offset之后开始消费)。
offset的重置这样理解: 当前topic写入数据有4条,offset从0到3,
如果,offset重设为earliest,则每次重启消费者,offset都会从0开始消费数据;
如果,offset重设为latest,则,每次消费从上次消费的offset下一个开始消费,如果上次消费的offset为3,则,重启后,
从4开始消费数据。 原文链接:https://blog.csdn.net/Xin_101/article/details/126154171
参考博客: https://www.jianshu.com/p/1f9e18e926f6
publicclassKafkaUtil{finalstatic String url ="localhost:9092";publicstaticvoidreceiveBPMessage(){
Properties props =newProperties();//183.240.87.230:9092为消息服务器开放的TCP端口
props.put("bootstrap.servers", KafkaUtil.url);//0为消费者所在的用户组,同一个组对于消息的消费只能有一次,不同组可以共同消费同一条消息
props.put("group.id","0");//指定了消费者是否自动提交偏移量,默认值是 true,自动提交
props.put("enable.auto.commit","false");
props.put("auto.commit.interval.ms","1000");
props.put("session.timeout.ms","30000");//server.keystore.jks证书所在路径,以及密码。由消息服务器颁发。// props.put("ssl.keystore.location","/root/securityCA/server.keystore.jks");// props.put("ssl.keystore.password", "123456");// props.put("security.protocol","SSL");// props.put("ssl.truststore.type", "JKS");// props.put("ssl.keystore.type", "JKS");//client.truststore.jks证书所在路径,以及密码。由消息服务器颁发。// props.put("ssl.truststore.location","/root/securityCA/client.truststore.jks");// props.put("ssl.truststore.password", "123456");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//建立consumer连接
KafkaConsumer<String, String> consumer =newKafkaConsumer<String, String>(props);//订阅主题
consumer.subscribe(Collections.singletonList("test"));//消息轮询是消费者的核心,通过轮询向服务器请求数据try{while(true){//消费消息
ConsumerRecords<String, String> records = consumer.poll(500);// for (ConsumerRecord<String, String> record : records) {// // 每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。// System.out.println(String.format("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",// record.topic(), record.partition(), record.offset(),record.key(), record.value()));// }for(TopicPartition partition : records.partitions()){
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);for(ConsumerRecord<String, String> record : partitionRecords){//对消息做简单地打印操作
System.out.println(String.format("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",
record.topic(), record.partition(), record.offset(),record.key(), record.value()));}long lastOffset=partitionRecords.get(partitionRecords.size()-1).offset();//提交消息消费的offset
consumer.commitSync(Collections.singletonMap(partition,newOffsetAndMetadata(lastOffset +1)));}}} finally {// 关闭消费者,网络连接和 socket 也会随之关闭,并立即触发一次再均衡
consumer.close();}}publicstaticvoidsendBPMessage(JSONObject object){
Properties producerProps =newProperties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KafkaUtil.url);//server.keystore.jks证书所在路径。由消息服务器颁发。// producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,"/root/securityCA/server.keystore.jks");// //server.keystore.jks证书的密码。由消息服务器提供。// producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,"medstarMessageServer");// //client.truststore.jks证书所在路径。由消息服务器颁发。// producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"/root/securityCA/client.truststore.jks");// //client.truststore.jks证书的密码。由消息服务器提供。// producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,"medstarMessageServer");// producerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");// producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//根据配置文件创建生产者连接
KafkaProducer producer =newKafkaProducer(producerProps);//发送消息,该实例中,为循环发送test数据100次,可以根据实际情况,遍历列表中的数据,拼接成规定的消息格式进行发送,一般,同一个机构的消息发送通道是固定的,通道会由消息服务器产生并分配给对应机构for(int i =0; i <10; i++){//新建ProducerRecord类型的数据,第一个参数为发送的通道,第二个参数为发送消息的内容
ProducerRecord<String,String> r =newProducerRecord<String,String>("test","key-"+i,"中文-"+i);
producer.send(r);
System.err.println("发送消息");}//关闭消息服务器连接,可以在消息全部发送完毕的时候关闭连接
producer.close();}}
三.安装windows版kafka进行测试
参考博客: https://blog.csdn.net/marquis0/article/details/126525221
命令参考
//启动内置zk.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
//启动kafka服务.\bin\windows\kafka-server-start.bat .\config\server.properties
//创建一个名称为test的topic 类似于数据库的表 .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181-replication-factor 1--partitions 1--topic test
//创建一个生产者.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092--topic test
//创建一个消费者.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092--topic test --from-beginning --zookeeper localhost:2181
不同版本的kafka命令会不一样 以下参考
旧版本
##创建topic
./kafka-topics.sh --create --bootstrap-server localhost:9092--replication-factor 1--partitions 1--topic xxoo
#查看topic
./kafka-topics.sh --list --bootstrap-server localhost:9092#topic描述./kafka-topics.sh --describe --zookeeper localhost:2181--topic xxoo
#producer(控制台向topic生产数据)./kafka-console-producer.sh --broker-list localhost:9092--topic xxoo
>this is a message
>this is another message
##consumer(控制台消费topic的数据2)./kafka-console-consumer.sh --bootstrap-server localhost:9092--topic xxoo --from-beginning
this is a message
this is another message
## 查看某一个topic对应的消息数量
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092--topic xxoo --time -1
## 新版本的消费者组名和它要消费的那个topic的offset信息就会被记录在broker服务器上,老版本存在zookeeper上
./kafka-consumer-groups.sh --bootstrap-server localhost:9092--list
./kafka-consumer-groups.sh --bootstrap-server kafka01.qq.cn:9092,kafka02.qq.cn:9092,kafka03.qq.cn:9092--list
##删除消费组
./kafka-consumer-groups.sh --bootstrap-server 192.168.100.11:9092--delete--group py-test
##查看消费组的的列表
./kafka-consumer-groups.sh --list --bootstrap-server 192.168.100.11:9092
或者
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 xxoo --list
## 查看特定消费组的情况
./kafka-consumer-groups.sh --bootstrap-server 192.168.100.11:9092--group py-test --describe
-- 旧版本Kafka命令行参数(kafka_scala2.11-2.0.0 为例)
# 查看topic
./kafka-topics.sh --list --zookeeper localhost:2181
## topic描述
./kafka-topics.sh --describe --zookeeper localhost:2181--topic xxoo
## 创建topic
./kafka-topics.sh --create --zookeeper localhost:2181--replication-factor 1--partitions 1--topic xxoo
#topic查看信息
# /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181--describe --topic xx
# 分区扩展
# /usr/local/kafka/bin/kafka-topics.sh --alter --topic xx --zookeeper localhost:2181--partitions 24
## consumer(控制台消费topic的数据2)./kafka-console-consumer.sh --bootstrap-server localhost:9092--topic xxoo --from-beginning
# 指定消费组消费
./kafka-console-consumer.sh --bootstrap-server localhost:9092--topic xxoo --group xx-group
### 生产数据
./kafka-console-producer.sh --broker-list localhost:9092--topic xxoo
## 查看某一个topic对应的消息数
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092--topic xxoo --time -1
## delete topic
./kafka-topics --delete--zookeeper localhost:2181--topic javadaemon
# 查看消费组列表
./kafka-consumer-groups.sh --list --bootstrap-server localhost:9092./kafka-consumer-groups.sh --list --bootstrap-server kafka01.car.cn:9092
# 查看指定消费组以及连接的ip地址
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.2:9092--describe --group vmsOperationLogGroup|grep vms-road_fee
## 查看指定消费组的堆积情况
./kafka-consumer-groups.sh --bootstrap-server kafka01.car.cn:9092--describe --group knight_group
## 查看指定分区的信息
# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper IP:2181--topic test
清理openapi-AccessLog-Rest指定保留2天
# /usr/local/kafka/bin/kafka-configs.sh --zookeeper IP:2181--entity-type topics --entity-name test --alter --add-config retention.ms=172800000
测试在生产者命令窗口发布消息,发现消费者命令窗口打印显示,并且项目main方法调用执行消费者后,也会收到消息
测试使用java接口发布消息,kafka客户端也能接受到消息
版权归原作者 且听狂澜度三秋呀呀呀呀 所有, 如有侵权,请联系我们删除。