项目场景:
互联网项目中经常用到MQ,由于本地项目开发连接测试环境kafka很不方便,所有在本机搭建一个kafka,方便开发测试。
前置准备
提示:Kafka的运行依赖于Zookeeper,所以在运行Kafka之前我们需要安装并运行Zookeeper
下载Zookeeper地址:https://zookeeper.apache.org/releases.html
下载kafka地址:http://kafka.apache.org/downloads.html
配置Zookeeper
1.将下载好的文件解压到本地,如图:
复制zoo_sample.cfg文件,并将新复制的文件命名为zoo.cfg,修改文件zoo.cfg内容如下:
dataDir=F:\mq\apache-zookeeper-3.6.3\dataDir
dataLogDir=F:\mq\apache-zookeeper-3.6.3\dataLogDir
2.配置Window环境变量
3.启动Zookeeper
进入Zookeeper安装目录,cmd 输入命令zkserver,如图
启动成功!!
配置kafka
1.解压下载文件到本地
进入F:\mq\kafka_2.13-2.8.0\config文件内,修改文件server.properties
log.dirs=F:\mq\kafka_2.13-2.8.0\logs
2.启动kafka服务
在安装目录cmd输入命令:
.\bin\windows\kafka-server-start.bat .\config\server.properties
无报错则正常启动,本地启动窗口不要关闭。
3.创建topic名称为syn_user的命令:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic syn_user
4.查看创建的topic
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
本地kafka环境测试:
启动生产者
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic syn_user
启动消防者监听消息
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic syn_user
springboot 集成:
1.引入pom依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.yml配置
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
acks: -1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
enable-auto-commit: false
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: org.springframework.kafka.support.serializer.JsonDeserializer
group-id: test-consumer-group
listener:
ack-mode: MANUAL
3.创建消息生产者
@RestController
@Api(value = "mq消息", tags = "Fh-mq消息")
@RequestMapping("/wkafka")
public class ProducerController {
private static final String KAFKA_TOPIC_NAME = "wlhydemo";
@Autowired
KafkaTemplate<String, String> kafka;
@PostMapping("/send")
public String register(@RequestBody User user) {
try {
String message = JSONUtil.toJsonStr(user);
System.out.println("注册用户信息:" + message);
kafka.send(KAFKA_TOPIC_NAME, message);
return "OK";
} catch (Exception e) {
e.printStackTrace();
}
return "消息同步失败";
}
}
4.监听topic消息类
@Slf4j
@Component
public class KaUserConsumer {
@KafkaListener(topics = "wlhydemo")
public void listenFlowStart(@Payload String businessStr,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) int offset )
{
try{
// 模拟业务处理...
log.info("当前消费分区:{}", partition);
log.info("当前消费位置:{}", offset);
log.info("接收到的消息:{}", businessStr);
User user= JSONUtil.toBean(businessStr, User.class);
user.getNickName();
} catch (Exception e) {
e.printStackTrace();
}
}
}
版权归原作者 Mr.kuan 所有, 如有侵权,请联系我们删除。