0


[jetcache-go] JetCache-go 多级缓存框架终于开源啦

一、引言

golang 开源框架的世界里有很多NB的分布式缓存框架,也有很多NB的本地内存缓存框架,但是没有NB的多级缓存框架!如果你体验过阿里巴巴开源的 JetCache 缓存框架,你就会觉得其他缓存框架都不够好用。为了解决缓存高性能、高可用面临的问题,也为了提高研发效率和质量,我们决定自研 golang 版本的 JetCache 缓存框架。
Featureeko/gocachebytedance/gopkggo-redis/cachemgtv-tech/jetcache-goStar数量2.4k1.7k741191多级缓存YNYY缓存旁路(loadable)YYYY泛型支持YxNY单飞模式YYYY缓存更新监听器(缓存一致)NYNY异步刷新NYNY指标统计YNY(简单)Y缓存空对象NNNY批量查询(MGet)NNNY稀疏列表缓存NNNY

二、JetCache-go 的魅力

2.1 功能概览

2.2 缓存旁路模式(Cache-Aside Pattern)

配置

redis:
  test-redis:
    addrs:
      - 1.1.1.1:6442
      - 1.1.1.2:7440
      - 1.1.1.3:7441
    type: cluster
    readTimeout: 100ms
    writeTimeout: 100ms
cache:
  graph:
    cacheType: both # 可选 both、remote、local 三种模式
    localType: tinyLFU # 可选 tinyLFU、freeCache 两种本地缓存类型
    codec: msgpack # 可选 json、sonic、msgpack 三种序列化方式
    redisName: test-redis
    refreshDuration: 1m # 自动刷新缓存周期
    stopRefreshAfterLastAccess: 5m # 缓存 key 多久没有访问后停止自动刷新
    localExpire: 1m # 本地缓存过期时长
    remoteExpire: 1h # 默认的远程缓存过期时长
  part:
    cacheType: local # 可选 both、remote、local 三种模式
    localType: freeCache # 可选 tinyLFU、freeCache 两种本地缓存类型
    codec: sonic # 可选 json、sonic、msgpack 三种序列化方式
    redisName: test-redis
    refreshDuration: 1m # 自动刷新缓存周期
    stopRefreshAfterLastAccess: 5m # 缓存 key 多久没有访问后停止自动刷新
    localExpire: 1m # 本地缓存过期时长
    remoteExpire: 1h # 默认的远程缓存过期时长

我们内部脚手架封装好了通过配置即可初始化缓存实例。

使用

Once接口查询,并开启缓存自动刷新。

// GetSubGraphCache 查询知识图谱缓存
func (s *Service) GetSubGraphCache(ctx context.Context, req *api.SubGraphReq) (resp *api.SubGraphResp, err error) {
    if err := req.Validate(); err != nil {
       return nil, err
    }

    key := model.KeySubGraph.Key(req.VertexId, req.ArtifactId, req.Depth)
    err = s.dao.CacheGraph.Once(ctx, key,
       cache.TTL(model.KeySubGraph.Expire()),
       cache.Refresh(true),
       cache.Value(&resp),
       cache.Do(func(ctx context.Context) (any, error) {
          return s.getSubGraph(ctx, req)
       }))

    return
}

// 查询知识图谱
func (s *Service) getSubGraph(ctx context.Context, req *api.SubGraphReq) (resp *api.SubGraphResp, err error) {
    // 逻辑实现
}

MGet(稀疏列表缓存)泛型接口查询

// MGetPartCache 批量获取视频缓存
func (d *Dao) MGetPartCache(ctx context.Context, partIds []int64) map[int64]*model.Part {
    return d.CachePart.MGet(ctx, model.KeyPart.Key(), partIds,
       func(ctx context.Context, ids []int64) (map[int64]*model.Part, error) {
          return d.Odin.MGetPart(ctx, ids)
       })
}

// MGetPart 批量获取分集信息
func (c *Odin) MGetPart(ctx context.Context, partIds []int64) (map[int64]*model.Part, error) {
    // 查询逻辑
}

