0


Springboot中使用kafka

首先说明,本人之前没用过zookeeper、kafka等,尚硅谷十几个小时的教程实在没有耐心看,现在我也不知道分区、副本之类的概念。用kafka只是听说他比RabbitMQ快,我也是昨天晚上刚使用,下文中若有讲错的地方或者我的理解与它的本质有偏差的地方请包涵。

此文背景的环境是windows,linux流程也差不多。

  • 官网下载kafka,选择Binary downloads Apache Kafka

  • 解压在D盘下或者什么地方,注意不要放在桌面等绝对路径太长的地方

  • 打开config中的 zookeeper.properties,自己选择性修改clientPort,不想改也行

  • 修改config中的 server.properties

      1.取消 **advertised.listeners** 注释,修改kafka地址与端口。如果要集群部署,broker.id不能重复,1号机是0,2号机是1这样的。
    

    2.修改 **zookeeper.connect** 为你上面zookeeper.properties中配置的地址

  • 配置好了,尝试开启kafka。

      来到bin/windows,shift右键在此处打开cmd,输入**
    
zookeeper-server-start.bat ../../config/zookeeper.properties

**,等待其启动。(注意你config的路径不要写错)

    启动完再开一个cmd,输入 **
kafka-server-start.bat ../../config/server.properties

**。

如果在此环节出现问题,请查看logs中的日志,面向csdn。

    我遇到的问题是 他显示什么什么路径太长了,问题的原因是我把他放桌面上了。
  • 新建springboot项目,pom中添加依赖
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.28</version>
    </dependency>
</dependencies>
  • 配置application.yml,写启动类,controller,创建User类,创建consumer

application.yml

spring:
  application:
    name: kafka
  kafka:
    bootstrap-servers: localhost:9092 #这个是你server.properties中配置的
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test-consumer-group #这个去config/consumer.properties中查看和修改
                                # 不过好像不一样也不影响?
server:
  port: 8001

controller

@RestController
public class ProducerController {
​
    @Autowired
    KafkaTemplate<String, String> kafka;
​
    @RequestMapping("register")
    public String register(User user) {
        String message = JSON.toJSONString(user);
        System.out.println("接收到用户信息:" + message);
        kafka.send("register", message);
        //kafka.send(String topic, @Nullable V data) {
        return "OK";
    }
}

user

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
​
    private String id;
​
    private String name;
​
    private Integer age;
}

consumer

@Configuration
public class Consumer {
​
    @KafkaListener(topics = "register")
    public void consume(String message) {
        System.out.println("接收到消息:" + message);
        User user = JSON.parseObject(message, User.class);
        System.out.println("正在为 " + user.getName() + " 办理注册业务...");
        System.out.println("注册成功");
    }
}

此时启动springboot,然而报错了

org.springframework.context.ApplicationContextException: 
    Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry';

nested exception is java.lang.IllegalStateException:
     Topic(s) [register] is/are not present and missingTopicsFatal is true

为什么呢?

请检查zookeeper和kafka的cmd上有没有他们启动失败的消息,如果有就重新启动下,记得先开zookeeper再开kafka。

然后确认你的kafka上有没有"register"这个topic,此时我是没有的,而consumer又加了 **

@KafkaListener(topics = "register")

** 注解,于是他就启动失败了。

有两种解决方式:

1.比较傻X的方式,先将@KafkaListener注释掉,启动springboot后访问localhost:8001/register,他send的时候就会自行创建topic,再取消注释重新启动就可以了。

2.cmd方式:输入 **

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic register

**

然后我们再启动,已经启动成功了。访问 localhost:8001/register?name=JamesBond&age=55

我们可以看到数据已经成功送到那里了。

然后来测试一下速度

@RequestMapping("test")
public String test() {
    System.out.println("发送开始" + System.currentTimeMillis() % 10000);
    for (int i = 0; i < 1000; i++) {
        kafka.send("test", JSON.toJSONString(new User((1289312+i)+"",
                "User" + i, (int)(Math.random() * 100), info)));
    }
    System.out.println("发送结束" + System.currentTimeMillis() % 10000);
    return "OK";
}
@KafkaListener(topics = "test")
public void test(String message) {
    System.out.println("--" + System.currentTimeMillis() % 10000 + "--");
}

console:

发送开始1267
--1384--
--1384--
...
--1715--
--1715--
发送结束1715
--1715--
--1715--
...
--1734--

对比RabbitMQ:

发送开始5692
--5766--
--5766--
...
--5973--
--5974--
发送结束5976
--5977--
--5977--
...
--6456--

kafka:

    发送结束 - 发送开始=448ms

    接收结束 - 接收开始=350ms

    整体耗时: 467ms

rabbit:

    发送结束 - 发送开始=284ms

    接收结束 - 接收开始=690ms

    整体耗时: 764ms

OK既然我会用了 我就去学一下kafka基本知识了

标签: spring boot java kafka

本文转载自: https://blog.csdn.net/m0_52640724/article/details/123683518
版权归原作者 欧内的手好汗 所有, 如有侵权,请联系我们删除。

“Springboot中使用kafka”的评论:

还没有评论