1、系统架构
spring-cloud版本Hoxton.SR4spring-boot版本
2.2.6.RELEASE
java版本1.8Kafka版本2.4.0.RELEASE
2、pom引入Kafka依赖
<!--Kafka消息队列-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.0.RELEASE</version>
</dependency>
!!!!!注意!!!!!
spring-kafka与spring-boot的版本对应,详情参考网址:Spring for Apache Kafka
3、编写yml配置
#kafka配置
kafka:
producer:
bootstrap-servers: {你的Kafka服务器IP}
retries: 0
batch-size: 4096 #单位是字节
buffer-memory: 33554432 #单位是字节,默认是33554432字节即32MB
#序列化类,可以自己写然后配置在这里
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
enable-auto-commit: false #禁止自动提交offest
auto-offset-reset: latest
bootstrap-servers: {你的Kafka服务器IP}
group-id: {你的Kafka群组id}
4、编写Kafka配置类,注册消费者在这里
/**
* Kafka配置
* @date 2024年01月18日 11:30
*/
@Configuration
@EnableKafka
public class KafkaConfig {
Logger logger= LoggerFactory.getLogger(KafkaConfig.class);
/**
* 注册消费者
* @date 2024/1/18 11:31
* @return KafkaReceiver
*/
@Bean
public ZfxtMsgPendingTaskReceiver listener() {
return new ZfxtMsgPendingTaskReceiver();
}
}
5、编写生产者
/**
* 发送
* @date 2024/1/18 16:32
* @param topic Kafka标签名
* @param key 消息id
* @param data 数据
*/
public void send(String topic, String key, MsgPendingTaskKafkaDTO data) {
//发送消息
ListenableFuture<SendResult<String, MsgPendingTaskKafkaDTO>> send = kafkaTemplate.send(topic, key, data);
//异步发送,编写监听器监听结果
send.completable().whenCompleteAsync((result, ex) -> {
String s = result.toString();
if (null == ex) {
//成功则打点日志
logger.info("{}生产者发送消息成功:{}", topic, s);
} else {
//这里发生错误则存日志进数据库
}
});
}
6、编写消费者
/**
* 消费者
* @date 2024/1/18 16:36
* @param msg 消息
*/
@KafkaListener(topics = {你的消息标题,对应发送者的topic字段})
public void receive(ConsumerRecord<String, String> msg, Acknowledgment ack){
logger.info("我收到了消息");
//定义Kafka唯一消息id,避免消息重复消费
//成功的有没有
boolean success=zfxtMsgPendingTaskService.hasKey(msg.key());
//失败的有没有
boolean exception = kafkaExceptionService.hasKey(msg.key());
if(success|| exception){
//两个之中有一个就不处理了
logger.info("消息重复消费");
}else{
//没有自定义序列化要我们自己手动转json
String value = msg.value();
MsgPendingTaskKafkaDTO dto = JSON.parseObject(value, MsgPendingTaskKafkaDTO.class);
//业务处理
}
//手动提交偏移量
ack.acknowledge();
}
7、要是服务异常导致不能消费或者网络波动导致消费消息失败咋办呢?
我们可以编写消息重试机制,具体如下:
/**
* 配置消息重试机制
* @date 2024/1/19 9:44
* @param consumerFactory
* @param exceptionService kafka异常日志记录服务
* @return org.springframework.kafka.config.KafkaListenerContainerFactory<org.springframework.kafka.listener.ConcurrentMessageListenerContainer<java.lang.String,java.lang.String>>
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory,KafkaExceptionService exceptionService) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
//设置超时时间1.5s
factory.getContainerProperties().setPollTimeout(1500);
logger.info("执行kafka容器工厂配置");
// 消息是否或异常重试次数3次 (5000=5秒 3=重试3次)
// 注意: 没有配置配置手动提交offset,不生效, 因为没有配置手动提交时消息接受到就会自动提交,不会管程序是否异常
// DefaultErrorHandler构造参数包含两个参数:
// 一个是ConsumerRecordRecoverer消息回收处理器,用于超过重试次数执行对应消息回收
// (里面包含:{
// 1、consumerRecord记录kafaka消息的属性,topic,分区,offest偏移量;e:对应异常
// 2、BiConsumer二元消费者(一元消费者可以参考Collection的forEach函数),accept方法用于编写消费者具体操作,andThen方法用于控制消费者
// }
// 得益于这玩意我们可以不需要写handler直接通过lambda函数的方式来编写详细代码
// )
// 一个是BackOff延时执行器(interval:时间间隔;maxAttempts:retry次数)
//自定义错误消息处理器
SeekToCurrentErrorHandler defaultErrorHandler = new SeekToCurrentErrorHandler(((consumerRecord, e) -> {
logger.info("kafka消息消费出现异常,e:{}",e.getMessage());
//超过重试次数记录日志
Object key = consumerRecord.key();
if(exceptionService.hasKey(key.toString())){
logger.warn("id重复");
}else{
// TODO: 2024/1/30 记录日志
}
}), new FixedBackOff(5000, 3));
//多个可使用batchErrorHandler
//设置默认错误处理器
factory.setErrorHandler(defaultErrorHandler);
// 最后配置手动提交offset
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
就这样吧
版权归原作者 蛮荒兽人持键小子 所有, 如有侵权,请联系我们删除。