通过缓存旁路模式,只需要简单的对业务方法进行代理,就能够叠加多级缓存的各种特效。

三、Golang版JetCache

介绍

jetcache-go是基于go-redis/cache拓展的通用缓存访问框架。 实现了类似Java版JetCache的核心功能,包括:

  • ✅ 二级缓存自由组合:本地缓存、分布式缓存、本地缓存+分布式缓存
  • Once接口采用单飞(singleflight)模式,高并发且线程安全
  • ✅ 默认采用MsgPack来编解码Value。可选sonic、原生json
  • ✅ 本地缓存默认实现了TinyLFU和FreeCache
  • ✅ 分布式缓存默认实现了go-redis/v8的适配器,你也可以自定义实现
  • ✅ 可以自定义errNotFound,通过占位符替换,缓存空结果防止缓存穿透
  • ✅ 支持开启分布式缓存异步刷新
  • ✅ 指标采集,默认实现了通过日志打印各级缓存的统计指标(QPM、Hit、Miss、Query、QueryFail)
  • ✅ 分布式缓存查询故障自动降级
  • MGet接口支持Load函数。带分布缓存场景,采用Pipeline模式实现 (v1.1.0+)
  • ✅ 支持拓展缓存更新后所有GO进程的本地缓存失效 (v1.1.1+)

安装

使用最新版本的jetcache-go,您可以在项目中导入该库:

go get github.com/mgtv-tech/jetcache-go

快速开始

package cache_test

import (
    "bytes"
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "time"

    "github.com/mgtv-tech/jetcache-go"
    "github.com/mgtv-tech/jetcache-go/local"
    "github.com/mgtv-tech/jetcache-go/remote"
    "github.com/redis/go-redis/v9"
)

var errRecordNotFound = errors.New("mock gorm.errRecordNotFound")

type object struct {
    Str string
    Num int
}

func Example_basicUsage() {
    ring := redis.NewRing(&redis.RingOptions{
        Addrs: map[string]string{
            "localhost": ":6379",
        },
    })

    mycache := cache.New(cache.WithName("any"),
        cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
        cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
        cache.WithErrNotFound(errRecordNotFound))

    ctx := context.TODO()
    key := "mykey:1"
    obj, _ := mockDBGetObject(1)
    if err := mycache.Set(ctx, key, cache.Value(obj), cache.TTL(time.Hour)); err != nil {
        panic(err)
    }

    var wanted object
    if err := mycache.Get(ctx, key, &wanted); err == nil {
        fmt.Println(wanted)
    }
    // Output: {mystring 42}

    mycache.Close()
}

func Example_advancedUsage() {
    ring := redis.NewRing(&redis.RingOptions{
        Addrs: map[string]string{
            "localhost": ":6379",
        },
    })

    mycache := cache.New(cache.WithName("any"),
        cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
        cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
        cache.WithErrNotFound(errRecordNotFound),
        cache.WithRefreshDuration(time.Minute))

    ctx := context.TODO()
    key := "mykey:1"
    obj := new(object)
    if err := mycache.Once(ctx, key, cache.Value(obj), cache.TTL(time.Hour), cache.Refresh(true),
        cache.Do(func(ctx context.Context) (any, error) {
            return mockDBGetObject(1)
        })); err != nil {
        panic(err)
    }
    fmt.Println(obj)
    // Output: &{mystring 42}

    mycache.Close()
}

func Example_mGetUsage() {
    ring := redis.NewRing(&redis.RingOptions{
        Addrs: map[string]string{
            "localhost": ":6379",
        },
    })

    mycache := cache.New(cache.WithName("any"),
        cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
        cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
        cache.WithErrNotFound(errRecordNotFound),
        cache.WithRemoteExpiry(time.Minute),
    )
    cacheT := cache.NewT[int, *object](mycache)

    ctx := context.TODO()
    key := "mget"
    ids := []int{1, 2, 3}

    ret := cacheT.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
        return mockDBMGetObject(ids)
    })

    var b bytes.Buffer
    for _, id := range ids {
        b.WriteString(fmt.Sprintf("%v", ret[id]))
    }
    fmt.Println(b.String())
    // Output: &{mystring 1}&{mystring 2}<nil>

    cacheT.Close()
}

