0


Window下搭建kafka运行环境

项目场景:

互联网项目中经常用到MQ,由于本地项目开发连接测试环境kafka很不方便,所有在本机搭建一个kafka,方便开发测试。


前置准备

提示:Kafka的运行依赖于Zookeeper,所以在运行Kafka之前我们需要安装并运行Zookeeper

下载Zookeeper地址:https://zookeeper.apache.org/releases.html

下载kafka地址:http://kafka.apache.org/downloads.html


配置Zookeeper

1.将下载好的文件解压到本地,如图:

复制zoo_sample.cfg文件,并将新复制的文件命名为zoo.cfg,修改文件zoo.cfg内容如下:

dataDir=F:\mq\apache-zookeeper-3.6.3\dataDir
dataLogDir=F:\mq\apache-zookeeper-3.6.3\dataLogDir

2.配置Window环境变量

3.启动Zookeeper

进入Zookeeper安装目录,cmd 输入命令zkserver,如图

启动成功!!


配置kafka

1.解压下载文件到本地

进入F:\mq\kafka_2.13-2.8.0\config文件内,修改文件server.properties

log.dirs=F:\mq\kafka_2.13-2.8.0\logs

2.启动kafka服务

在安装目录cmd输入命令:

  .\bin\windows\kafka-server-start.bat .\config\server.properties

无报错则正常启动,本地启动窗口不要关闭。

3.创建topic名称为syn_user的命令:

 .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic syn_user

4.查看创建的topic

 .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

本地kafka环境测试:

启动生产者

 .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic syn_user

启动消防者监听消息

 .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic syn_user


springboot 集成:

1.引入pom依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.yml配置

  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      enable-auto-commit: false
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: test-consumer-group
      listener:
        ack-mode: MANUAL

3.创建消息生产者

@RestController
@Api(value = "mq消息", tags = "Fh-mq消息")
@RequestMapping("/wkafka")
public class ProducerController {

    private static final String  KAFKA_TOPIC_NAME = "wlhydemo";
    @Autowired
    KafkaTemplate<String, String> kafka;

    @PostMapping("/send")
    public String register(@RequestBody User user) {
        try {
            String message = JSONUtil.toJsonStr(user);
            System.out.println("注册用户信息:" + message);
            kafka.send(KAFKA_TOPIC_NAME, message);
            return "OK";
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "消息同步失败";
    }

}

4.监听topic消息类

@Slf4j
@Component
public class KaUserConsumer {
    @KafkaListener(topics = "wlhydemo")
    public void listenFlowStart(@Payload String businessStr,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                @Header(KafkaHeaders.OFFSET) int offset )
    {
        try{
            // 模拟业务处理...
            log.info("当前消费分区:{}", partition);
            log.info("当前消费位置:{}", offset);
            log.info("接收到的消息:{}", businessStr);
            User user= JSONUtil.toBean(businessStr, User.class);
            user.getNickName();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
标签: kafka java 分布式

本文转载自: https://blog.csdn.net/u013761772/article/details/130244266
版权归原作者 Mr.kuan 所有, 如有侵权,请联系我们删除。

“Window下搭建kafka运行环境”的评论:

还没有评论