

Integrating Kafka with Spring and listening for messages

Spring integration with Kafka


Kafka depends on ZooKeeper, which must be started in advance

Configure Kafka's ZooKeeper connection settings in the server.properties file:

############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

Configure ZooKeeper itself in zookeeper.properties:

# Directory where ZooKeeper data is stored
dataDir=/tmp/zookeeper
# Client connection port
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
admin.enableServer=false
# admin.serverPort=8080

Installing and starting Kafka locally

1. On Windows, download the Kafka binary package: http://kafka.apache.org/downloads
2. In config/server.properties, set:
listeners=PLAINTEXT://localhost:9092
log.dirs=F:\kafka_2.13-2.5.0\logs
3. Open a shell in the directory above bin and start Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties
4. List the available topics:
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
5. Delete a specific topic:
.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --delete --topic topic_kedacom_icms_alarm_jy_3725
5.1. If the topic lingers (only marked for deletion), remove it from ZooKeeper directly in the ZooKeeper shell:
# List the stored topics
ls /brokers/topics
# Delete the specified topic
rmr /brokers/topics/topicName
6. In a new window, create the desired topic:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_kedacom_icms_alarm_jy_3725
7. In a new window, start the console producer:
.\bin\windows\kafka-console-producer.bat --broker-list 189.1.0.55:9092 --topic topic_kedacom_icms_spdc_jy_3725
8. In a new window, start the console consumer (chcp 65001 switches the console code page to UTF-8):
chcp 65001
.\bin\windows\kafka-console-consumer.bat --bootstrap-server 189.1.0.55:9092 --topic topic_kedacom_icms_spdc_sj_3725 --from-beginning
If you see "The input line is too long" or the command is not recognized, the installation directory path is too long or nonstandard; move Kafka to a shorter path.

POM file

Note on versions: with newer spring-kafka releases the build may complain about a missing annotation processor...
  <dependencies>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.1.8.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-api</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>commons-httpclient</groupId>
      <artifactId>commons-httpclient</artifactId>
      <version>3.1</version>
    </dependency>
  </dependencies>

Producer configuration

/**
 * @author lyp
 * @date 2021/11/22 15:46
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${bootstrap.servers}")
    private String bootstrapServers;

    public KafkaProducerConfig() {
        System.out.println("Kafka producer configuration loaded");
    }

    /**
     * Factory that creates producer instances.
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerProperties());
    }

    /**
     * Basic producer settings.
     */
    @Bean
    public Map<String, Object> producerProperties() {
        Map<String, Object> props = new HashMap<String, Object>();
        // Kafka broker addresses
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Key/value serialization
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Retry count
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        // Batch size in bytes
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // Total producer buffer memory in bytes
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);
        // Acknowledgment level
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory(), true);
        kafkaTemplate.setDefaultTopic(KafkaSendEnum.ALARM_WARN_PUSH.getTopic());
        return kafkaTemplate;
    }
}

Consumer configuration

package com.huating.jfp.msg.api.kafka.config;

import com.huating.jfp.msg.api.kafka.construct.KafkaConsumerEnum;
import com.huating.jfp.msg.api.kafka.listener.KafkaConsumerListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * @author lyp
 * @ClassName KafkaConsumerConfig
 * @description: consumer configuration
 * @datetime 2022-07-20 9:15
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${bootstrap.servers}")
    private String bootstrapServers;

    public KafkaConsumerConfig() {
        System.out.println("Kafka consumer configuration loading...");
    }

    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<String, Object>();
        // Kafka broker addresses
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Consumer group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerEnum.SD_SJ.getGroupId());
        // Disable automatic offset commits
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Auto-commit interval; the default is 5000 ms
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        // Key/value deserializers (note: these must be Deserializer classes, not Serializers)
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // earliest: if a committed offset exists for a partition, resume from it; otherwise consume from the beginning
        // latest: if a committed offset exists for a partition, resume from it; otherwise consume only newly produced records
        // none: if committed offsets exist for all partitions, resume from them; if any partition lacks one, throw an exception
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<String, String>(consumerProperties());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public KafkaConsumerListener kafkaConsumerListener() {
        return new KafkaConsumerListener();
    }
}
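The @Value placeholders used above and in the producer classes (bootstrap.servers, group.id, consumer.topics, topic.name, srcUnit.code, srcUnit.name) are resolved from a properties file the article never shows. A minimal sketch, assuming those property names; all values here are hypothetical:

# Hypothetical kafka.properties matching the @Value placeholders
bootstrap.servers=localhost:9092
group.id=sd_sj_group
consumer.topics=topic_kedacom_icms_alarm_jy_3725,topic_kedacom_icms_spdc_jy_3725
topic.name=topic_kedacom_icms_alarm_jy_3725
srcUnit.code=3725
srcUnit.name=demo-unit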

Topic creation utility class

/**
 * @author lyp
 */
public class KafkaTopicUtil {

    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicUtil.class);

    /**
     * Create a topic and return whether the request was submitted successfully.
     * @param: topicName
     * @return: boolean
     * @author: lyp
     * @date: 2021/11/12 16:06
     */
    public static boolean createTopics(String bootstrapServers, String topicName, int partitions, short replication) {
        boolean res = false;
        try {
            Properties properties = new Properties();
            properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");
            AdminClient adminClient = KafkaAdminClient.create(properties);
            NewTopic newTopic = new NewTopic(topicName, partitions, replication);
            // createTopics is asynchronous; this call only submits the creation request
            adminClient.createTopics(Arrays.asList(newTopic));
            logger.info("Topic " + topicName + " created successfully!");
            res = true;
        } catch (Exception e) {
            e.printStackTrace();
            logger.info("Topic creation failed!");
        }
        return res;
    }

    /**
     * Return the set of topics that currently exist in Kafka.
     * @return: set
     * @author: lyp
     * @date: 2021/11/12 16:07
     */
    public static Set<String> getTopics(String bootstrapServers) {
        Set<String> nameSet = new HashSet<>();
        try {
            Properties properties = new Properties();
            properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            AdminClient adminClient = KafkaAdminClient.create(properties);
            ListTopicsResult listTopicsResult = adminClient.listTopics();
            KafkaFuture<Set<String>> names = listTopicsResult.names();
            nameSet = names.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return nameSet;
    }
}
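A quick usage sketch, mirroring how the producer service below checks for a topic before sending; the broker address and topic name are placeholders:

// List existing topics, then create one if it is missing
Set<String> topics = KafkaTopicUtil.getTopics("localhost:9092");
if (!topics.contains("my_topic")) {
    boolean created = KafkaTopicUtil.createTopics("localhost:9092", "my_topic", 1, (short) 1);
    System.out.println("topic created: " + created);
}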

Producer business logic

public interface KafkaProduceService {
    /** Send a device alarm message. */
    boolean sendWarnMessage(DeviceWarnInfo deviceWarnInfo);
}

/**
 * @author lyp
 */
@Service("kafkaProducerService")
public class KafkaProducerServiceImpl implements KafkaProduceService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerServiceImpl.class);

    @Value("${bootstrap.servers}")
    private String bootstrapServers;
    @Value("${topic.name}")
    private String topicName;
    @Value("${srcUnit.code}")
    private String srcUnitCode;
    @Value("${srcUnit.name}")
    private String srcUnitName;

    @Override
    public boolean sendWarnMessage(DeviceWarnInfo deviceWarnInfo) {
        boolean res = false;
        Map<String, Object> reportData = new HashMap<>();
        reportData.put("command", "reportAlarm");
        deviceWarnInfo.setSrcUnitCode(srcUnitCode);
        deviceWarnInfo.setSrcUnitName(srcUnitName);
        reportData.put("data", deviceWarnInfo);
        // Check whether the topic exists; create it if not
        Set<String> topics = KafkaTopicUtil.getTopics(bootstrapServers);
        if (!topics.contains(KafkaSendEnum.ALARM_WARN_PUSH.getTopic())) {
            if (!KafkaTopicUtil.createTopics(bootstrapServers, topicName, 1, (short) 1)) {
                logger.info("Topic creation failed; message not sent!");
                return res;
            }
        }
        KafkaTemplate kafkaTemplate = SpringContextUtil.getBean("kafkaTemplate");
        ListenableFuture send = kafkaTemplate.sendDefault(topicName, JSONArray.toJSONString(reportData));
        // The send itself is asynchronous; success or failure is reported in the callback
        send.addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                logger.error(ex.getMessage() + " send failed! Cause: " + ex.getCause());
            }

            @Override
            public void onSuccess(Object result) {
                logger.info("Message sent successfully: " + result.toString());
            }
        });
        // The request was dispatched; the callback above reports the final outcome
        res = true;
        return res;
    }
}

Consumer business logic

Message receiving class
package com.huating.jfp.msg.api.kafka.entity;

import lombok.Data;

/**
 * @author lyp
 * @ClassName MesBody
 * @description: message entity
 * @datetime 2022-07-21 14:48
 */
@Data
public class MesBody {
    /** Message type marker */
    private String command;
    /** Message payload */
    private String data;
}
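For orientation, a message consumed by the listener below might look like this on the wire; the values are hypothetical, and data carries the nested business object as a JSON string that is parsed again downstream:

{
  "command": "reportAlarm",
  "data": "{\"srcUnitCode\":\"3725\",\"srcUnitName\":\"demo-unit\"}"
}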
Listener class
package com.huating.jfp.msg.api.kafka.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import com.alibaba.fastjson.JSONObject;
import com.huating.jfp.msg.api.kafka.construct.KafkaMesType;
import com.huating.jfp.msg.api.kafka.construct.KafkaTopics;
import com.huating.jfp.msg.api.kafka.consumer.service.KafkaConsumerService;
import com.huating.jfp.msg.api.kafka.entity.InspectorIssue;
import com.huating.jfp.msg.api.kafka.entity.MesBody;
import com.huating.jfp.msg.api.kafka.entity.Notice;

/**
 * @author lyp
 * @ClassName KafkaConsumerListener
 * @description: topic listener
 * @datetime 2022-07-20 9:27
 */
public class KafkaConsumerListener {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);

    @Autowired
    private KafkaConsumerService consumerService;

    /**
     * Listen on the configured topics (comma-separated for multiple).
     * groupId: consumer group id
     * topics: array of topics to listen on
     * topic: a single topic to listen on
     */
    @KafkaListener(groupId = "${group.id}",
            topics = "#{'${consumer.topics}'.split(',')}",
            containerFactory = "kafkaListenerContainerFactory")
    public void listener(ConsumerRecord<String, String> consumerRecord) {
        logger.info("Consuming message from " + KafkaTopics.SD_JY_DUTY_TOPIC.getTopicName() + ": {}", consumerRecord.value());
        MesBody mesBody = JSONObject.parseObject(consumerRecord.value(), MesBody.class);
        logger.info("Kafka listener - current message type: " + mesBody.getCommand());
        // Supervision messages (issue, dispose, result)
        if (mesBody.getCommand().equals(KafkaMesType.SD_INSPECTOR_ISSUE.getMesCode())
                || mesBody.getCommand().equals(KafkaMesType.SD_INSPECT_DISPOSE.getMesCode())
                || mesBody.getCommand().equals(KafkaMesType.SD_INSPECT_RES.getMesCode())) {
            logger.info("Handling supervision message ----->");
            InspectorIssue inspectorIssue = JSONObject.parseObject(mesBody.getData(), InspectorIssue.class);
            consumerService.inspectorListener(inspectorIssue);
        }
        // Notice/bulletin messages
        if (mesBody.getCommand().equals(KafkaMesType.SD_NOTICE_ISSUE.getMesCode())) {
            logger.info("Handling notice message");
            Notice notice = JSONObject.parseObject(mesBody.getData(), Notice.class);
            consumerService.noticeListener(notice);
        }
    }
}
Business processing
package com.huating.jfp.msg.api.kafka.consumer.service;

import com.huating.jfp.msg.api.kafka.entity.InspectorIssue;
import com.huating.jfp.msg.api.kafka.entity.Notice;

/**
 * @author lyp
 */
public interface KafkaConsumerService {

    /**
     * Handle supervision-issue and supervision-completion messages.
     *
     * @param inspectorIssue the issued supervision event
     */
    void inspectorListener(InspectorIssue inspectorIssue);

    /**
     * Handle notice/bulletin issue messages.
     *
     * @param notice the issued notice
     */
    void noticeListener(Notice notice);
}
package com.huating.jfp.msg.api.kafka.consumer.service.impl;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.huating.jfp.common.dao.MsgConfigureDao;
import com.huating.jfp.common.dao.MsgDao;
import com.huating.jfp.common.entity.Msg;
import com.huating.jfp.common.entity.MsgConfigure;
import com.huating.jfp.common.entity.MsgReceive;
import com.huating.jfp.common.service.MsgReceiveService;
import com.huating.jfp.core.base.ViewPublicRewrite;
import com.huating.jfp.msg.api.http.servcie.HttpRequestService;
import com.huating.jfp.msg.api.kafka.consumer.service.KafkaConsumerService;
import com.huating.jfp.msg.api.kafka.dao.InspectorEventMapper;
import com.huating.jfp.msg.api.kafka.dao.NoticeMapper;
import com.huating.jfp.msg.api.kafka.entity.*;
import com.huating.jfp.msg.api.kafka.producer.service.KafkaProducerService;
import com.huating.jfp.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * @author lyp
 * @ClassName KafkaConsumerServiceImpl
 * @description: consumer implementation
 * @datetime 2022-07-20 9:13
 */
@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerServiceImpl.class);
    private static final String SRC_UNIT_CODE = "gaol_code";
    private static final String SRC_UNIT_NAME = "gaol_name";

    @Autowired
    private InspectorEventMapper inspectorEventMapper;
    @Autowired
    private HttpRequestService httpRequestService;
    @Autowired
    private NoticeMapper noticeMapper;
    @Autowired
    private MsgDao msgMapper;
    @Autowired
    private MsgConfigureDao msgConfigureMapper;
    @Autowired
    private MsgReceiveService msgReceiveService;
    @Autowired
    private ViewPublicRewrite vp;

    @Override
    public void inspectorListener(InspectorIssue inspectorIssue) {
        if (!StrUtil.isEmpty(inspectorIssue.getUuid())) {
            if (!StrUtil.isEmpty(inspectorIssue.getDubanTime())) {
                logger.info("Handling supervision (duban) issue");
                InspectorEventDispose inspectorEventDispose = new InspectorEventDispose();
                String uuid = StringUtil.getUUID();
                inspectorEventDispose.setIedUuid(uuid);
                inspectorEventDispose.setIedIeUuid(inspectorIssue.getUuid());
                inspectorEventDispose.setIedExpireTime(inspectorIssue.getDubanTime());
                inspectorEventDispose.setIedContent(inspectorIssue.getContent());
                // Persist the supervision issue
                inspectorEventMapper.insertDispose(inspectorEventDispose);
                logger.info("Supervision issue inserted");
                // Persist the attached files
                List<FileEntity> files = inspectorIssue.getFiles();
                List<InspectorFile> fileList = new ArrayList<>();
                downloadFile(files, fileList, uuid);
                inspectorEventMapper.insertFiles(fileList);
                logger.info("Supervision issue completed");
            } else if (!StrUtil.isEmpty(inspectorIssue.getSrcUnitCode())) {
                logger.info("Handling inspection (ducha) issue");
                InspectorEvent inspectorEvent = new InspectorEvent();
                inspectorEvent.setIeUuid(inspectorIssue.getUuid());
                inspectorEvent.setIeAreaCode(vp.getBusinessValue("gaol_code"));
                inspectorEvent.setIeEventType(inspectorIssue.getType());
                inspectorEvent.setIeDescribe(inspectorIssue.getContent());
                inspectorEvent.setIeGrabTime(inspectorIssue.getPublishTime());
                inspectorEvent.setIeExpireTime(inspectorIssue.getQxTime());
                inspectorEvent.setIeCusNunmber(vp.getBusinessValue("base_cus"));
                inspectorEvent.setIeNature(inspectorIssue.getNature());
                inspectorEvent.setIeIsSj(0);
                // Persist the inspection issue
                inspectorEventMapper.insertSynData(inspectorEvent);
                logger.info("Inspection issue inserted");
                // Persist the attached files
                List<FileEntity> files = inspectorIssue.getFiles();
                List<InspectorFile> fileList = new ArrayList<>();
                downloadFile(files, fileList, inspectorIssue.getUuid());
                inspectorEventMapper.insertFiles(fileList);
                logger.info("Inspection files inserted");
                logger.info("Inspection issue completed");
            } else {
                // Inspection closed: mark the event as completed
                if (inspectorEventMapper.searchIsSj(inspectorIssue.getUuid()) > 0) {
                    inspectorEventMapper.updateState("3", inspectorIssue.getUuid());
                    logger.info("Inspection closed");
                }
            }
        }
    }

    @Override
    public void noticeListener(Notice notice) {
        logger.info("Handling notice issue");
        // Persist the notice
        noticeMapper.insertData(notice);
        Msg msg = new Msg();
        String uuid = StringUtil.getUUID();
        msg.setMUuid(uuid);
        MsgConfigure msgConfigure = new MsgConfigure();
        msgConfigure.setMcCode("NOTIC_ISSUE");
        MsgConfigure config = msgConfigureMapper.selectByData(msgConfigure).get(0);

        msg.setMcUuid(config.getMcUuid());
        msg.setMcMsglevel(config.getMcMsglevel());
        msg.setMStatus(Byte.parseByte(notice.getFeedback() == 0 ? "1" : "0"));
        msg.setMParam(notice.getUuid());
        msg.setMContent(notice.getTitle());
        msg.setCreateTime(new Date());
        if (notice.getFeedback() == 0) {
            msg.setMHandleTime(new Date());
            msg.setMHandleUser("No handling required for this notice");
        }
        msgMapper.insertMsg(msg);
        MsgReceive msgReceive = new MsgReceive();
        msgReceive.setMrUuid(StringUtil.getUUID());
        msgReceive.setmUuid(uuid);
        msgReceiveService.insertMsgReceive(msgReceive);
        // Persist the attached files
        List<FileEntity> files = notice.getFiles();
        noticeDownloadFile(files, notice.getUuid());
        noticeMapper.insertFiles(files);
    }

    private void downloadFile(List<FileEntity> files, List<InspectorFile> fileList, String uuid) {
        logger.info("File download started");
        if (!files.isEmpty()) {
            for (FileEntity file : files) {
                InspectorFile inspectorFile = new InspectorFile();
                String fileName = file.getFileName();
                logger.info(fileName);
                inspectorFile.setIfFileName(fileName);
                String last = fileName.substring(fileName.lastIndexOf("."));
                if (last.equalsIgnoreCase(".jpg") || last.equalsIgnoreCase(".png")
                        || last.equalsIgnoreCase(".gif") || last.equalsIgnoreCase(".bmp")) {
                    inspectorFile.setIfFileType(1);
                } else {
                    inspectorFile.setIfFileType(2);
                }
                inspectorFile.setIfSourceType(1);
                inspectorFile.setIfIeUuid(uuid);
                // TODO: confirm how the provincial bureau describes other file types
                inspectorFile.setIfPath(file.getFileName());
                String fileId = file.getFileId();
                // Download the file
                String token = httpRequestService.sendPostMessage();
                boolean res = httpRequestService.downloadFile(fileId, token,
                        vp.getCusBusinessValue("duty_file_disk_mapping_path", "1000") + "/dutyUpLoad/");
                if (res) {
                    fileList.add(inspectorFile);
                }
            }
        }
    }

    private void noticeDownloadFile(List<FileEntity> files, String uuid) {
        files.forEach(file -> {
            file.setParentId(uuid);
            String token = httpRequestService.sendPostMessage();
            httpRequestService.downloadFile(file.getFileId(), token,
                    vp.getBusinessValue("notice_file_disk_mapping_path") + "/noticeUpLoad/");
        });
    }

    public boolean checkSj() {
        return vp.getBusinessValue("is_sj") != null && Boolean.parseBoolean(vp.getBusinessValue("is_sj"));
    }
}

Synchronous, asynchronous, and ONEWAY sending

Kafka supports three send modes: synchronous, asynchronous, and ONEWAY. The legacy producer.type parameter selects synchronous or asynchronous sending, and request.required.acks (set to 0) gives ONEWAY, i.e. fire-and-forget, behavior.

producer.type=sync is the default (synchronous).

Asynchronous mode works together with the following settings:

queue.buffering.max.ms (default 5000): how long the producer buffers messages in async mode. Set to 1000, for example, it accumulates 1 s of data before sending, which greatly increases broker throughput at the cost of timeliness.
queue.buffering.max.messages (default 10000): the maximum number of messages the async producer buffers; beyond this the producer either blocks or drops messages.
queue.enqueue.timeout.ms (default -1): how long the producer blocks once the limits above are reached. 0 means never block and drop messages as soon as the buffer is full; -1 means block indefinitely and never drop messages.
batch.num.messages (default 200): the number of messages buffered per batch in async mode; the producer sends only once this count is reached (messages per batch send).
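As a sketch, these companion settings would go into the legacy (pre-Java-client) producer's properties file like this; the values are illustrative, not recommendations:

# Legacy Scala producer, async mode - illustrative values
producer.type=async
queue.buffering.max.ms=1000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms=-1
batch.num.messages=200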

Pushing data in batches greatly improves throughput: the Kafka producer can accumulate messages in memory and send them as a single batch request. The batch size is controlled through the producer parameter batch.num.messages; increasing it reduces the number of network requests and disk I/O operations, though the exact value is a trade-off between efficiency and timeliness. Newer versions also expose the batch.size parameter.
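With the modern Java producer, the equivalent knobs are batch.size (bytes per batch) and linger.ms (how long to wait for a batch to fill). A minimal sketch; the broker address and values are illustrative:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Accumulate up to 64 KB per partition before sending...
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
// ...but wait at most 10 ms for a batch to fill
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);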

In code, if you need synchronous sending, call get() after each send: producer.send returns a Future, and Future's get method blocks until the task returns a result, i.e. until the broker confirms the send succeeded.

kafkaTemplate.send(topic, key, value).get();
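A slightly fuller sketch of a blocking send with KafkaTemplate; the topic, key, and value are placeholders, and get(timeout, unit) bounds how long the caller waits:

try {
    // Blocks until the broker acknowledges the record or the timeout expires
    SendResult<String, String> result =
            kafkaTemplate.send("my_topic", "key", "value").get(10, TimeUnit.SECONDS);
    logger.info("Sent to partition {} at offset {}",
            result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    logger.error("Synchronous send failed", e);
}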

For asynchronous sending, simply register a callback to learn whether the send succeeded:

ListenableFuture future = kafkaTemplate.send(topic, value);
future.addCallback(new ListenableFutureCallback() {
    @Override
    public void onFailure(Throwable ex) {
        logger.error(ex.getMessage() + " send failed! Cause: " + ex.getCause());
    }

    @Override
    public void onSuccess(Object result) {
        logger.info("Message sent successfully: " + result.toString());
    }
});

Message reliability

Producers can send messages to Kafka asynchronously and in parallel, but a producer usually receives a response after sending: either the offset of the written message or an error encountered during the send. A crucial parameter here is request.required.acks, which determines how many replica acknowledgments the leader partition must collect before the request is considered complete (a configuration sketch follows the list):

  • acks=0: the producer does not wait for a response from the broker, so it cannot know whether the message was sent successfully. Data may be lost, but this setting yields the maximum throughput.
  • acks=1: the producer receives an acknowledgment once the leader partition has received the message. This gives better reliability, because the client waits until the broker confirms receipt.
  • acks=-1: the producer receives an acknowledgment only after all replica partitions have received the message. This setting provides the strongest reliability guarantee.
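In the modern Java producer these levels map onto the acks setting. A minimal sketch; the broker address is a placeholder:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// "0": fire-and-forget (ONEWAY); "1": leader acknowledgment only; "all" (or "-1"): all in-sync replicas
props.put(ProducerConfig.ACKS_CONFIG, "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);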
Tags: kafka, java, distributed

This article is reproduced from: https://blog.csdn.net/qq_44960057/article/details/129404786
Copyright belongs to the original author 帮我带瓶可乐. If there is any infringement, please contact us for removal.
