0


Springboot + netty + rabbitmq + myBatis

目录

0.为什么用消息队列

  1. 流量消峰
  2. 应用解耦
  3. 异步确认

1.代码文件创建结构

在这里插入图片描述

2.pom.xml文件

  1. <?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.4.RELEASE</version><relativePath/><!-- lookup parent from repository --></parent><groupId>com.test</groupId><artifactId>demo</artifactId><version>0.0.1-SNAPSHOT</version><name>demo</name><description>Demo project for Spring Boot</description><packaging>war</packaging><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.86.Final</version><!-- 根据需要选择版本 --></dependency><dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.5.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>easyexcel</artifactId><version>3.1.1</version></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.2.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><configuration><skip>true</skip></configuration></plugin></plugins></build></project>

3.三个配置文件开发和生产环境

在这里插入图片描述
文件一 application.properties

  1. #开发环境
  2. spring.profiles.active=dev
  3. #生产环境
  4. #spring.profiles.active=prod

文件二 application-dev.properties 开发环境

  1. spring.application.name=demo
  2. server.servlet.context-path=/demo1
  3. server.port=1001
  4. #spring.main.allow-circular-references=true
  5. spring.rabbitmq.host=实际ip地址
  6. spring.rabbitmq.port=5672
  7. spring.rabbitmq.username=root
  8. spring.rabbitmq.password=root
  9. spring.rabbitmq.virtual-host=/
  10. ##创建单线程监听容器 本项目目前用的是单线程 这里预期值2000 需根据实际情况调整
  11. spring.rabbitmq.listener.simple.prefetch=2000
  12. ##创建多线程监听容器
  13. #spring.rabbitmq.listener.direct.prefetch=2000
  14. #spring.rabbitmq.listener.simple.acknowledge-mode=auto
  15. # application.properties 示例
  16. spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
  17. #开启消息确认机制
  18. spring.rabbitmq.publisher-confirm-type=correlated
  19. spring.rabbitmq.publisher-returns=true
  20. netty.server.port=1002
  21. netty.server.bossThreads=1
  22. netty.server.workerThreads=1
  23. server.max-http-header-size=655360
  24. mybatis.mapper-locations=classpath:mapper/*Mapper.xml
  25. mybatis.type-aliases-package=com.mt.entity
  26. spring.mvc.pathmatch.matching-strategy=ant_path_matcher
  27. logging.level.com.ysd.mapper=info
  28. logging.file.name=demo.log
  29. logging.level.com.mt.mapper=info
  30. #mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
  31. mybatis.configuration.map-underscore-to-camel-case=true
  32. pagehelper.helper-dialect=mysql
  33. pagehelper.reasonable=true
  34. spring.main.allow-circular-references=true
  35. spring.jackson.default-property-inclusion=non_null
  36. #<!--=====================数据库1 ====================-->
  37. spring.datasource.dynamic.英文数据库名称1.url=jdbc:mysql://ip地址:3306/英文数据库名称?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8&useSSL=false
  38. spring.datasource.dynamic.datasource.英文数据库名称1.username=root
  39. spring.datasource.dynamic.datasource.英文数据库名称1.password=root
  40. spring.datasource.dynamic.datasource.英文数据库名称1.driver-class-name=com.mysql.cj.jdbc.Driver
  41. #<!--=====================数据库2 ====================-->
  42. spring.datasource.dynamic.英文数据库名称2.url=jdbc:mysql://ip地址:3306/英文数据库名称?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8&useSSL=false
  43. spring.datasource.dynamic.datasource.英文数据库名称2.username=root
  44. spring.datasource.dynamic.datasource.英文数据库名称2.password=root
  45. spring.datasource.dynamic.datasource.英文数据库名称2.driver-class-name=com.mysql.cj.jdbc.Driver

4.Rabbitmq 基础配置类 TtlQueueConfig

  1. rabbitmq基础信息配置已经在application-dev.properities中进行配置过一部分
  1. importorg.springframework.amqp.core.*;importorg.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassTtlQueueConfig{publicstaticfinalString X_EXCHANGE ="X";publicstaticfinalString QUEUE_A ="QA";publicstaticfinalString QUEUE_B ="QB";publicstaticfinalString Y_DEAD_LETTER_EXCHANGE ="Y";publicstaticfinalString DEAD_LETTER_QUEUE ="QD";@Bean("xExchange")publicDirectExchangexExchange(){returnnewDirectExchange(X_EXCHANGE);}@Bean("yExchange")publicDirectExchangeyExchange(){returnnewDirectExchange(Y_DEAD_LETTER_EXCHANGE);}@Bean("queueA")publicQueuequeueA(){Map<String,Object> args =newHashMap<>(3);//声明当前队列绑定的死信交换机
  2. args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 key
  3. args.put("x-dead-letter-routing-key","YD");//声明队列的 TTL
  4. args.put("x-message-ttl",150000);//超出150秒没有被消费 就会进入死信队列returnQueueBuilder.durable(QUEUE_A).withArguments(args).build();}@BeanpublicBindingqueueaBindingX(@Qualifier("queueA")org.springframework.amqp.core.Queue queueA,@Qualifier("xExchange")DirectExchange xExchange){returnBindingBuilder.bind(queueA).to(xExchange).with("XA");}//声明队列 B ttl 为 40s 并绑定到对应的死信交换机@Bean("queueB")publicQueuequeueB(){Map<String,Object> args =newHashMap<>(3);//声明当前队列绑定的死信交换机
  5. args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 key
  6. args.put("x-dead-letter-routing-key","YD");//声明队列的 TTL
  7. args.put("x-message-ttl",40000);//超出40秒没有被消费 就会进入死信队列returnQueueBuilder.durable(QUEUE_B).withArguments(args).build();}//声明队列 B 绑定 X 交换机@BeanpublicBindingqueuebBindingX(@Qualifier("queueB")Queue queue1B,@Qualifier("xExchange")DirectExchange xExchange){returnBindingBuilder.bind(queue1B).to(xExchange).with("XB");}//声明死信队列 QD@Bean("queueD")publicQueuequeueD(){returnnewQueue(DEAD_LETTER_QUEUE);}//声明死信队列 QD 绑定关系@BeanpublicBindingdeadLetterBindingQAD(@Qualifier("queueD")Queue queueD,@Qualifier("yExchange")DirectExchange yExchange){returnBindingBuilder.bind(queueD).to(yExchange).with("YD");}@BeanpublicSimpleRabbitListenerContainerFactorysimpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =newSimpleRabbitListenerContainerFactory();//这个connectionFactory就是我们自己配置的连接工厂直接注入进来
  8. simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);//这边设置消息确认方式由自动确认变为手动确认
  9. simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置消息预取数量// simpleRabbitListenerContainerFactory.setPrefetchCount(1);return simpleRabbitListenerContainerFactory;}}

5.建立netty服务器 + rabbitmq消息生产者

创建服务器类 初始化启动服务器NettyServer

  1. importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.*;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioServerSocketChannel;importio.netty.handler.codec.string.StringDecoder;importio.netty.handler.codec.string.StringEncoder;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.ApplicationArguments;importorg.springframework.boot.ApplicationRunner;importorg.springframework.stereotype.Component;@Component@Slf4jpublicclassNettyServerimplementsApplicationRunner{@AutowiredprivateRabbitTemplate rabbitTemplate;//有可能在项目初始化的时候加载不出来导致项目隐形报错//加载application-dev.proerities文件中 对应参数配置项@Value("${netty.server.port}")privateint port;publicint getPort (){return port;}@Overridepublicvoidrun(ApplicationArguments args)throwsException{
  2. log.info("netty服务启动端口"+getPort());EventLoopGroup bossGroup =newNioEventLoopGroup();// 用于接收进来的连接EventLoopGroup workerGroup =newNioEventLoopGroup();// 用于处理已经被接收的连接ServerBootstrap b =newServerBootstrap();
  3. b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)// 使用Nio的通道类型.childHandler(newChannelInitializer<SocketChannel>(){// 添加一个处理器来处理接收到的数据@OverridepublicvoidinitChannel(SocketChannel ch)throwsException{ChannelPipeline p = ch.pipeline();
  4. p.addLast(newStringDecoder());
  5. p.addLast(newStringEncoder());
  6. p.addLast(newSimpleChannelInboundHandler<String>(){@OverrideprotectedvoidchannelRead0(ChannelHandlerContext ctx,String msg)throwsException{//netty监听端口消息发送消息通过rabbitmq生产者发送到消息队列sendMessage(ctx,msg);}@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx,Throwable cause){
  7. cause.printStackTrace();
  8. ctx.close();}});}});// 绑定端口,开始接收进来的连接ChannelFuture f = b.bind(getPort()).sync();// 等待服务器socket关闭
  9. f.channel().closeFuture().sync();}//netty监听端口消息发送消息通过rabbitmq生产者发送到消息队列中//充当消息的生产者发送publicvoidsendMessage(ChannelHandlerContext ctx,String msg){// 接收netty监听信息来源作为消息生产者
  10. rabbitTemplate.convertAndSend("X","XA","来自QA"+msg);
  11. ctx.writeAndFlush("===>"+msg);}}

6.建立常规队列的消费者 Consumer

  1. importcom.rabbitmq.client.Channel;importcom.test.demo.service.MessageProcessService;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;importjava.io.IOException;importjava.util.ArrayList;importjava.util.HashMap;importjava.util.List;importjava.util.Map;@Slf4j@ComponentpublicclassConsumer{@ResourceprivateMessageProcessService messProceService;privateMap<Integer,Message> messageMap =newHashMap<>();privateint i =1;privateint j =1;@RabbitListener(queues ="QA")publicvoidreceiveQA(Message message,Channel channel)throwsInterruptedException,IOException{
  2. i++;synchronized(this){
  3. messageMap.put(i,message);if(messageMap.size()>=1500){// 模拟数据库插入操作System.out.println("模拟插入数据库操作,QA队列处理消息数:"+ messageMap.size());processMessagesBatch(channel);System.out.println("模拟插入成功, 准备进行下一次收集");}}}// @RabbitListener(queues = "QB")// public void receiveQB(Message message, Channel channel) throws IOException {// String msg = new String(message.getBody());// log.info("当前时间:{},监听QB队列信息{}", new Date().toString(), msg);// }publicstaticclassSleepUtils{publicstaticvoidsleep(int second){try{Thread.sleep(1000* second);}catch(InterruptedException _ignored){Thread.currentThread().interrupt();}}}privatevoidprocessMessagesBatch(Channel channel)throwsIOException{List<String> list =newArrayList<>();long startTime =System.currentTimeMillis();Map<Integer,Message> tempMessageMap =newHashMap<>(messageMap);
  4. messageMap.clear();for(Map.Entry<Integer,Message> map : tempMessageMap.entrySet()){
  5. list.add("C"+(j++)+":===>"+"QA队列收集"+newString(map.getValue().getBody()));}int count = messProceService.add(list);if(count == list.size()){System.out.println("插入数据成功");for(Map.Entry<Integer,Message> map : tempMessageMap.entrySet()){
  6. channel.basicAck(map.getValue().getMessageProperties().getDeliveryTag(),false);}
  7. list.clear();}long durationSeconds =(System.currentTimeMillis()- startTime)/1000;System.out.println("插入1500条数据执行时间: "+ durationSeconds);}}

7.建立死信队列的消费者 DeadLetterConsumer

  1. importcom.rabbitmq.client.Channel;importcom.test.demo.service.MessageProcessService;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;importjava.io.IOException;importjava.util.*;@Slf4j@ComponentpublicclassDeadLetterConsumer{@ResourceprivateMessageProcessService messProceService;privateMap<Integer,Message> messageMap =newHashMap<>();privateint i =1;privateint j =1;@RabbitListener(queues ="QD")publicvoidreceiveD(Message message,Channel channel)throwsIOException{
  2. i++;synchronized(this){
  3. messageMap.put(i, message);if(messageMap.size()>=100){// 模拟数据库插入操作System.out.println("模拟插入数据库操作,死信队列处理消息数:"+ messageMap.size());processMessagesBatch(channel);System.out.println("模拟插入成功, 准备进行下一次收集");// 确认当前消息,以便RabbitMQ知道它可以释放此消息}}}privatevoidprocessMessagesBatch(Channel channel)throwsIOException{List<String> list =newArrayList<>();// 复制当前消息映射以避免在迭代时修改Map<Integer,Message> tempMessageMap =newHashMap<>(messageMap);
  4. messageMap.clear();long startTime =System.currentTimeMillis();for(Map.Entry<Integer,Message> map : tempMessageMap.entrySet()){
  5. list.add("C"+(j++)+":===>"+"死信队列收集"+newString(map.getValue().getBody()));}int count = messProceService.add(list);long durationSeconds =(System.currentTimeMillis()- startTime)/1000;System.out.println("插入50条数据执行时间: "+ durationSeconds);if(count == list.size()){System.out.println("插入数据成功");
  6. list.clear();for(Map.Entry<Integer,Message> map : tempMessageMap.entrySet()){
  7. channel.basicAck(map.getValue().getMessageProperties().getDeliveryTag(),false);}}}}

8.建立mapper.xml文件

MessageProcessMapper.xml

  1. <?xml version="1.0" encoding="UTF-8"?><!DOCTYPEmapperPUBLIC"-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd"><mappernamespace="com.test.demo.mapper.MessageProcessMapper"><insertid="add">
  2. INSERT INTO t_xinyang_direct (direct,create_date)
  3. VALUES
  4. <foreachcollection="list"item="item"separator=",">
  5. (
  6. #{item},
  7. SYSDATE()
  8. )
  9. </foreach></insert></mapper>

9.建立mapper文件接口

MessageProcessMapper

  1. importcom.baomidou.dynamic.datasource.annotation.DS;importorg.apache.ibatis.annotations.Mapper;importjava.util.List;@Mapper@DS("英文数据库名称1")publicinterfaceMessageProcessMapper{intadd(List<String> list);}

10.建立接口ProducerController 测试

  1. importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.http.ResponseEntity;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.PathVariable;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@Slf4j@RestController@RequestMapping("/api")publicclassProducerController{@GetMapping("/hello/{message}")publicResponseEntity<String>sayHello(@PathVariableString message){returnResponseEntity.ok("Hello, RabbitMQ!==>"+message);}@AutowiredprivateRabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")publicvoidsendMsg(@PathVariableString message){
  2. log.info("接收接口发送的消息请求");
  3. rabbitTemplate.convertAndSend("X","XA","消息来自tt1为10s的队列"+message);
  4. rabbitTemplate.convertAndSend("X","XB","消息来自tt1为150s的队列"+message);}}

11.测试接口请求1

测试请求接口 /hello
http://实际本机ip地址:1001/demo1/api/hello/返回浏览器输入内容
在这里插入图片描述

12.测试接口请求2

测试请求接口 /hello
http://实际本机ip地址:1001/demo1/api/sendRabbitMq/发给rabbitmq消息
在这里插入图片描述

13.网络助手测试NetAssist.exe

主要目的模拟向netty端口进行发送数据,通过netty监听到的信息然后通过rabbitmq的生产者发送rabbitmq的队列中,让消费者进行消费,如果消费者绑定死信队列,那么消费者从队列中取出消息后,经过一定时间未确认即不进行消费确认或者拒绝,然后入之前绑定好的死信队列中,供死信队列绑定的死信消费者进行消费处理。

14.观察rabbitmq界面管理简单介绍

在这里插入图片描述


本文转载自: https://blog.csdn.net/beiback/article/details/142577968
版权归原作者 退无可退而立版 所有, 如有侵权,请联系我们删除。

“Springboot + netty + rabbitmq + myBatis”的评论:

还没有评论