一、引言
golang 开源框架的世界里有很多NB的分布式缓存框架,也有很多NB的本地内存缓存框架,但是没有NB的多级缓存框架!如果你体验过阿里巴巴开源的 JetCache 缓存框架,你就会觉得其他缓存框架都不够好用。为了解决缓存高性能、高可用面临的问题,也为了提高研发效率和质量,我们决定自研 golang 版本的 JetCache 缓存框架。
Featureeko/gocachebytedance/gopkggo-redis/cachemgtv-tech/jetcache-goStar数量2.4k1.7k741191多级缓存YNYY缓存旁路(loadable)YYYY泛型支持YxNY单飞模式YYYY缓存更新监听器(缓存一致)NYNY异步刷新NYNY指标统计YNY(简单)Y缓存空对象NNNY批量查询(MGet)NNNY稀疏列表缓存NNNY
二、JetCache-go 的魅力
2.1 功能概览
2.2 缓存旁路模式(Cache-Aside Pattern)
配置
redis:
test-redis:
addrs:
- 1.1.1.1:6442
- 1.1.1.2:7440
- 1.1.1.3:7441
type: cluster
readTimeout: 100ms
writeTimeout: 100ms
cache:
graph:
cacheType: both # 可选 both、remote、local 三种模式
localType: tinyLFU # 可选 tinyLFU、freeCache 两种本地缓存类型
codec: msgpack # 可选 json、sonic、msgpack 三种序列化方式
redisName: test-redis
refreshDuration: 1m # 自动刷新缓存周期
stopRefreshAfterLastAccess: 5m # 缓存 key 多久没有访问后停止自动刷新
localExpire: 1m # 本地缓存过期时长
remoteExpire: 1h # 默认的远程缓存过期时长
part:
cacheType: local # 可选 both、remote、local 三种模式
localType: freeCache # 可选 tinyLFU、freeCache 两种本地缓存类型
codec: sonic # 可选 json、sonic、msgpack 三种序列化方式
redisName: test-redis
refreshDuration: 1m # 自动刷新缓存周期
stopRefreshAfterLastAccess: 5m # 缓存 key 多久没有访问后停止自动刷新
localExpire: 1m # 本地缓存过期时长
remoteExpire: 1h # 默认的远程缓存过期时长
我们内部脚手架封装好了通过配置即可初始化缓存实例。
使用
Once接口查询,并开启缓存自动刷新。
// GetSubGraphCache 查询知识图谱缓存
func (s *Service) GetSubGraphCache(ctx context.Context, req *api.SubGraphReq) (resp *api.SubGraphResp, err error) {
if err := req.Validate(); err != nil {
return nil, err
}
key := model.KeySubGraph.Key(req.VertexId, req.ArtifactId, req.Depth)
err = s.dao.CacheGraph.Once(ctx, key,
cache.TTL(model.KeySubGraph.Expire()),
cache.Refresh(true),
cache.Value(&resp),
cache.Do(func(ctx context.Context) (any, error) {
return s.getSubGraph(ctx, req)
}))
return
}
// 查询知识图谱
func (s *Service) getSubGraph(ctx context.Context, req *api.SubGraphReq) (resp *api.SubGraphResp, err error) {
// 逻辑实现
}
MGet(稀疏列表缓存)泛型接口查询
// MGetPartCache 批量获取视频缓存
func (d *Dao) MGetPartCache(ctx context.Context, partIds []int64) map[int64]*model.Part {
return d.CachePart.MGet(ctx, model.KeyPart.Key(), partIds,
func(ctx context.Context, ids []int64) (map[int64]*model.Part, error) {
return d.Odin.MGetPart(ctx, ids)
})
}
// MGetPart 批量获取分集信息
func (c *Odin) MGetPart(ctx context.Context, partIds []int64) (map[int64]*model.Part, error) {
// 查询逻辑
}
通过缓存旁路模式,只需要简单的对业务方法进行代理,就能够叠加多级缓存的各种特效。
三、Golang版JetCache
介绍
jetcache-go是基于go-redis/cache拓展的通用缓存访问框架。 实现了类似Java版JetCache的核心功能,包括:
- ✅ 二级缓存自由组合:本地缓存、分布式缓存、本地缓存+分布式缓存
- ✅
Once
接口采用单飞(singleflight
)模式,高并发且线程安全 - ✅ 默认采用MsgPack来编解码Value。可选sonic、原生
json
- ✅ 本地缓存默认实现了TinyLFU和FreeCache
- ✅ 分布式缓存默认实现了go-redis/v8的适配器,你也可以自定义实现
- ✅ 可以自定义
errNotFound
,通过占位符替换,缓存空结果防止缓存穿透 - ✅ 支持开启分布式缓存异步刷新
- ✅ 指标采集,默认实现了通过日志打印各级缓存的统计指标(QPM、Hit、Miss、Query、QueryFail)
- ✅ 分布式缓存查询故障自动降级
- ✅
MGet
接口支持Load
函数。带分布缓存场景,采用Pipeline
模式实现 (v1.1.0+) - ✅ 支持拓展缓存更新后所有GO进程的本地缓存失效 (v1.1.1+)
安装
使用最新版本的jetcache-go,您可以在项目中导入该库:
go get github.com/mgtv-tech/jetcache-go
快速开始
package cache_test
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/mgtv-tech/jetcache-go"
"github.com/mgtv-tech/jetcache-go/local"
"github.com/mgtv-tech/jetcache-go/remote"
"github.com/redis/go-redis/v9"
)
var errRecordNotFound = errors.New("mock gorm.errRecordNotFound")
type object struct {
Str string
Num int
}
func Example_basicUsage() {
ring := redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{
"localhost": ":6379",
},
})
mycache := cache.New(cache.WithName("any"),
cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
cache.WithErrNotFound(errRecordNotFound))
ctx := context.TODO()
key := "mykey:1"
obj, _ := mockDBGetObject(1)
if err := mycache.Set(ctx, key, cache.Value(obj), cache.TTL(time.Hour)); err != nil {
panic(err)
}
var wanted object
if err := mycache.Get(ctx, key, &wanted); err == nil {
fmt.Println(wanted)
}
// Output: {mystring 42}
mycache.Close()
}
func Example_advancedUsage() {
ring := redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{
"localhost": ":6379",
},
})
mycache := cache.New(cache.WithName("any"),
cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
cache.WithErrNotFound(errRecordNotFound),
cache.WithRefreshDuration(time.Minute))
ctx := context.TODO()
key := "mykey:1"
obj := new(object)
if err := mycache.Once(ctx, key, cache.Value(obj), cache.TTL(time.Hour), cache.Refresh(true),
cache.Do(func(ctx context.Context) (any, error) {
return mockDBGetObject(1)
})); err != nil {
panic(err)
}
fmt.Println(obj)
// Output: &{mystring 42}
mycache.Close()
}
func Example_mGetUsage() {
ring := redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{
"localhost": ":6379",
},
})
mycache := cache.New(cache.WithName("any"),
cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
cache.WithErrNotFound(errRecordNotFound),
cache.WithRemoteExpiry(time.Minute),
)
cacheT := cache.NewT[int, *object](mycache)
ctx := context.TODO()
key := "mget"
ids := []int{1, 2, 3}
ret := cacheT.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
return mockDBMGetObject(ids)
})
var b bytes.Buffer
for _, id := range ids {
b.WriteString(fmt.Sprintf("%v", ret[id]))
}
fmt.Println(b.String())
// Output: &{mystring 1}&{mystring 2}<nil>
cacheT.Close()
}
func Example_syncLocalUsage() {
ring := redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{
"localhost": ":6379",
},
})
sourceID := "12345678" // Unique identifier for this cache instance
channelName := "syncLocalChannel"
pubSub := ring.Subscribe(context.Background(), channelName)
mycache := cache.New(cache.WithName("any"),
cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
cache.WithErrNotFound(errRecordNotFound),
cache.WithRemoteExpiry(time.Minute),
cache.WithSourceId(sourceID),
cache.WithSyncLocal(true),
cache.WithEventHandler(func(event *cache.Event) {
// Broadcast local cache invalidation for the received keys
bs, _ := json.Marshal(event)
ring.Publish(context.Background(), channelName, string(bs))
}),
)
obj, _ := mockDBGetObject(1)
if err := mycache.Set(context.TODO(), "mykey", cache.Value(obj), cache.TTL(time.Hour)); err != nil {
panic(err)
}
go func() {
for {
msg := <-pubSub.Channel()
var event *cache.Event
if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
panic(err)
}
fmt.Println(event.Keys)
// Invalidate local cache for received keys (except own events)
if event.SourceID != sourceID {
for _, key := range event.Keys {
mycache.DeleteFromLocalCache(key)
}
}
}
}()
// Output: [mykey]
mycache.Close()
time.Sleep(time.Second)
}
func mockDBGetObject(id int) (*object, error) {
if id > 100 {
return nil, errRecordNotFound
}
return &object{Str: "mystring", Num: 42}, nil
}
func mockDBMGetObject(ids []int) (map[int]*object, error) {
ret := make(map[int]*object)
for _, id := range ids {
if id == 3 {
continue
}
ret[id] = &object{Str: "mystring", Num: id}
}
return ret, nil
}
配置选项
// Options are used to store cache options.
Options struct {
name string // Cache name, used for log identification and metric reporting
remote remote.Remote // Remote is distributed cache, such as Redis.
local local.Local // Local is memory cache, such as FreeCache.
codec string // Value encoding and decoding method. Default is "msgpack.Name". You can also customize it.
errNotFound error // Error to return for cache miss. Used to prevent cache penetration.
remoteExpiry time.Duration // Remote cache ttl, Default is 1 hour.
notFoundExpiry time.Duration // Duration for placeholder cache when there is a cache miss. Default is 1 minute.
offset time.Duration // Expiration time jitter factor for cache misses.
refreshDuration time.Duration // Interval for asynchronous cache refresh. Default is 0 (refresh is disabled).
stopRefreshAfterLastAccess time.Duration // Duration for cache to stop refreshing after no access. Default is refreshDuration + 1 second.
refreshConcurrency int // Maximum number of concurrent cache refreshes. Default is 4.
statsDisabled bool // Flag to disable cache statistics.
statsHandler stats.Handler // Metrics statsHandler collector.
sourceID string // Unique identifier for cache instance.
syncLocal bool // Enable events for syncing local cache (only for "Both" cache type).
eventChBufSize int // Buffer size for event channel (default: 100).
eventHandler func(event *Event) // Function to handle local cache invalidation events.
}
缓存指标收集和统计
您可以实现
stats.Handler
接口并注册到Cache组件来自定义收集指标,例如使用Prometheus 采集指标。我们默认实现了通过日志打印统计指标,如下所示:
2023/09/11 16:42:30.695294 statslogger.go:178: [INFO] jetcache-go stats last 1m0s.
cache | qpm| hit_ratio| hit| miss| query| query_fail
------------+------------+------------+------------+------------+------------+------------
bench | 216440123| 100.00%| 216439867| 256| 256| 0|
bench_local | 216440123| 100.00%| 216434970| 5153| -| -|
bench_remote| 5153| 95.03%| 4897| 256| -| -|
------------+------------+------------+------------+------------+------------+------------
自定义日志
import "github.com/mgtv-tech/jetcache-go/logger"
// Set your Logger
logger.SetDefaultLogger(l logger.Logger)
自定义编解码
import (
"github.com/mgtv-tech/jetcache-go"
"github.com/mgtv-tech/jetcache-go/encoding"
)
// Register your codec
encoding.RegisterCodec(codec Codec)
// Set your codec name
mycache := cache.New[string, any]("any",
cache.WithRemote(...),
cache.WithCodec(yourCodecName string))
使用场景说明
jetcache-go
提供了自动刷新缓存的能力,目的是为了防止缓存失效时造成的雪崩效应打爆数据库。对一些key比较少,实时性要求不高,加载开销非常大的缓存场景,适合使用自动刷新。下面的代码指定每分钟刷新一次,1小时如果没有访问就停止刷新。如果缓存是redis或者多级缓存最后一级是redis,缓存加载行为是全局唯一的,也就是说不管有多少台服务器,同时只有一个服务器在刷新,目的是为了降低后端的加载负担。
mycache := cache.New(cache.WithName("any"),
// ...
// cache.WithRefreshDuration 设置异步刷新时间间隔
cache.WithRefreshDuration(time.Minute),
// cache.WithStopRefreshAfterLastAccess 设置缓存 key 没有访问后的刷新任务取消时间
cache.WithStopRefreshAfterLastAccess(time.Hour))
// `Once` 接口通过 `cache.Refresh(true)` 开启自动刷新
err := mycache.Once(ctx, key, cache.Value(obj), cache.Refresh(true), cache.Do(func(ctx context.Context) (any, error) {
return mockDBGetObject(1)
}))
MGet批量查询
MGet
通过
golang
的泛型机制 +
Load
函数,非常友好的多级缓存批量查询ID对应的实体。如果缓存是redis或者多级缓存最后一级是redis,查询时采用
Pipeline
实现读写操作,提升性能。需要说明是,针对异常场景(IO异常、序列化异常等),我们设计思路是尽可能提供有损服务,防止穿透。
mycache := cache.New(cache.WithName("any"),
// ...
cache.WithRemoteExpiry(time.Minute),
)
cacheT := cache.NewT[int, *object](mycache)
ctx := context.TODO()
key := "mykey"
ids := []int{1, 2, 3}
ret := cacheT.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
return mockDBMGetObject(ids)
})
Codec编解码选择
jetcache-go
默认实现了三种编解码方式,sonic、MsgPack和原生
json
。
选择指导:
- 追求编解码性能: 例如本地缓存命中率极高,但本地缓存byte数组转对象的反序列化操作非常耗CPU,那么选择
sonic
。 - 兼顾性能和极致的存储空间: 选择
MsgPack
,MsgPack采用MsgPack编解码,内容>64个字节,会采用snappy
压缩。
Tip:使用的时候记得按需导包来完成对应的编解码器注册
_ "github.com/mgtv-tech/jetcache-go/encoding/sonic"
插件
插件项目:jetcache-go-plugin,欢迎参与共建。目前已实现如下插件:
- Remote Adapter using go-redis v8
- Prometheus-based Stats Handler
行动起来吧
欢迎大家使用jetcache-go,【请github上点个star吧】源代码地址 :https://github.com/mgtv-tech/jetcache-go
版权归原作者 daoshenzzg008 所有, 如有侵权,请联系我们删除。