0


[jetcache-go] JetCache-go 多级缓存框架终于开源啦

一、引言

golang 开源框架的世界里有很多NB的分布式缓存框架,也有很多NB的本地内存缓存框架,但是没有NB的多级缓存框架!如果你体验过阿里巴巴开源的 JetCache 缓存框架,你就会觉得其他缓存框架都不够好用。为了解决缓存高性能、高可用面临的问题,也为了提高研发效率和质量,我们决定自研 golang 版本的 JetCache 缓存框架。
Featureeko/gocachebytedance/gopkggo-redis/cachemgtv-tech/jetcache-goStar数量2.4k1.7k741191多级缓存YNYY缓存旁路(loadable)YYYY泛型支持YxNY单飞模式YYYY缓存更新监听器(缓存一致)NYNY异步刷新NYNY指标统计YNY(简单)Y缓存空对象NNNY批量查询(MGet)NNNY稀疏列表缓存NNNY

二、JetCache-go 的魅力

2.1 功能概览

2.2 缓存旁路模式(Cache-Aside Pattern)

配置

  1. redis:
  2. test-redis:
  3. addrs:
  4. - 1.1.1.1:6442
  5. - 1.1.1.2:7440
  6. - 1.1.1.3:7441
  7. type: cluster
  8. readTimeout: 100ms
  9. writeTimeout: 100ms
  10. cache:
  11. graph:
  12. cacheType: both # 可选 both、remote、local 三种模式
  13. localType: tinyLFU # 可选 tinyLFU、freeCache 两种本地缓存类型
  14. codec: msgpack # 可选 json、sonic、msgpack 三种序列化方式
  15. redisName: test-redis
  16. refreshDuration: 1m # 自动刷新缓存周期
  17. stopRefreshAfterLastAccess: 5m # 缓存 key 多久没有访问后停止自动刷新
  18. localExpire: 1m # 本地缓存过期时长
  19. remoteExpire: 1h # 默认的远程缓存过期时长
  20. part:
  21. cacheType: local # 可选 both、remote、local 三种模式
  22. localType: freeCache # 可选 tinyLFU、freeCache 两种本地缓存类型
  23. codec: sonic # 可选 json、sonic、msgpack 三种序列化方式
  24. redisName: test-redis
  25. refreshDuration: 1m # 自动刷新缓存周期
  26. stopRefreshAfterLastAccess: 5m # 缓存 key 多久没有访问后停止自动刷新
  27. localExpire: 1m # 本地缓存过期时长
  28. remoteExpire: 1h # 默认的远程缓存过期时长

我们内部脚手架封装好了通过配置即可初始化缓存实例。

使用

Once接口查询,并开启缓存自动刷新。

  1. // GetSubGraphCache 查询知识图谱缓存
  2. func (s *Service) GetSubGraphCache(ctx context.Context, req *api.SubGraphReq) (resp *api.SubGraphResp, err error) {
  3. if err := req.Validate(); err != nil {
  4. return nil, err
  5. }
  6. key := model.KeySubGraph.Key(req.VertexId, req.ArtifactId, req.Depth)
  7. err = s.dao.CacheGraph.Once(ctx, key,
  8. cache.TTL(model.KeySubGraph.Expire()),
  9. cache.Refresh(true),
  10. cache.Value(&resp),
  11. cache.Do(func(ctx context.Context) (any, error) {
  12. return s.getSubGraph(ctx, req)
  13. }))
  14. return
  15. }
  16. // 查询知识图谱
  17. func (s *Service) getSubGraph(ctx context.Context, req *api.SubGraphReq) (resp *api.SubGraphResp, err error) {
  18. // 逻辑实现
  19. }

MGet(稀疏列表缓存)泛型接口查询

  1. // MGetPartCache 批量获取视频缓存
  2. func (d *Dao) MGetPartCache(ctx context.Context, partIds []int64) map[int64]*model.Part {
  3. return d.CachePart.MGet(ctx, model.KeyPart.Key(), partIds,
  4. func(ctx context.Context, ids []int64) (map[int64]*model.Part, error) {
  5. return d.Odin.MGetPart(ctx, ids)
  6. })
  7. }
  8. // MGetPart 批量获取分集信息
  9. func (c *Odin) MGetPart(ctx context.Context, partIds []int64) (map[int64]*model.Part, error) {
  10. // 查询逻辑
  11. }