func Example_syncLocalUsage() {
    ring := redis.NewRing(&redis.RingOptions{
        Addrs: map[string]string{
            "localhost": ":6379",
        },
    })

    sourceID := "12345678" // Unique identifier for this cache instance
    channelName := "syncLocalChannel"
    pubSub := ring.Subscribe(context.Background(), channelName)

    mycache := cache.New(cache.WithName("any"),
        cache.WithRemote(remote.NewGoRedisV9Adapter(ring)),
        cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
        cache.WithErrNotFound(errRecordNotFound),
        cache.WithRemoteExpiry(time.Minute),
        cache.WithSourceId(sourceID),
        cache.WithSyncLocal(true),
        cache.WithEventHandler(func(event *cache.Event) {
            // Broadcast local cache invalidation for the received keys
            bs, _ := json.Marshal(event)
            ring.Publish(context.Background(), channelName, string(bs))
        }),
    )
    obj, _ := mockDBGetObject(1)
    if err := mycache.Set(context.TODO(), "mykey", cache.Value(obj), cache.TTL(time.Hour)); err != nil {
        panic(err)
    }

    go func() {
        for {
            msg := <-pubSub.Channel()
            var event *cache.Event
            if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
                panic(err)
            }
            fmt.Println(event.Keys)

            // Invalidate local cache for received keys (except own events)
            if event.SourceID != sourceID {
                for _, key := range event.Keys {
                    mycache.DeleteFromLocalCache(key)
                }
            }
        }
    }()

    // Output: [mykey]
    mycache.Close()
    time.Sleep(time.Second)
}

func mockDBGetObject(id int) (*object, error) {
    if id > 100 {
        return nil, errRecordNotFound
    }
    return &object{Str: "mystring", Num: 42}, nil
}

func mockDBMGetObject(ids []int) (map[int]*object, error) {
    ret := make(map[int]*object)
    for _, id := range ids {
        if id == 3 {
            continue
        }
        ret[id] = &object{Str: "mystring", Num: id}
    }
    return ret, nil
}

配置选项

// Options are used to store cache options.
Options struct {
    name                       string             // Cache name, used for log identification and metric reporting
    remote                     remote.Remote      // Remote is distributed cache, such as Redis.
    local                      local.Local        // Local is memory cache, such as FreeCache.
    codec                      string             // Value encoding and decoding method. Default is "msgpack.Name". You can also customize it.
    errNotFound                error              // Error to return for cache miss. Used to prevent cache penetration.
    remoteExpiry               time.Duration      // Remote cache ttl, Default is 1 hour.
    notFoundExpiry             time.Duration      // Duration for placeholder cache when there is a cache miss. Default is 1 minute.
    offset                     time.Duration      // Expiration time jitter factor for cache misses.
    refreshDuration            time.Duration      // Interval for asynchronous cache refresh. Default is 0 (refresh is disabled).
    stopRefreshAfterLastAccess time.Duration      // Duration for cache to stop refreshing after no access. Default is refreshDuration + 1 second.
    refreshConcurrency         int                // Maximum number of concurrent cache refreshes. Default is 4.
    statsDisabled              bool               // Flag to disable cache statistics.
    statsHandler               stats.Handler      // Metrics statsHandler collector.
    sourceID                   string             // Unique identifier for cache instance.
    syncLocal                  bool               // Enable events for syncing local cache (only for "Both" cache type).
    eventChBufSize             int                // Buffer size for event channel (default: 100).
    eventHandler               func(event *Event) // Function to handle local cache invalidation events.
}

缓存指标收集和统计

您可以实现

stats.Handler

接口并注册到Cache组件来自定义收集指标,例如使用Prometheus 采集指标。我们默认实现了通过日志打印统计指标,如下所示:

