前言
多级缓存在微服务的架构设计中可谓随处可见,多级缓存作为提升系统高并发的常规手段,在各类大中小型的系统设计中都有体现;
下图是一张简单的服务端多级缓存设计示意图,多级缓存的常用解决方案,像ehcache + redis,或caffeine + springcache等,即利用JVM内存缓存 + redis缓存配合;
一、缓存一致性问题
多级缓存带来的好处是显著的,一定程度上可以应对较高的并发,但随之带来了一个比较大的问题就是缓存一致性问题;
我们知道,JVM缓存属于进程级的缓存,和当前服务实例是绑定的,而redis缓存可以作为分布式缓存,通常JVM缓存的是那些生命周期较短的热点查询数据,即过期时间不会太久,而redis缓存相对来说,过期时间相对长一点,JVM缓存通常作为服务端扛压的第一道屏障,如果设置的过期时间太长,将会对JVM内存的开销非常大,所以一般作为短频使用;
设想这么一个场景,服务A采用多实例部署,这里假设部署了两个节点,首次根据ID查询一个用户信息的对象数据将会同时被JVM缓存,同时也会被redis缓存,下一次过来同样参数的请求时,首先走JVM缓存,查到了直接返回,否则走redis缓存;
上面是一个正常的关于缓存存取的过程,问题是,JVM缓存是同进程绑定的,如果第一个节点的数据发生了变更,比如删除了,对于redis缓存来说,可以做到动态刷缓存的效果,但是redis缓存和本地缓存之间并没有一种强同步的机制确保两者的缓存保持一致;
甚至来说,第一个节点与第二个节点之间,两者是无状态的,当第一个节点上面的数据被删除时,假如此刻并发的查询请求到达第二个节点,JVM缓存查询到必然是上一次缓存的数据;
于是,我们的问题就是,在多级缓存模式下,如何解决缓存一致性的问题呢?
二、一个简单的案例
基于之前的一篇 springcache 详细使用 和 spring boot 二级缓存案例基础上我们进行案例演示和改造;
在案例中,我们提供了几个核心的接口:
- 根据用户ID查询用户,并缓存到redis;
- 根据用户ID查询用户,并缓存到JVM,这里采用caffeine;
- 根据用户ID删除用户;
1、缓存核心配置类
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.util.StringUtils;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
//使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(mapper);
template.setValueSerializer(jackson2JsonRedisSerializer);
//使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
/**
* 分钟级别
* @param connectionFactory
* @return
*/
@Bean("cacheManagerMinutes")
public RedisCacheManager cacheManagerMinutes(RedisConnectionFactory connectionFactory){
RedisCacheConfiguration configuration = instanceConfig(3 * 60L);
return RedisCacheManager.builder(connectionFactory)
.cacheDefaults(configuration)
.transactionAware()
.build();
}
/**
* 小时级别
* @param connectionFactory
* @return
*/
@Bean("cacheManagerHour")
@Primary
public RedisCacheManager cacheManagerHour(RedisConnectionFactory connectionFactory){
RedisCacheConfiguration configuration = instanceConfig(3600L);
return RedisCacheManager.builder(connectionFactory)
.cacheDefaults(configuration)
.transactionAware()
.build();
}
/**
* 天级别
* @param connectionFactory
* @return
*/
@Bean("cacheManagerDay")
public RedisCacheManager cacheManagerDay(RedisConnectionFactory connectionFactory){
RedisCacheConfiguration configuration = instanceConfig(3600 * 24L);;
return RedisCacheManager.builder(connectionFactory)
.cacheDefaults(configuration)
.transactionAware()
.build();
}
/**
* 正常时间的本地缓存
*/
@Bean("caffeineCacheManager")
public CacheManager caffeineCacheManager() {
CaffeineCacheManager cacheManager = new CaffeineCacheManager();
cacheManager.setCaffeine(Caffeine.newBuilder()
.expireAfterWrite(50, TimeUnit.SECONDS)
.initialCapacity(256)
.maximumSize(10000));
return cacheManager;
}
private RedisCacheConfiguration instanceConfig(long ttl){
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.registerModule(new JavaTimeModule());
objectMapper.configure(MapperFeature.USE_ANNOTATIONS,false);
//只针对非空的值进行序列化
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
//将类型序列化到属性的json字符串
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,ObjectMapper.DefaultTyping.NON_FINAL,
JsonTypeInfo.As.PROPERTY);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
return RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(ttl))
.disableCachingNullValues()
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer));
}
/**
* 自定义key生成策略
* @return
*/
@Bean("defaultSpringKeyGenerator")
public KeyGenerator defaultSpringKeyGenerator(){
return new KeyGenerator() {
@Override
public Object generate(Object o, Method method, Object... objects) {
String key = o.getClass().getSimpleName() + "_"
+ method.getName() +"_"
+ StringUtils.arrayToDelimitedString(objects,"_");
System.out.println("key :" + key);
return key;
}
};
}
}
2、配置文件开启使用 springcache
spring:
redis:
host: localhost
port: 6379
cache:
type: redis
3、几个核心接口
1)根据用户ID获取用户
@GetMapping("/getById")
public DbUser getById(String id){
return dbUserService.getById(id);
}
@Override
@Cacheable(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")
public DbUser getById(String id) {
System.out.println("首次查询走数据库");
DbUser dbUser = dbUserMapper.getByUserId(id);
return dbUser;
}
2)根据用户ID查询用户,并缓存到JVM;
@GetMapping("/getByIdFromCaffeine")
public DbUser getByIdFromCaffeine(String id){
return dbUserService.getByIdFromCaffeine(id);
}
@Override
@Cacheable(value = {"dbUser"},key = "#root.args[0]",cacheManager = "caffeineCacheManager")
public DbUser getByIdFromCaffeine(String id) {
System.out.println("查询数据库");
DbUser dbUser = dbUserMapper.getByUserId(id);
System.out.println("第一次走缓存");
return dbUser;
}
3)根据用户ID删除用户;
@GetMapping("/deleteById")
public String deleteById(String id){
return dbUserService.deleteById(id);
}
@Override
@CacheEvict(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")
public String deleteById(String id) {
dbUserMapper.deleteByUserId(id);
return "delete success";
}
4、功能测试
首先在数据库的 db_user 表准备一条测试数据
分别调用查询用户接口
多次刷新接口,sql只输出了一次
多次刷新接口,sql只输出了一次
从上面的结果可以看到,我们模拟了查询数据分别缓存到了JVM内存和redis的效果,接下来,删除当前这条数据,执行下面的接口
再次调用第一个查询用户的接口,无返回数据,表明redis中缓存的结果被清理了,这是我们使用了springcache后,通过 CacheEvict 这个注解,会自动帮我们管理redis中的缓存;
但这时,再次调用查询JVM缓存的接口,发现仍然可以从本地缓存中得到数据
基于上面的测试结果,可以看到,缓存一致性的问题就产生了,这里我故意将本地缓存的时间调整的长了一点,实际开发过程中,建议本地缓存的时间一般不要超过1分钟;
三、解决方案一:清理redis缓存,同步清理本地缓存
1、增加一个本地缓存操作的工具类
import com.congge.config.SpringContextHolder;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import java.util.Objects;
public class CaffeineCacheUtils {
private static CacheManager cm;
static {
cm = SpringContextHolder.getBean("caffeineCacheManager");
}
/**
* 添加缓存
*
* @param cacheName 缓存名称
* @param key 缓存key
* @param value 缓存值
*/
public static void put(String cacheName, String key, Object value) {
Cache cache = cm.getCache(cacheName);
cache.put(key, value);
}
/**
* 获取缓存
*
* @param cacheName 缓存名称
* @param key 缓存key
* @return
*/
public static Object get(String cacheName, String key) {
Cache cache = cm.getCache(cacheName);
if (cache == null) {
return null;
}
return Objects.requireNonNull(cache.get(key)).get();
}
/**
* 获取缓存(字符串)
*
* @param cacheName 缓存名称
* @param key 缓存key
* @return
*/
public static String getString(String cacheName, String key) {
Cache cache = cm.getCache(cacheName);
if (cache == null) {
return null;
}
Cache.ValueWrapper wrapper = cache.get(key);
if (wrapper == null) {
return null;
}
return Objects.requireNonNull(wrapper.get()).toString();
}
/**
* 获取缓存(泛型)
*
* @param cacheName 缓存名称
* @param key 缓存key
* @param clazz 缓存类
* @param <T> 返回值泛型
* @return
*/
public static <T> T get(String cacheName, String key, Class<T> clazz) {
Cache cache = cm.getCache(cacheName);
if (cache == null) {
return null;
}
Cache.ValueWrapper wrapper = cache.get(key);
if (wrapper == null) {
return null;
}
return (T) wrapper.get();
}
/**
* 清理缓存
*
* @param cacheName 缓存名称
* @param key 缓存key
*/
public static void evict(String cacheName, String key) {
Cache cache = cm.getCache(cacheName);
System.out.println(cache.getName());
if (cache != null) {
cache.evict(key);
}
}
}
2、删除用户接口中同步清理本地缓存
只需改造下删除接口的服务实现方法即可
private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils();
/**
* 删除,同时需要删除相关的key
* @param id
* @return
*/
@Override
@CacheEvict(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")
public String deleteById(String id) {
dbUserMapper.deleteByUserId(id);
caffeineCacheUtils.evict("dbUser",id);
return "delete success";
}
3、方案优缺点
优点
- 操作简便;
- 只要参数传入正确,就可以确保缓存一致性;
- 适合单机模式下使用
缺点
- 代码产生了一定的耦合性;
- 不适合分布式环境使用;
- 需要手动管理key的相关参数;
四、解决方案二:使用zookeeper 实现缓存同步
对zookeeper有所了解和使用的同学,应该对zk的节点管理不陌生,zk作为一款分布式协调中间件,在很多分布式场景都有着广泛的使用,比如实现集群选举,分布式锁,节点管理等等,利用zk的节点属性,可以很好的解决这个问题;
使用zk的解决思路
- 查询用户接口中,注册一个节点,节点命名最好和缓存的key保持一致;
- 删除接口中,手动触zk的节点删除;
- zk监听到删除节点的事件变化时,同步清理本地缓存;
1、zk客户端依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
2、提供一个zk节点操作工具类
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;
public class ZkUtils {
private ZkClient zkClient = null;
private String node;
/*public static void main(String[] args) {
ZkUtils zkUtils = new ZkUtils();
zkUtils.createNode(node);
zkUtils.nodeExist(node);
zkUtils.deleteNode(node);
}*/
public ZkUtils(String node) {
zkClient = new ZkClient("localhost:2181", 60000 * 30, 60000, new SerializableSerializer());
//监听节点变化
//需要通过java修改zookeeper数据,才能监听到
zkClient.subscribeDataChanges("/" + node, new IZkDataListener() {
//节点数据变化时触发
@Override
public void handleDataChange(String s, Object o) throws Exception {
System.out.println("change Node: " + s);
System.out.println("change data: " + o);
}
//节点数据删除时触发
@Override
public void handleDataDeleted(String s) throws Exception {
System.out.println("delete Node: " + s);
}
});
}
/**
* 创建zk节点
*
* @param node
*/
public void createNode(String node) {
//创建持久节点
String node1 = zkClient.create("/" + node, node, CreateMode.PERSISTENT);
System.out.println(node1);
}
/**
* 修改zk节点数据
*
* @param node
* @param data
*/
public void writeNodeData(String node, String data) {
zkClient.writeData("/" + node, 233);
}
/**
* 查询zk节点
*
* @param node
*/
public boolean nodeExist(String node) {
boolean exists = zkClient.exists("/" + node);
return exists;
}
/**
* 查询节点数据
*
* @param node
* @return
*/
public String findNodeData(String node) {
Object data = zkClient.readData("/" + node);
System.out.println(data);
return data.toString();
}
/**
* 删除节点
*
* @param node
*/
public void deleteNode(String node) {
boolean b2 = zkClient.deleteRecursive("/" + node);
System.out.println(b2);
}
}
3、查询用户接口,注册缓存的key对应的z-node节点
@Override
@Cacheable(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")
public DbUser getById(String id) {
System.out.println("首次查询走数据库");
DbUser dbUser = dbUserMapper.getByUserId(id);
//FIXME 将缓存注册到节点
registerCacheNode(id);
return dbUser;
}
public void registerCacheNode(String id){
String node = "user:" + id;
ZkUtils zkUtils = new ZkUtils(node);
zkUtils.createNode(node);
}
4、删除用户接口添加删除zk节点逻辑
@Override
@CacheEvict(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")
public String deleteById(String id) {
dbUserMapper.deleteByUserId(id);
//删除 z-node 节点
String node = "user:" + id;
ZkUtils zkUtils = new ZkUtils(node);
zkUtils.deleteNode(node);
return "delete success";
}
5、改造zk监听逻辑,同步移除本地缓存
private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils();
public ZkUtils(String node) {
zkClient = new ZkClient("localhost:2181", 60000 * 30, 60000, new SerializableSerializer());
//监听节点变化
//需要通过java修改zookeeper数据,才能监听到
zkClient.subscribeDataChanges("/" + node, new IZkDataListener() {
//节点数据变化时触发
@Override
public void handleDataChange(String s, Object o) throws Exception {
System.out.println("change Node: " + s);
System.out.println("change data: " + o);
}
//节点数据删除时触发
@Override
public void handleDataDeleted(String s) throws Exception {
System.out.println("delete Node: " + s);
caffeineCacheUtils.evict("dbUser","1");
}
});
}
6、测试
1、启动服务后,按照上面的测试步骤,分别调用2个查询接口
通过控制台输出结果,可以看到节点数据注册到zk中
2、调用删除接口
此时zk的监听逻辑中监听到了节点数据变更的事件,在变更的逻辑中,我们将同步删除本地缓存的数据;
再次调用时发现缓存已经被清理
通过上面的操作演示,实现了基于zk的节点注册与事件监听机制实现缓存一致性的问题处理;
五、解决方案三:使用redis事件订阅与发布机制实现缓存同步
Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。
这种模式很像消息队列的实现机制,服务端发布消息到topic,客户端监听topic的消息,并做自身的业务处理;
只不过在redis这里,不叫topic,而是channel,下面来看一个简单的redis实现的发布订阅使用
1、导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2、自定义 RedisMessageListener
该类的功能和消息中间件中的监听逻辑很相似
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
public class RedisMessageListener implements MessageListener {
@Autowired
private RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
// 获取消息
byte[] messageBody = message.getBody();
// 使用值序列化器转换
Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);
// 获取监听的频道
byte[] channelByte = message.getChannel();
// 使用字符串序列化器转换
Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);
// 渠道名称转换
String patternStr = new String(pattern);
System.out.println(patternStr);
System.out.println("---频道---: " + channel);
System.out.println("---消息内容---: " + msg);
}
}
3、自定义 RedisSubConfig
该类用于配置特定的channel,即监听来自哪些channel的消息
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisSubConfig {
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory factory, RedisMessageListener listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
//订阅频道redis.news 和 redis.life 这个container 可以添加多个 messageListener
container.addMessageListener(listener, new ChannelTopic("redis.life"));
container.addMessageListener(listener, new ChannelTopic("redis.news"));
return container;
}
}
4、最后编写一个接口做测试
@GetMapping("/testPublish")
public void testPublish(){
dbUserService.testPublish();
}
@Autowired
private RedisTemplate redisTemplate;
@Override
public void testPublish() {
redisTemplate.convertAndSend("redis.life", "aaa");
redisTemplate.convertAndSend("redis.news", "bbb");
}
调用下接口,可以看到控制台输出如下信息
通过上面的演示,快速了解了一下redis的这种发布订阅模式的功能使用,下面就来使用这种方式来解决缓存一致性问题;
5、删除用户接口中向redis channel 推送消息
@Override
@CacheEvict(value = {"dbUser"},key = "#root.args[0]",cacheManager = "cacheManagerMinutes")
public String deleteById(String id) {
dbUserMapper.deleteByUserId(id);
redisTemplate.convertAndSend("redis.user", id);
return "delete success";
}
6、RedisMessageListener 改造
添加删除本地缓存逻辑
@Override
public void onMessage(Message message, byte[] pattern) {
CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils();
// 获取消息
byte[] messageBody = message.getBody();
// 使用值序列化器转换
Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);
// 获取监听的频道
byte[] channelByte = message.getChannel();
// 使用字符串序列化器转换
Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);
// 渠道名称转换
String patternStr = new String(pattern);
System.out.println(patternStr);
System.out.println("---频道---: " + channel);
System.out.println("---消息内容---: " + msg);
caffeineCacheUtils.evict("dbUser",patternStr);
}
7、模拟测试
启动服务后,直接调用删除用户接口,可以看到,监听逻辑中收到了一条消息,然后调用本地缓存工具类删除本地缓存即可
六、解决方案四:使用消息队列实现缓存同步
了解了redis发布订阅这种方式实现原理后,如果再更换为消息中间件来实现就不难理解了,其实现的大致思路如下:
- 调用删除接口删除用户;
- 向特定的队列推送一条删除消息;
- 在消息监听逻辑中接收消息,并清理本地缓存
以rabbitmq为例,其核心实现如下:
@RabbitHandler
public void process(String msg) {
System.out.println("topicMessageReceiver 接收到了消息 : " +msg);
//执行本地缓存的删除操作
}
关于rabbitmq的相关实现感兴趣的同学可以参考:rabbbitmq 技术全解
七、总结
关于后3三种的实现,不仅可以解决缓存一致性问题,同时适用于分布式应用的场景,算是比较通用的解决方案,但这样一来,引入了第三方组件,也增加了系统整体的复杂性,这一点需要在架构设计中进行综合考量,结合小编本人的一些实践经验,比较推荐使用redis的发布订阅模式,这种方式简单高效,同时兼顾了避免引入更多的外部组件,可酌情参考。
版权归原作者 小码农叔叔 所有, 如有侵权,请联系我们删除。