0


kafka入门

  • 生产者发送消息,同一消费者组内的多个消费者只能有一个消费者接收到消息
  • 生产者发送消息,不同消费者组的多个消费者都可以接收到消息

(1)创建kafka-demo项目,导入依赖

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>3.4.0</version>
  5. </dependency>

(2)生产者发送消息

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import java.util.Properties;
  5. /**
  6. * 生产者
  7. */
  8. public class ProducerQuickStart {
  9. public static void main(String[] args) {
  10. //1.kafka的配置信息
  11. Properties properties = new Properties();
  12. //kafka的连接地址
  13. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.140.100:9092");
  14. //发送失败,失败的重试次数
  15. properties.put(ProducerConfig.RETRIES_CONFIG,5);
  16. //消息key的序列化器
  17. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
  18. //消息value的序列化器
  19. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
  20. //2.生产者对象
  21. KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
  22. //封装发送的消息
  23. ProducerRecord<String,String> record = new ProducerRecord<String, String>("green-topic","100001","hello kafka");
  24. //3.发送消息
  25. producer.send(record);
  26. //4.关闭消息通道,必须关闭,否则消息发送不成功
  27. producer.close();
  28. }
  29. }

(3)消费者接收消息

  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.apache.kafka.clients.consumer.ConsumerRecords;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import java.time.Duration;
  6. import java.util.Collections;
  7. import java.util.Properties;
  8. /**
  9. * 消费者
  10. */
  11. public class ConsumerQuickStart {
  12. public static void main(String[] args) {
  13. //1.添加kafka的配置信息
  14. Properties properties = new Properties();
  15. //kafka的连接地址
  16. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.140.100:9092");
  17. //消费者组
  18. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
  19. //消息的反序列化器
  20. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  21. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  22. //2.消费者对象
  23. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
  24. //3.订阅主题
  25. consumer.subscribe(Collections.singletonList("green-topic"));
  26. //当前线程一直处于监听状态 每一秒拉取一次
  27. while (true) {
  28. //4.获取消息
  29. ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
  30. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  31. System.out.println(consumerRecord.key());
  32. System.out.println(consumerRecord.value());
  33. }
  34. }
  35. }
  36. }

SpringBoot集成kafka

1.导入spring-kafka依赖信息

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <!-- kafka -->
  7. <dependency>
  8. <groupId>org.springframework.kafka</groupId>
  9. <artifactId>spring-kafka</artifactId>
  10. <exclusions>
  11. <exclusion>
  12. <groupId>org.apache.kafka</groupId>
  13. <artifactId>kafka-clients</artifactId>
  14. </exclusion>
  15. </exclusions>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.apache.kafka</groupId>
  19. <artifactId>kafka-clients</artifactId>
  20. </dependency>
  21. <dependency>
  22. <groupId>com.alibaba</groupId>
  23. <artifactId>fastjson</artifactId>
  24. </dependency>
  25. </dependencies>

2.在resources下创建文件application.yml

  1. server:
  2.   port: 9991
  3. spring:
  4.   application:
  5.     name: kafka-demo
  6.   kafka:
  7.     bootstrap-servers: 192.168.140.100:9092
  8.     producer:
  9.       retries: 10
  10.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
  12.     consumer:
  13.       group-id: ${spring.application.name}-test
  14.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  15.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.消息生产者

  1. import com.alibaba.fastjson.JSON;
  2. import com.heima.kafka.pojo.User;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.kafka.core.KafkaTemplate;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. @RestController
  8. public class HelloController {
  9. @Autowired
  10. private KafkaTemplate<String,String> kafkaTemplate;
  11. @GetMapping("/hello")
  12. public String hello(){
  13. kafkaTemplate.send("green-topic","hello kafka...");
  14. return "ok";
  15. }
  16. //传递的消息为对象
  17. @GetMapping("/hello2")
  18. public String hello2(){
  19. User user = new User();
  20. user.setUsername("小明");
  21. user.setAge(18);
  22. kafkaTemplate.send("green-topic2", JSON.toJSONString(user));
  23. return "ok2";
  24. }
  25. }

4.消息消费者

  1. import com.alibaba.fastjson.JSON;
  2. import com.heima.kafka.pojo.User;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.stereotype.Component;
  5. import org.springframework.util.StringUtils;
  6. @Component
  7. public class HelloListener {
  8. @KafkaListener(topics = "green-topic")
  9. public void onMessage(String message){
  10. if(!StringUtils.isEmpty(message)){
  11. System.out.println(message);
  12. }
  13. }
  14. @KafkaListener(topics = "green-topic2")
  15. public void onMessage2(String message){
  16. if(!StringUtils.isEmpty(message)){
  17. System.out.println(JSON.parseObject(message, User.class));
  18. }
  19. }
  20. }
标签: kafka 分布式

本文转载自: https://blog.csdn.net/weixin_52270382/article/details/135980823
版权归原作者 养一只摆烂猫. 所有, 如有侵权,请联系我们删除。

“kafka入门”的评论:

还没有评论