0


使用rabbitmq发送消息和caffeineCache保存本地

当我们在查询最新的股票大盘数据时,我们会频繁的向mysql查询数据,会给mysql造成很大的压力,所以我们可以使用caffeineCache本地缓存。

大致思路:
我们先使用stock_job工程采集到国内大盘的最新交易时间的信息时并将数据插入数据库,使用rabbitmq发送消息(消息为当前时间)

在stock_backend工程定义消息队列监听类,如果接收到的时间和发送消息的时间相差一分钟时,就报错,不超过一分钟,就清除之前的本地缓存,再通过mapper向数据库查询数据,然后再重新把最新的国内大盘数据存入本地缓存中。

这样一来,如果stock_job工程没有向数据库中插入最新交易时间的国内大盘数据信息,在stock_backend中查询最新交易时间的国内大盘数据信息时,就会直接从本地缓存中获取数据信息。

使用stock_job工程采集到国内大盘的最新交易时间的信息并插入数据库,使用rabbitmq发送消息

1.导入依赖

  1. <!-- 导入mq依赖-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

2.编写yml文件,配置连接rabbitmq的信息

  1. spring:
  2. rabbitmq:
  3. host: 192.168.230.100 # rabbitMQ的ip地址
  4. port: 5672 # 端口
  5. username: hhh
  6. password: 1234
  7. virtual-host: /

3.编写mq的配置类,生成交换机,消息队列,并将他们绑定

  1. @Configuration
  2. public class MqConfig {
  3. /**
  4. * 重新定义消息序列化的方式,改为基于json格式序列化和反序列化
  5. */
  6. @Bean
  7. public MessageConverter messageConverter() {
  8. return new Jackson2JsonMessageConverter();
  9. }
  10. /**
  11. * 国内大盘信息队列
  12. */
  13. @Bean
  14. public Queue innerMarketQueue() {
  15. return new Queue("innerMarketQueue", true);
  16. }
  17. /**
  18. * 定义路由股票信息的交换机
  19. */
  20. @Bean
  21. public TopicExchange innerMarketTopicExchange() {
  22. return new TopicExchange("stockExchange", true, false);
  23. }
  24. /**
  25. * 绑定队列到指定交换机
  26. */
  27. @Bean
  28. public Binding bindingInnerMarketExchange() {
  29. return BindingBuilder.bind(innerMarketQueue()).to(innerMarketTopicExchange())
  30. .with("inner.market");//设置routingKey
  31. }
  32. }

4.采集最新到最新的国内大盘数据信息,并插入到数据时时,发送消息

  1. @Override
  2. public void getInnerMarketInfo() {
  3. //......
  4. //解析的数据批量插入数据库
  5. int count= stockMarketIndexInfoMapper.insertBatch(entities);
  6. log.info("当前插入了:{}行数据",count);
  7. //通知后台终端刷新本地缓存,发送的日期数据是告知对方当前更新的股票数据所在时间点
  8. rabbitTemplate.convertAndSend("stockExchange","inner.market",new Date());
  9. }

5.查看消息队列是否存在消息

在stock_backend工程中定义消息监听类,并配置本地缓存

1.导入mq依赖和caffeine

  1. <dependencies>
  2. <!-- 导入mq依赖-->
  3. <dependency>
  4. <groupId>org.springframework.boot</groupId>
  5. <artifactId>spring-boot-starter-amqp</artifactId>
  6. </dependency>
  7. <!-- 本地缓存依赖-->
  8. <dependency>
  9. <groupId>com.github.ben-manes.caffeine</groupId>
  10. <artifactId>caffeine</artifactId>
  11. </dependency>

2.编写yml文件

  1. spring:
  2. rabbitmq:
  3. host: 192.168.230.130 # rabbitMQ的ip地址
  4. port: 5672 # 端口
  5. username: hhh
  6. password: 1234
  7. virtual-host: /