通过缓存旁路模式,只需要简单的对业务方法进行代理,就能够叠加多级缓存的各种特效。

三、Golang版JetCache

介绍

jetcache-go是基于go-redis/cache拓展的通用缓存访问框架。 实现了类似Java版JetCache的核心功能,包括:

  • ✅ 二级缓存自由组合:本地缓存、分布式缓存、本地缓存+分布式缓存
  • Once接口采用单飞(singleflight)模式,高并发且线程安全
  • ✅ 默认采用MsgPack来编解码Value。可选sonic、原生json
  • ✅ 本地缓存默认实现了TinyLFU和FreeCache
  • ✅ 分布式缓存默认实现了go-redis/v8的适配器,你也可以自定义实现
  • ✅ 可以自定义errNotFound,通过占位符替换,缓存空结果防止缓存穿透
  • ✅ 支持开启分布式缓存异步刷新
  • ✅ 指标采集,默认实现了通过日志打印各级缓存的统计指标(QPM、Hit、Miss、Query、QueryFail)
  • ✅ 分布式缓存查询故障自动降级
  • MGet接口支持Load函数。带分布缓存场景,采用Pipeline模式实现 (v1.1.0+)
  • ✅ 支持拓展缓存更新后所有GO进程的本地缓存失效 (v1.1.1+)

安装

使用最新版本的jetcache-go,您可以在项目中导入该库:

  1. go get github.com/mgtv-tech/jetcache-go

快速开始

  1. package cache_test
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "time"
  9. "github.com/mgtv-tech/jetcache-go"
  10. "github.com/mgtv-tech/jetcache-go/local"
  11. "github.com/mgtv-tech/jetcache-go/remote"
  12. "github.com/redis/go-redis/v9"
  13. )
  14. var errRecordNotFound = errors.New("mock gorm.errRecordNotFound")
  15. type object struct {
  16. Str string
  17. Num int
  18. }
  19. func Example_basicUsage() {
  20. ring := redis.NewRing(&redis.RingOptions{
  21. Addrs: map[string]string{
  22. "localhost": ":6379",
  23. },
  24. })
  25. mycache := cache.New(cache.WithName("any"),
  26. cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
  27. cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
  28. cache.WithErrNotFound(errRecordNotFound))
  29. ctx := context.TODO()
  30. key := "mykey:1"
  31. obj, _ := mockDBGetObject(1)
  32. if err := mycache.Set(ctx, key, cache.Value(obj), cache.TTL(time.Hour)); err != nil {
  33. panic(err)
  34. }
  35. var wanted object
  36. if err := mycache.Get(ctx, key, &wanted); err == nil {
  37. fmt.Println(wanted)
  38. }
  39. // Output: {mystring 42}
  40. mycache.Close()
  41. }
  42. func Example_advancedUsage() {
  43. ring := redis.NewRing(&redis.RingOptions{
  44. Addrs: map[string]string{
  45. "localhost": ":6379",
  46. },
  47. })
  48. mycache := cache.New(cache.WithName("any"),
  49. cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
  50. cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
  51. cache.WithErrNotFound(errRecordNotFound),
  52. cache.WithRefreshDuration(time.Minute))
  53. ctx := context.TODO()
  54. key := "mykey:1"
  55. obj := new(object)
  56. if err := mycache.Once(ctx, key, cache.Value(obj), cache.TTL(time.Hour), cache.Refresh(true),
  57. cache.Do(func(ctx context.Context) (any, error) {
  58. return mockDBGetObject(1)
  59. })); err != nil {
  60. panic(err)
  61. }
  62. fmt.Println(obj)
  63. // Output: &{mystring 42}
  64. mycache.Close()
  65. }
  66. func Example_mGetUsage() {
  67. ring := redis.NewRing(&redis.RingOptions{
  68. Addrs: map[string]string{
  69. "localhost": ":6379",
  70. },
  71. })
  72. mycache := cache.New(cache.WithName("any"),
  73. cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
  74. cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
  75. cache.WithErrNotFound(errRecordNotFound),
  76. cache.WithRemoteExpiry(time.Minute),
  77. )
  78. cacheT := cache.NewT[int, *object](mycache)
  79. ctx := context.TODO()
  80. key := "mget"
  81. ids := []int{1, 2, 3}
  82. ret := cacheT.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
  83. return mockDBMGetObject(ids)
  84. })
  85. var b bytes.Buffer
  86. for _, id := range ids {
  87. b.WriteString(fmt.Sprintf("%v", ret[id]))
  88. }
  89. fmt.Println(b.String())
  90. // Output: &{mystring 1}&{mystring 2}<nil>
  91. cacheT.Close()
  92. }
  93. func Example_syncLocalUsage() {
  94. ring := redis.NewRing(&redis.RingOptions{
  95. Addrs: map[string]string{
  96. "localhost": ":6379",
  97. },
  98. })
  99. sourceID := "12345678" // Unique identifier for this cache instance
  100. channelName := "syncLocalChannel"
  101. pubSub := ring.Subscribe(context.Background(), channelName)
  102. mycache := cache.New(cache.WithName("any"),
  103. cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
  104. cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
  105. cache.WithErrNotFound(errRecordNotFound),
  106. cache.WithRemoteExpiry(time.Minute),
  107. cache.WithSourceId(sourceID),
  108. cache.WithSyncLocal(true),
  109. cache.WithEventHandler(func(event *cache.Event) {
  110. // Broadcast local cache invalidation for the received keys
  111. bs, _ := json.Marshal(event)
  112. ring.Publish(context.Background(), channelName, string(bs))
  113. }),
  114. )
  115. obj, _ := mockDBGetObject(1)
  116. if err := mycache.Set(context.TODO(), "mykey", cache.Value(obj), cache.TTL(time.Hour)); err != nil {
  117. panic(err)
  118. }
  119. go func() {
  120. for {
  121. msg := <-pubSub.Channel()
  122. var event *cache.Event
  123. if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
  124. panic(err)
  125. }
  126. fmt.Println(event.Keys)
  127. // Invalidate local cache for received keys (except own events)
  128. if event.SourceID != sourceID {
  129. for _, key := range event.Keys {
  130. mycache.DeleteFromLocalCache(key)
  131. }
  132. }
  133. }
  134. }()
  135. // Output: [mykey]
  136. mycache.Close()
  137. time.Sleep(time.Second)
  138. }
  139. func mockDBGetObject(id int) (*object, error) {
  140. if id > 100 {
  141. return nil, errRecordNotFound
  142. }
  143. return &object{Str: "mystring", Num: 42}, nil
  144. }
  145. func mockDBMGetObject(ids []int) (map[int]*object, error) {
  146. ret := make(map[int]*object)
  147. for _, id := range ids {
  148. if id == 3 {
  149. continue
  150. }
  151. ret[id] = &object{Str: "mystring", Num: id}
  152. }
  153. return ret, nil
  154. }

