当我们在查询最新的股票大盘数据时,我们会频繁的向mysql查询数据,会给mysql造成很大的压力,所以我们可以使用caffeineCache本地缓存。
大致思路:
我们先使用stock_job工程采集到国内大盘的最新交易时间的信息时并将数据插入数据库,使用rabbitmq发送消息(消息为当前时间)
在stock_backend工程定义消息队列监听类,如果接收到的时间和发送消息的时间相差一分钟时,就报错,不超过一分钟,就清除之前的本地缓存,再通过mapper向数据库查询数据,然后再重新把最新的国内大盘数据存入本地缓存中。
这样一来,如果stock_job工程没有向数据库中插入最新交易时间的国内大盘数据信息,在stock_backend中查询最新交易时间的国内大盘数据信息时,就会直接从本地缓存中获取数据信息。
使用stock_job工程采集到国内大盘的最新交易时间的信息并插入数据库,使用rabbitmq发送消息
1.导入依赖
<!-- 导入mq依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.编写yml文件,配置连接rabbitmq的信息
spring:
rabbitmq:
host: 192.168.230.100 # rabbitMQ的ip地址
port: 5672 # 端口
username: hhh
password: 1234
virtual-host: /
3.编写mq的配置类,生成交换机,消息队列,并将他们绑定
@Configuration
public class MqConfig {
/**
* 重新定义消息序列化的方式,改为基于json格式序列化和反序列化
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 国内大盘信息队列
*/
@Bean
public Queue innerMarketQueue() {
return new Queue("innerMarketQueue", true);
}
/**
* 定义路由股票信息的交换机
*/
@Bean
public TopicExchange innerMarketTopicExchange() {
return new TopicExchange("stockExchange", true, false);
}
/**
* 绑定队列到指定交换机
*/
@Bean
public Binding bindingInnerMarketExchange() {
return BindingBuilder.bind(innerMarketQueue()).to(innerMarketTopicExchange())
.with("inner.market");//设置routingKey
}
}
4.采集最新到最新的国内大盘数据信息,并插入到数据时时,发送消息
@Override
public void getInnerMarketInfo() {
//......
//解析的数据批量插入数据库
int count= stockMarketIndexInfoMapper.insertBatch(entities);
log.info("当前插入了:{}行数据",count);
//通知后台终端刷新本地缓存,发送的日期数据是告知对方当前更新的股票数据所在时间点
rabbitTemplate.convertAndSend("stockExchange","inner.market",new Date());
}
5.查看消息队列是否存在消息
在stock_backend工程中定义消息监听类,并配置本地缓存
1.导入mq依赖和caffeine
<dependencies>
<!-- 导入mq依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 本地缓存依赖-->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
2.编写yml文件
spring:
rabbitmq:
host: 192.168.230.130 # rabbitMQ的ip地址
port: 5672 # 端口
username: hhh
password: 1234
virtual-host: /
3.编写caffeine的配置类,和mq的配置类
/**
* 构建缓存bean
* @return
*/
@Bean
public Cache<String,Object> caffeineCache(){
Cache<String, Object> cache = Caffeine
.newBuilder()
.maximumSize(200)//设置缓存数量上限
// .expireAfterAccess(1, TimeUnit.SECONDS)//访问1秒后删除
// .expireAfterWrite(1,TimeUnit.SECONDS)//写入1秒后删除
.initialCapacity(100)// 初始的缓存空间大小
.recordStats()//开启统计
.build();
return cache;
}
**这里不用定义交换机和消息队列 **
@Configuration
public class MqConfig {
/**
* 重新定义消息序列化的方式,改为基于json格式序列化和反序列化
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
4.编写消息监听类
@Component
@Slf4j
public class MqListener {
@Autowired
private Cache<String,Object> caffeineCache;
@Autowired
private StockService stockService;
@RabbitListener(queues = "innerMarketQueue")
public void acceptInnerMarketInfo(Date date){//消息队列里的数据类型是Date,所以接收的参数类型也是Date
long differTime = DateTime.now().getMillis() - new DateTime(date).getMillis();
if(differTime>60000L){
log.error("采集国内大盘时间点:{},同步超时:{}ms",new DateTime(date).toString("yyyy-MM-dd HH:mm:ss"),differTime);
}
//发送信息和接收信息在一分钟以内
//删除key为innerMarketInfosKey的缓存
caffeineCache.invalidate("innerMarketInfosKey");
//重新获取数据
//调用服务更新缓存
stockService.getInnerMarket();
}
}
5. stockService.getInnerMarket()方法
/**
* 获取国内大盘最新的数据
* @return
*/
@Override
public R<List<InnerMarketDomain>> getInnerMarket() {
//获取key为innerMarketInfosKey的本地缓存数据,如果不存在,就去数据库中查询数据,并存入本地缓存中
//本地缓存默认一分钟消失
R<List<InnerMarketDomain>> result= (R<List<InnerMarketDomain>>) caffeineCache.get("innerMarketInfosKey", key->{
//1.获取当前时间的最新交易点(精确到分钟,秒和毫秒置为0)
Date curDate = DateTimeUtil.getLastDate4Stock(DateTime.now()).toDate();
//Date curDate = MyDateTimeUtil.getLateDate4Stock(DateTime.now()).toDate();
//mock data 等后续股票采集job工程完成,再将此代码删除
curDate=DateTime.parse("2021-12-28 09:31:00", DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toDate();
//log.info("curDate:{}",curDate);
//2.获取国内大盘的编码集合
List<String> mcodes = stockInfoConfig.getInner();
//3.调用mapper进行查询
List<InnerMarketDomain> data=stockMarketIndexInfoMapper.getInnerMarket(curDate,mcodes);
//4.返回数据
return R.ok(data);
});
return result;
}
6.debug启动SpringBoot引导类
自动跳到消息监听类
清空缓存,然后调用mapper方法重新向数据库查询最新的国内大盘数据,再重新保存到本地缓存中
查看caffeineCache成员变量的值
成功缓存key为innerMarketInfosKey
的数据
版权归原作者 落落落sss 所有, 如有侵权,请联系我们删除。