3.编写caffeine的配置类,和mq的配置类

  1. /**
  2. * 构建缓存bean
  3. * @return
  4. */
  5. @Bean
  6. public Cache<String,Object> caffeineCache(){
  7. Cache<String, Object> cache = Caffeine
  8. .newBuilder()
  9. .maximumSize(200)//设置缓存数量上限
  10. // .expireAfterAccess(1, TimeUnit.SECONDS)//访问1秒后删除
  11. // .expireAfterWrite(1,TimeUnit.SECONDS)//写入1秒后删除
  12. .initialCapacity(100)// 初始的缓存空间大小
  13. .recordStats()//开启统计
  14. .build();
  15. return cache;
  16. }

**这里不用定义交换机和消息队列 **

  1. @Configuration
  2. public class MqConfig {
  3. /**
  4. * 重新定义消息序列化的方式,改为基于json格式序列化和反序列化
  5. */
  6. @Bean
  7. public MessageConverter messageConverter() {
  8. return new Jackson2JsonMessageConverter();
  9. }
  10. }

4.编写消息监听类

  1. @Component
  2. @Slf4j
  3. public class MqListener {
  4. @Autowired
  5. private Cache<String,Object> caffeineCache;
  6. @Autowired
  7. private StockService stockService;
  8. @RabbitListener(queues = "innerMarketQueue")
  9. public void acceptInnerMarketInfo(Date date){//消息队列里的数据类型是Date,所以接收的参数类型也是Date
  10. long differTime = DateTime.now().getMillis() - new DateTime(date).getMillis();
  11. if(differTime>60000L){
  12. log.error("采集国内大盘时间点:{},同步超时:{}ms",new DateTime(date).toString("yyyy-MM-dd HH:mm:ss"),differTime);
  13. }
  14. //发送信息和接收信息在一分钟以内
  15. //删除key为innerMarketInfosKey的缓存
  16. caffeineCache.invalidate("innerMarketInfosKey");
  17. //重新获取数据
  18. //调用服务更新缓存
  19. stockService.getInnerMarket();
  20. }
  21. }

5. stockService.getInnerMarket()方法

  1. /**
  2. * 获取国内大盘最新的数据
  3. * @return
  4. */
  5. @Override
  6. public R<List<InnerMarketDomain>> getInnerMarket() {
  7. //获取key为innerMarketInfosKey的本地缓存数据,如果不存在,就去数据库中查询数据,并存入本地缓存中
  8. //本地缓存默认一分钟消失
  9. R<List<InnerMarketDomain>> result= (R<List<InnerMarketDomain>>) caffeineCache.get("innerMarketInfosKey", key->{
  10. //1.获取当前时间的最新交易点(精确到分钟,秒和毫秒置为0)
  11. Date curDate = DateTimeUtil.getLastDate4Stock(DateTime.now()).toDate();
  12. //Date curDate = MyDateTimeUtil.getLateDate4Stock(DateTime.now()).toDate();
  13. //mock data 等后续股票采集job工程完成,再将此代码删除
  14. curDate=DateTime.parse("2021-12-28 09:31:00", DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toDate();
  15. //log.info("curDate:{}",curDate);
  16. //2.获取国内大盘的编码集合
  17. List<String> mcodes = stockInfoConfig.getInner();
  18. //3.调用mapper进行查询
  19. List<InnerMarketDomain> data=stockMarketIndexInfoMapper.getInnerMarket(curDate,mcodes);
  20. //4.返回数据
  21. return R.ok(data);
  22. });
  23. return result;
  24. }

6.debug启动SpringBoot引导类

自动跳到消息监听类

清空缓存,然后调用mapper方法重新向数据库查询最新的国内大盘数据,再重新保存到本地缓存中

查看caffeineCache成员变量的值

成功缓存key为innerMarketInfosKey

的数据


本文转载自: https://blog.csdn.net/luosuss/article/details/142005256
版权归原作者 落落落sss 所有, 如有侵权,请联系我们删除。

“使用rabbitmq发送消息和caffeineCache保存本地”的评论:

还没有评论