配置选项

  1. // Options are used to store cache options.
  2. Options struct {
  3. name string // Cache name, used for log identification and metric reporting
  4. remote remote.Remote // Remote is distributed cache, such as Redis.
  5. local local.Local // Local is memory cache, such as FreeCache.
  6. codec string // Value encoding and decoding method. Default is "msgpack.Name". You can also customize it.
  7. errNotFound error // Error to return for cache miss. Used to prevent cache penetration.
  8. remoteExpiry time.Duration // Remote cache ttl, Default is 1 hour.
  9. notFoundExpiry time.Duration // Duration for placeholder cache when there is a cache miss. Default is 1 minute.
  10. offset time.Duration // Expiration time jitter factor for cache misses.
  11. refreshDuration time.Duration // Interval for asynchronous cache refresh. Default is 0 (refresh is disabled).
  12. stopRefreshAfterLastAccess time.Duration // Duration for cache to stop refreshing after no access. Default is refreshDuration + 1 second.
  13. refreshConcurrency int // Maximum number of concurrent cache refreshes. Default is 4.
  14. statsDisabled bool // Flag to disable cache statistics.
  15. statsHandler stats.Handler // Metrics statsHandler collector.
  16. sourceID string // Unique identifier for cache instance.
  17. syncLocal bool // Enable events for syncing local cache (only for "Both" cache type).
  18. eventChBufSize int // Buffer size for event channel (default: 100).
  19. eventHandler func(event *Event) // Function to handle local cache invalidation events.
  20. }

缓存指标收集和统计

您可以实现

  1. stats.Handler

