0


Kafka 实现消息接收和发送

1、首先引入依赖

  1. <!-- Spring for Apache Kafka: provides KafkaTemplate, @KafkaListener and the listener containers -->
  2. <dependency>
  3.     <groupId>org.springframework.kafka</groupId>
  4.     <artifactId>spring-kafka</artifactId>
  5. </dependency>
  6. <!-- Kafka test support; test scope keeps it off the runtime classpath -->
  7. <dependency>
  8.     <groupId>org.springframework.kafka</groupId>
  9.     <artifactId>spring-kafka-test</artifactId>
  10.     <scope>test</scope>
  11. </dependency>

2、设置环境

  1. spring:
  2.   kafka:
  3.     # Brokers to connect to, as comma-separated ip:port pairs
  4.     bootstrap-servers: 192.168.211.136:9092
  5.     consumer:
  6.       # auto-commit-interval: 100
  7.       auto-offset-reset: earliest
  8.       # Offsets are committed manually through the listener's Acknowledgment, so auto-commit is switched off explicitly
  9.       enable-auto-commit: false
  10.       group-id: test-consumer-group
  11.       # String is already the default
  12.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  13.       # String is already the default
  14.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  15.     producer:
  16.       batch-size: 16384
  17.       buffer-memory: 33554432
  18.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  19.       retries: 0
  20.       value-serializer: org.apache.kafka.common.serialization.StringSerializer

3、启动类实现

  1. package com.jjw;
  2. import com.jjw.producer.Producer;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.SpringApplication;
  5. import org.springframework.boot.autoconfigure.SpringBootApplication;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  8. import org.springframework.kafka.config.KafkaListenerContainerFactory;
  9. import org.springframework.kafka.core.ConsumerFactory;
  10. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  11. import org.springframework.kafka.listener.ContainerProperties;
  12. import org.springframework.web.bind.annotation.GetMapping;
  13. import org.springframework.web.bind.annotation.PathVariable;
  14. import org.springframework.web.bind.annotation.RestController;
  15. /**
  16.  * Spring Boot entry point of the Kafka client demo.
  17.  *
  18.  * <p>Besides starting the application it hosts a small REST endpoint for publishing test
  19.  * messages and declares the listener container factory that enables manual offset
  20.  * acknowledgment.
  21.  */
  22. @SpringBootApplication
  23. public class KafkaClientApplicaton {
  24.     public static void main(String[] args) {
  25.         SpringApplication.run(KafkaClientApplicaton.class, args);
  26.     }
  27.     /**
  28.      * Test endpoint that forwards the request path variable to the {@link Producer}.
  29.      *
  30.      * <p>Declared {@code static} so the controller carries no hidden reference to the
  31.      * enclosing application instance.
  32.      */
  33.     @RestController
  34.     static class TestController {
  35.         @Autowired
  36.         private Producer producer;
  37.         /**
  38.          * Publishes {@code message} through the producer.
  39.          *
  40.          * @param message text taken from the {@code /send/{message}} request path
  41.          * @return the literal {@code "ok"} once the producer call has returned
  42.          * @throws Exception propagated from {@link Producer#send(String)}
  43.          */
  44.         @GetMapping("/send/{message}")
  45.         public String sendM1(@PathVariable(name = "message") String message) throws Exception {
  46.             producer.send(message);
  47.             return "ok";
  48.         }
  49.     }
  50.     /**
  51.      * Builds the listener container factory with manual offset acknowledgment.
  52.      *
  53.      * <p>Manual commits take three steps: (1) turn auto-commit off, (2) set the manual
  54.      * ack mode (done here), (3) acknowledge on the consumer side, i.e. listeners are
  55.      * expected to call {@code Acknowledgment#acknowledge()}.
  56.      *
  57.      * @param consumerFactory consumer factory injected by Spring
  58.      * @return the configured container factory
  59.      */
  60.     @Bean
  61.     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
  62.         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  63.         factory.setConsumerFactory(consumerFactory);
  64.         // Offsets are committed only when the listener acknowledges.
  65.         factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
  66.         return factory;
  67.     }
  68. }

4、生产者类实现

  1. package com.jjw.producer;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.stereotype.Component;
  5. /** Publishes string messages to the demo Kafka topic. */
  6. @Component
  7. public class Producer {
  8.     /** Topic every message is sent to. */
  9.     private static final String TOPIC = "jjw";
  10.     // Parameterized rather than raw so send(...) is type-checked against the String serializers configured for the producer.
  11.     @Autowired
  12.     private KafkaTemplate<String, String> kafkaTemplate;
  13.     /**
  14.      * Sends {@code message} to the {@code "jjw"} topic without a key.
  15.      *
  16.      * <p>The value returned by {@code KafkaTemplate#send} is not inspected here.
  17.      *
  18.      * @param message message content
  19.      * @throws Exception declared for callers; kept for interface compatibility
  20.      */
  21.     public void send(String message) throws Exception {
  22.         kafkaTemplate.send(TOPIC, message);
  23.     }
  24. }

5、消费者类实现

  1. package com.jjw.consumer;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.kafka.support.Acknowledgment;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. /** Listens on the {@code "jjw"} topic and prints every record it receives. */
  8. @Component
  9. public class KafkaConsumer {
  10.     /**
  11.      * Prints the record's value and offset, then acknowledges the record.
  12.      *
  13.      * @param record record delivered by the listener container
  14.      * @param ack handle used to acknowledge the record
  15.      * @throws IOException declared for interface compatibility; nothing in this body throws it
  16.      */
  17.     @KafkaListener(topics = {"jjw"})
  18.     public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) throws IOException {
  19.         String value = record.value();
  20.         System.out.println("接收到的消息:" + value);
  21.         System.out.println("offset" + record.offset());
  22.         // The container factory runs in AckMode.MANUAL, so the offset is committed only when the
  23.         // listener acknowledges; without this call the group's committed offset never advances.
  24.         ack.acknowledge();
  25.     }
  26. }

6、运行启动类并进行测试,即在浏览器中访问如下地址(例如 http://localhost:8080/send/jjwjjjwjjw,端口以实际配置为准)

在这里插入图片描述

7、消息接收方收到的内容

  1. 接收到的消息:jjwjjjwjjw
  2. offset46

需要注意的是,以上示例的前提是服务器上已经搭建好了 Kafka,可参考如下方式(基于 Docker)搭建

1、下载镜像

  1. docker pull wurstmeister/zookeeper  # fetch the ZooKeeper image
  2. docker pull wurstmeister/kafka  # fetch the Kafka image

2、创建容器

  1. docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper  # start ZooKeeper in the background, publishing port 2181
  2. docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.211.136:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.211.136:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka  # start broker 0; it connects to ZooKeeper at 192.168.211.136:2181/kafka and advertises 192.168.211.136:9092 to clients
标签: kafka linq 分布式

本文转载自: https://blog.csdn.net/jjw_zyfx/article/details/131498561
版权归原作者 jjw_zyfx 所有, 如有侵权,请联系我们删除。

“Kafka 实现消息接收和发送”的评论:

还没有评论