Spring Cloud Stream 3.x+kafka 3.8整合,文末有完整项目链接
前言
上一篇文章,我们用Spring Cloud Stream整合了RocketMQ:SpringCloud Alibaba五大组件之——RocketMQ,趁着此机会,继续学习了解一下Spring Cloud Stream,本文就以kafka为例。本文项目用到的所有Maven依赖和版本,都是和前面几篇文章一样。
由于整合kafka 不需要用到Cloud Alibaba一系列的技术,所以下载到源码运行不起来的,请删除mysql,nacos,dubbo,redis等一系列相关的依赖和代码。本文写下的时候,kafka最新版本为3.8版本,所以就以3.8版本举例说明。
官方中文文档:https://kafka1x.apachecn.org/documentation.html
官网文档:https://kafka.apache.org/documentation/
中文文档的版本比较老,建议大家对照着英文文档3.8版本的,相互结合起来看。
一、如何看官方文档(有深入了解需求的人)
1.基础操作:建议大家看operation一栏,后面我会简单贴出基本安装使用流程
2.配置建议看中文版本
二、kafka的安装
tar包安装
- 下载链接:kafka_2.13-3.8.0.tgz
- 选择一个合适的位置解压
tar -zxvf kafka_2.12-3.8.0.tgz
- 启动自带的zookeeper(后台启动)
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
- 修改kafka server的配置文件,便于外网能够访问 找到bin\config目录下的server.properties文件 修改以下两行listeners照着我这样写,advertised.listeners修改为你服务器的ip,端口默认9092
listeners=PLAINTEXT://0.0.0.0:9092advertised.listeners=PLAINTEXT://172.16.72.133:9092
- 启动kafka server(后台启动)
nohup bin/kafka-server-start.sh config/server.properties &
- 稍微扩展一下,集群的搭建,比如我们要扩展为三个集群代理: 首先,为每个代理创建一个配置文件 (在Windows上使用copy 命令来代替):
cp config/server.properties config/server-1.propertiescp config/server.properties config/server-2.properties
编辑这些新文件并设置如下属性:config/server-1.properties: broker.id=1listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2
broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的,必须重写端口和日志目录 然后启动就好了:低一个启动的为leader,如果杀死leader,会重新推荐一个leader出来bin/kafka-server-start.sh config/server-1.properties &bin/kafka-server-start.sh config/server-2.properties &
但是这样扩展的唯一不好的一点就是,会没有以前的数据,新的topic不影响,具体操作大家可以看文档。
docker安装
- 拉取镜像
docker pull apache/kafka:3.8.0
- 启动
docker run -p -d 9092:9092 apache/kafka:3.8.0
三、代码中集成
创建一个测试topic:test
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
所有的topic的操作,都可以用kafka-topics.sh来操作,具体的可以看文档。老版本的启动是加–zookeeper的,会报错not found,新版本要用–bootstrap-server。
producer代码
@RestController@RequestMapping("/mqtest")publicclassKafkaTestController{privatestaticfinalLogger logger =LoggerFactory.getLogger(KafkaTestController.class);@AutowiredprivateStreamBridge streamBridge;@RequestMapping("/test1")publicvoidtestOne(){Message<SimpleMsg> msg =newGenericMessage<>(newSimpleMsg("我是 broadcastMessage",newDate().toString()));
streamBridge.send("broadcastMessage-out-0", msg);}}
自定义消息体SimpleMsg,此类不需要序列化
@AllArgsConstructor@Data@NoArgsConstructorpublicclassSimpleMsg{privateString msg;privateString time;}
producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)
key:serializer 重中之重,发送对象消息的时候,解决转换错误,SpringCloudStream默认的是ByteArraySerializer,但是kafkamore默认的是String
spring:cloud:stream:kafka:binder:##kafka的server地址brokers: 172.16.72.133:9092##如果topic不存在则创建auto-create-topics:trueauto-add-partitions:true#自动分区min-partition-count:1#最小分区##这个序列化很关键,如果不加这个配置,则发送对象消息时候,会报转换错误configuration:key:serializer: org.apache.kafka.common.serialization.ByteArraySerializer
bindings:broadcastMessage-out-0:destination: test
content-type: application/json
Consumer代码
@BeanpublicConsumer<Message<SimpleMsg>>broadcastMessage(){return msg ->{
log.info(Thread.currentThread().getName()+" Consumer1 Receive New Messages: "+ msg.getPayload().getMsg()+ msg.getPayload().getTime());};}
Consumer 配置
项目中有更详细的配置,这里为了测试用的简化版
spring:cloud:stream:function:definition: broadcastMessage
kafka:binder:brokers: 172.16.72.133:9092auto-create-topics:trueconfiguration:key:serializer: org.apache.kafka.common.serialization.ByteArraySerializer
bindings:broadcastMessage-in-0:destination: test
group: test-topic-account
content-type: application/json
Consumer 2的代码和配置
项目中还有一个friend模块,当做第二个消费者,代码和配置和Consumer 1完全一样,唯一不同的就是可以设置group不同,这里就不贴代码了。
四、测试
生产者发送两个消息
两个消费者实例,分组一样,则轮询消费,分组不同,则单独消费
account模块消费者:
friend模块消费者:
五、结语
到这篇文章,这一个系列基本就算结束了,后面可能会补充一下内容,或者去写点其他的东西。或者说,去研究下springboot的集成而不用Spring Cloud Stream,后面再说吧。
本文完整项目代码GitHub地址,请切换到kafka分支
https:https://github.com/wangqing-github/DubboAndNacos.git
ssh:git@github.com:wangqing-github/DubboAndNacos.git
版权归原作者 一颗知足的心 所有, 如有侵权,请联系我们删除。