Java轻松使用Kafka生产者,消费者
一、环境说明
- 项目中需要下面的依赖:(版本自定义)
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.2</version>
</dependency>
2.yml配置文件设置
kafka:
bootstrap-servers: ip:端口
jaas:
enabled: false
listener:
type: single
concurrency: 3
consumer:
# key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
group-id: group-id数据
auto-offset-reset: latest
enable-auto-commit: false
max-poll-records: 100
# kafka topic
task:
execute:
topic: topic名称
二、生产者
1.简单生产者的书写:
@Component
public class SendKafkaUtil {
@Autowired
KafkaTemplate<Object, Object> kafkaTemplate;
@Value("${spring.task.execute.topic}")
private String topic;
public void sendMessageKafka() {
kafkaTemplate.send(topic, "{json数据}");
}
}
三、消费者
Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
1.简单消费者的书写:
@KafkaListener(topics = {"${spring.kafka.topic}"}, groupId = "${spring.kafka.consumer.group-id}")
public void processMessage(List<ConsumerRecord<String, String>> records){
logger.info("kafka消费消息数量:" + records.size());
}
2.消费者批量消费的书写(批量控制:max-poll-records: 100)
@Bean
public KafkaListenerContainerFactory<?> batchFactory(KafkaProperties properties) {
Map<String, Object> consumerProperties = properties.buildConsumerProperties();
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProperties));
factory.setBatchListener(true); // 开启批量监听
return factory;
}
}
/**
* 消费者1
* 批处理统一方法
*
* @param records
*/
@KafkaListener(topics = {"${spring.task.execute.topic}"},containerFactory = "batchFactory",topicPartitions = {
@TopicPartition(partitions = {"0"}, topic = "${spring.task.execute.topic}") })
public void consumer1(List<ConsumerRecord<String, Object>> records) throws IOException {
log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
log.info("Id1 records size " + records.size());
// todo数据逻辑处理
}
/**
* 消费者2
* 批处理统一方法
*
* @param records
*/
@KafkaListener(topics = {"${spring.task.execute.topic}"}, containerFactory = "batchFactory",topicPartitions = {
@TopicPartition(partitions = {"1"}, topic = "${spring.task.execute.topic}") })
public void consumer2(List<ConsumerRecord<String, Object>> records) throws IOException {
log.info("Id2 Listener, Thread ID: " + Thread.currentThread().getId());
log.info("Id2 records size " + records.size());
// todo数据逻辑处理
}
注:多消费者时,需要对应kafka中配置的分区;多少的Partition就有多少个消费者,以免资源浪费
本文转载自: https://blog.csdn.net/qq_38946379/article/details/127441511
版权归原作者 束曙 所有, 如有侵权,请联系我们删除。
版权归原作者 束曙 所有, 如有侵权,请联系我们删除。