接口并注册到Cache组件来自定义收集指标,例如使用Prometheus 采集指标。我们默认实现了通过日志打印统计指标,如下所示:

  1. 2023/09/11 16:42:30.695294 statslogger.go:178: [INFO] jetcache-go stats last 1m0s.
  2. cache | qpm| hit_ratio| hit| miss| query| query_fail
  3. ------------+------------+------------+------------+------------+------------+------------
  4. bench | 216440123| 100.00%| 216439867| 256| 256| 0|
  5. bench_local | 216440123| 100.00%| 216434970| 5153| -| -|
  6. bench_remote| 5153| 95.03%| 4897| 256| -| -|
  7. ------------+------------+------------+------------+------------+------------+------------

自定义日志

  1. import "github.com/mgtv-tech/jetcache-go/logger"
  2. // Set your Logger
  3. logger.SetDefaultLogger(l logger.Logger)

自定义编解码

  1. import (
  2. "github.com/mgtv-tech/jetcache-go"
  3. "github.com/mgtv-tech/jetcache-go/encoding"
  4. )
  5. // Register your codec
  6. encoding.RegisterCodec(codec Codec)
  7. // Set your codec name
  8. mycache := cache.New[string, any]("any",
  9. cache.WithRemote(...),
  10. cache.WithCodec(yourCodecName string))

使用场景说明

  1. jetcache-go

提供了自动刷新缓存的能力,目的是为了防止缓存失效时造成的雪崩效应打爆数据库。对一些key比较少,实时性要求不高,加载开销非常大的缓存场景,适合使用自动刷新。下面的代码指定每分钟刷新一次,1小时如果没有访问就停止刷新。如果缓存是redis或者多级缓存最后一级是redis,缓存加载行为是全局唯一的,也就是说不管有多少台服务器,同时只有一个服务器在刷新,目的是为了降低后端的加载负担。

  1. mycache := cache.New(cache.WithName("any"),
  2. // ...
  3. // cache.WithRefreshDuration 设置异步刷新时间间隔
  4. cache.WithRefreshDuration(time.Minute),
  5. // cache.WithStopRefreshAfterLastAccess 设置缓存 key 没有访问后的刷新任务取消时间
  6. cache.WithStopRefreshAfterLastAccess(time.Hour))
  7. // `Once` 接口通过 `cache.Refresh(true)` 开启自动刷新
  8. err := mycache.Once(ctx, key, cache.Value(obj), cache.Refresh(true), cache.Do(func(ctx context.Context) (any, error) {
  9. return mockDBGetObject(1)
  10. }))

MGet批量查询

  1. MGet

通过

  1. golang

的泛型机制 +

  1. Load

函数,非常友好的多级缓存批量查询ID对应的实体。如果缓存是redis或者多级缓存最后一级是redis,查询时采用

  1. Pipeline

实现读写操作,提升性能。需要说明是,针对异常场景(IO异常、序列化异常等),我们设计思路是尽可能提供有损服务,防止穿透。

  1. mycache := cache.New(cache.WithName("any"),
  2. // ...
  3. cache.WithRemoteExpiry(time.Minute),
  4. )
  5. cacheT := cache.NewT[int, *object](mycache)
  6. ctx := context.TODO()
  7. key := "mykey"
  8. ids := []int{1, 2, 3}
  9. ret := cacheT.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
  10. return mockDBMGetObject(ids)
  11. })

Codec编解码选择

  1. jetcache-go

默认实现了三种编解码方式,sonic、MsgPack和原生

  1. json

选择指导:

  • 追求编解码性能: 例如本地缓存命中率极高,但本地缓存byte数组转对象的反序列化操作非常耗CPU,那么选择sonic
  • 兼顾性能和极致的存储空间: 选择MsgPack,MsgPack采用MsgPack编解码,内容>64个字节,会采用snappy压缩。

Tip:使用的时候记得按需导包来完成对应的编解码器注册

  1. _ "github.com/mgtv-tech/jetcache-go/encoding/sonic"

插件

插件项目:jetcache-go-plugin,欢迎参与共建。目前已实现如下插件:

  • Remote Adapter using go-redis v8
  • Prometheus-based Stats Handler

行动起来吧

欢迎大家使用jetcache-go,【请github上点个star吧】源代码地址 :https://github.com/mgtv-tech/jetcache-go


本文转载自: https://blog.csdn.net/daoshenzzg008/article/details/142328653
版权归原作者 daoshenzzg008 所有, 如有侵权,请联系我们删除。

“[jetcache-go] JetCache-go 多级缓存框架终于开源啦”的评论:

还没有评论