2023/09/11 16:42:30.695294 statslogger.go:178: [INFO] jetcache-go stats last 1m0s.
cache       |         qpm|   hit_ratio|         hit|        miss|       query|  query_fail
------------+------------+------------+------------+------------+------------+------------
bench       |   216440123|     100.00%|   216439867|         256|         256|           0|
bench_local |   216440123|     100.00%|   216434970|        5153|           -|           -|
bench_remote|        5153|      95.03%|        4897|         256|           -|           -|
------------+------------+------------+------------+------------+------------+------------

自定义日志

import "github.com/mgtv-tech/jetcache-go/logger"

// Set your Logger
logger.SetDefaultLogger(l logger.Logger)

自定义编解码

import (
    "github.com/mgtv-tech/jetcache-go"
    "github.com/mgtv-tech/jetcache-go/encoding"
)

// Register your codec
encoding.RegisterCodec(codec Codec)

// Set your codec name
mycache := cache.New[string, any]("any",
    cache.WithRemote(...),
    cache.WithCodec(yourCodecName string))

使用场景说明

jetcache-go

提供了自动刷新缓存的能力,目的是为了防止缓存失效时造成的雪崩效应打爆数据库。对一些key比较少,实时性要求不高,加载开销非常大的缓存场景,适合使用自动刷新。下面的代码指定每分钟刷新一次,1小时如果没有访问就停止刷新。如果缓存是redis或者多级缓存最后一级是redis,缓存加载行为是全局唯一的,也就是说不管有多少台服务器,同时只有一个服务器在刷新,目的是为了降低后端的加载负担。

mycache := cache.New(cache.WithName("any"),
        // ...
        // cache.WithRefreshDuration 设置异步刷新时间间隔
        cache.WithRefreshDuration(time.Minute),
        // cache.WithStopRefreshAfterLastAccess 设置缓存 key 没有访问后的刷新任务取消时间
        cache.WithStopRefreshAfterLastAccess(time.Hour))

// `Once` 接口通过 `cache.Refresh(true)` 开启自动刷新
err := mycache.Once(ctx, key, cache.Value(obj), cache.Refresh(true), cache.Do(func(ctx context.Context) (any, error) {
    return mockDBGetObject(1)
}))

MGet批量查询

MGet

通过

golang

的泛型机制 +

Load

函数,非常友好的多级缓存批量查询ID对应的实体。如果缓存是redis或者多级缓存最后一级是redis,查询时采用

Pipeline

实现读写操作,提升性能。需要说明是,针对异常场景(IO异常、序列化异常等),我们设计思路是尽可能提供有损服务,防止穿透。

mycache := cache.New(cache.WithName("any"),
        // ...
        cache.WithRemoteExpiry(time.Minute),
    )
cacheT := cache.NewT[int, *object](mycache)

ctx := context.TODO()
key := "mykey"
ids := []int{1, 2, 3}

ret := cacheT.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
    return mockDBMGetObject(ids)
})

Codec编解码选择

jetcache-go

默认实现了三种编解码方式,sonic、MsgPack和原生

json

选择指导:

  • 追求编解码性能: 例如本地缓存命中率极高,但本地缓存byte数组转对象的反序列化操作非常耗CPU,那么选择sonic
  • 兼顾性能和极致的存储空间: 选择MsgPack,MsgPack采用MsgPack编解码,内容>64个字节,会采用snappy压缩。

Tip:使用的时候记得按需导包来完成对应的编解码器注册

 _ "github.com/mgtv-tech/jetcache-go/encoding/sonic"

插件

插件项目:jetcache-go-plugin,欢迎参与共建。目前已实现如下插件:

  • Remote Adapter using go-redis v8
  • Prometheus-based Stats Handler

行动起来吧

欢迎大家使用jetcache-go,【请github上点个star吧】源代码地址 :https://github.com/mgtv-tech/jetcache-go


本文转载自: https://blog.csdn.net/daoshenzzg008/article/details/142328653
版权归原作者 daoshenzzg008 所有, 如有侵权,请联系我们删除。

“[jetcache-go] JetCache-go 多级缓存框架终于开源啦”的评论:

还没有评论