0


Dubbo go 高级特性

今天是2022年10月24号,广大程序员口中程序员的节日,在此 祝大家节日快乐,在以后的编码生涯中一撸到底无BUG,哈哈。言归正传,今天来学一下Dubbo go中的几个高级特性。

前提条件

  • 下载代码
# 1. 下载 官网 sample 源代码git clone https://github.com/apache/dubbo-go-samples

# 2. 在项目根目录下 执行以下命令 下载golang的相关依赖
go mod tidy
  • 启动 zookeeper

上下文传递

用于客户端、服务端在RPC调用过程中 增加额外的参数信息,已满足特殊的需求。如在一次RPC中需要实现全链路日志跟踪的需求,用来方便日志排查、问题追踪、性能优化等等。

此类需求,通常的做法是调用方生成一个唯一请求ID,然后将ID传递给后续的每一个RPC调用。因此在系统运维的角度看,只需要找到该ID,就可以定位一个请求的前世今生,包含整个rpc方法执行的调用链,对于定位问题非常的有帮助。

如系统A -> 系统B -> 系统C。当意外情况出现时,就可以根据全链路日志跟踪系统,判断到底哪个系统出现问题了。

该demo代码在 context 子模块中。

运行服务端代码

# 1. 跳转到指定目录cd /Users/andy/tool/golang/dubbo-go-samples/context/triple/go-server/cmd

# 2. 导出配置文件地址exportDUBBO_GO_CONFIG_PATH=/Users/andy/tool/golang/dubbo-go-samples/context/triple/go-server/conf/dubbogo.yml

# 3. 运行代码
go run .

运行客户端代码

# 1. 跳转到指定目录cd /Users/andy/tool/golang/dubbo-go-samples/context/triple/go-client/cmd

# 2. 导出配置文件地址exportDUBBO_GO_CONFIG_PATH=/Users/andy/tool/golang/dubbo-go-samples/context/triple/go-client/conf/dubbogo.yml

# 3. 运行代码
go run .

服务端输入的日志截图如下:

在这里插入图片描述

核心代码

  • 服务端
func (s *GreeterProvider) SayHelloStream(svr api.Greeter_SayHelloStreamServer) error {
    // map must be assert to map[string]interface, because of dubbo limitation
    attachments := svr.Context().Value(constant.AttachmentKey).(map[string]interface{})

    // value must be assert to []string[0], because of http2 header limitation
    logger.Infof("get triple attachment key1 = %s", attachments["key1"].([]string)[0])
    logger.Infof("get triple attachment key2 = %s", attachments["key2"].([]string)[0])
    logger.Infof("get triple attachment key3 = %s and %s", attachments["key3"].([]string)[0],
        attachments["key3"].([]string)[1])
  // 当传递参数为数组时,根据下标获取 每一个参数的值
    logger.Infof("get triple attachment key4 = %s and %s", attachments["key4"].([]string)[0],
        attachments["key4"].([]string)[1])
    c, err := svr.Recv()
    if err != nil {
        return err
    }
    logger.Infof("Dubbo-go3 GreeterProvider recv 1 user, name = %s\n", c.Name)
    err = svr.Send(&api.User{
        Name: "hello " + c.Name,
        Age:  18,
        Id:   "123456789",
    })
    if err != nil {
        return err
    }
    return nil
}
  • 客户端
func main() {
    err := config.Load()
    if err != nil {
        panic(err)
    }

    logger.Info("start to test triple unary context attachment transport")
    req := &api.HelloRequest{
        Name: "laurence",
    }
    ctx := context.Background()
    // set user defined context attachment, map value can be string or []string, otherwise it is not to be transferred
    userDefinedValueMap := make(map[string]interface{})
    userDefinedValueMap["key1"] = "user defined value 1"
    userDefinedValueMap["key2"] = "user defined value 2"
    userDefinedValueMap["key3"] = []string{"user defined value 3.1", "user defined value 3.2"}
    userDefinedValueMap["key4"] = []string{"user defined value 4.1", "user defined value 4.2"}
    ctx = context.WithValue(ctx, constant.AttachmentKey, userDefinedValueMap)
    reply, err := grpcGreeterImpl.SayHello(ctx, req)
    if err != nil {
        logger.Error(err)
    }
    logger.Infof("client response result: %v\n", reply)

    //stream rpc
    logger.Info("start to test triple streaming rpc context attachment transport")
    request := &api.HelloRequest{
        Name: "laurence",
    }
    stream, err := grpcGreeterImpl.SayHelloStream(ctx)
    if err != nil {
        logger.Error(err)
    }
    // stream grpc双向流式发送
    err = stream.Send(request)
    if err != nil {
        logger.Error(err)
    }
    logger.Infof("client stream send request: %v\n", request)

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        reply, err := stream.Recv()
        if err != nil {
            logger.Error(err)
        }
        logger.Infof("client stream received result: %v\n", reply)
    }()
    wg.Wait()
}

异常信息回传

用户可以在 provider 端生成用户定义的异常信息,可以记录异常产生堆栈,triple 协议可保证将用户在客户端获取到异常 message ,并可以查看报错堆栈,便于定位问题。

运行服务端代码

# 1. 跳转到项目目录cd /Users/andy/tool/golang/dubbo-go-samples/error/triple/hessian2/go-server/cmd

# 2. 导出配置文件exportDUBBO_GO_CONFIG_PATH=/Users/andy/tool/golang/dubbo-go-samples/error/triple/hessian2/go-service/conf/dubbogo.yml

# 3. 启动服务
go run .

运行客户端代码

# 1. 跳转到项目目录cd /Users/andy/tool/golang/dubbo-go-samples/helloworld/java-client

# 2. 授权 sh 脚本exportDUBBO_GO_CONFIG_PATH=/Users/andy/tool/golang/dubbo-go-samples/error/triple/hessian2/go-client/conf/dubbogo.yml

# 3. 启动 项目
go run .
  • 服务端截图

在这里插入图片描述

  • 客户端截图

在这里插入图片描述

核心代码

  • 服务端代码
package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
)

import (
    "dubbo.apache.org/dubbo-go/v3/common/logger"
    "dubbo.apache.org/dubbo-go/v3/config"
    _ "dubbo.apache.org/dubbo-go/v3/imports"

    hessian "github.com/apache/dubbo-go-hessian2"
)

var (
    survivalTimeout = int(3 * time.Second)
)

func init() {
    // ------for hessian2------
    hessian.RegisterPOJO(&User{})
    config.SetProviderService(&ErrorResponseProvider{})
}

// export DUBBO_GO_CONFIG_PATH= PATH_TO_SAMPLES/rpc/triple/hessian2/go-server/conf/dubbogo.yml
func main() {
    if err := config.Load(); err != nil {
        panic(err)
    }
    initSignal()
}

func initSignal() {
    signals := make(chan os.Signal, 1)
    // It is not possible to block SIGKILL or syscall.SIGSTOP
    signal.Notify(signals, os.Interrupt, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
    for {
        sig := <-signals
        logger.Infof("get signal %s", sig.String())
        switch sig {
        case syscall.SIGHUP:
            // reload()
        default:
            time.Sleep(time.Second * 5)
            time.AfterFunc(time.Duration(survivalTimeout), func() {
                logger.Warnf("app exit now by force...")
                os.Exit(1)
            })

            // The program exits normally or timeout forcibly exits.
            fmt.Println("provider app exit now...")
            return
        }
    }
}
  • 客户端代码

package main

import (
    "context"
)

import (
    "dubbo.apache.org/dubbo-go/v3/common/logger"
    "dubbo.apache.org/dubbo-go/v3/config"
    _ "dubbo.apache.org/dubbo-go/v3/imports"

    hessian "github.com/apache/dubbo-go-hessian2"

    tripleCommon "github.com/dubbogo/triple/pkg/common"
)

var errorResponseProvider = new(ErrorResponseProvider)

func init() {
    config.SetConsumerService(errorResponseProvider)
    hessian.RegisterPOJO(&User{})
}

// need to setup environment variable "CONF_CONSUMER_FILE_PATH" to "conf/client.yml" before run
func main() {
    if err := config.Load(); err != nil {
        panic(err)
    }
    testErrorService()
    testService()
}

func testErrorService() {
    if user, err := errorResponseProvider.GetUser(context.TODO(), &User{Name: "laurence"}); err != nil {
        logger.Infof("response result: %v, error = %s", user, err)
        logger.Infof("error details = %+v", err.(tripleCommon.TripleError).Stacks())
    }
}

func testService() {
    if user, err := errorResponseProvider.GetUserWithoutError(context.TODO(), &User{Name: "laurence"}); err != nil {
        logger.Infof("response result: %v, error = %s", user, err)
        logger.Infof("error details = %+v", err.(tripleCommon.TripleError).Stacks())
    }
}

自定义Filter

Dubbo中Filter 是由一组Filter组成的一个Filter链,采用责任链模式进行开发。Filter内部可以控制RPC调用的行为,如参数验证、IP限制、流量限制等等。今天来学习一下Dubbo go中的自定义Filter

运行服务端代码

# 1. 跳转到项目目录cd /Users/andy/tool/golang/dubbo-go-samples/filter/custom/go-server/cmd

# 2. 导出配置文件exportDUBBO_GO_CONFIG_PATH=/Users/andy/tool/golang/dubbo-go-samples/filter/custom/go-server/conf/dubbogo.yml

# 3. 启动服务
go run .

运行服务端代码

# 1. 跳转到项目目录cd /Users/andy/tool/golang/dubbo-go-samples/filter/custom/go-client/cmd

# 2. 导出配置文件exportDUBBO_GO_CONFIG_PATH=/Users/andy/tool/golang/dubbo-go-samples/filter/custom/go-client/conf/dubbogo.yml

# 3. 启动服务
go run .
  • 服务端日志

在这里插入图片描述

  • 客户端日志

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ThTp1iAw-1666626986335)(/Users/andy/Library/Application Support/typora-user-images/image-20221024233412665.png)]

核心代码

  • 服务端 Filter

package main

import (
    "context"
    "fmt"
)

import (
    "dubbo.apache.org/dubbo-go/v3/common/extension"
    "dubbo.apache.org/dubbo-go/v3/filter"
    "dubbo.apache.org/dubbo-go/v3/protocol"
)

func init() {
    extension.SetFilter("myServerFilter", NewMyServerFilter)
}

func NewMyServerFilter() filter.Filter {
    return &MyServerFilter{}
}

type MyServerFilter struct {
}

func (f *MyServerFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
    fmt.Println("MyServerFilter Invoke is called, method Name = ", invocation.MethodName())
    fmt.Printf("request attachments = %s\n", invocation.Attachments())
    return invoker.Invoke(ctx, invocation)
}
func (f *MyServerFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, protocol protocol.Invocation) protocol.Result {
    fmt.Println("MyServerFilter OnResponse is called")
    myAttachmentMap := make(map[string]interface{})
    myAttachmentMap["key1"] = "value1"
    myAttachmentMap["key2"] = []string{"value1", "value2"}
    result.SetAttachments(myAttachmentMap)
    return result
}
  • 服务端 main

package main

import (
    "context"
)

import (
    "dubbo.apache.org/dubbo-go/v3/common/logger"
    "dubbo.apache.org/dubbo-go/v3/config"
    _ "dubbo.apache.org/dubbo-go/v3/imports"
)

import (
    "github.com/apache/dubbo-go-samples/api"
)

type GreeterProvider struct {
    api.UnimplementedGreeterServer
}

func (s *GreeterProvider) SayHello(ctx context.Context, in *api.HelloRequest) (*api.User, error) {
    logger.Infof("Dubbo3 GreeterProvider get user name = %s\n", in.Name)
    return &api.User{Name: "Hello " + in.Name, Id: "12345", Age: 21}, nil
}

// export DUBBO_GO_CONFIG_PATH= PATH_TO_SAMPLES/helloworld/go-server/conf/dubbogo.yml
func main() {
    config.SetProviderService(&GreeterProvider{})
    if err := config.Load(); err != nil {
        panic(err)
    }
    select {}
}
  • 服务端 配置文件
dubbo:registries:demoZK:protocol: zookeeper
      address: 127.0.0.1:2181protocols:tripleProtocol:name: tri
      port:20000provider:services:GreeterProvider:filter: myServerFilter
        loadbalance: random
        warmup:100cluster: failover
  • 客户端 Filter

package main

import (
    "context"
    "fmt"
)

import (
    "dubbo.apache.org/dubbo-go/v3/common/extension"
    "dubbo.apache.org/dubbo-go/v3/filter"
    "dubbo.apache.org/dubbo-go/v3/protocol"
)

func init() {
    extension.SetFilter("myClientFilter", NewMyClientFilter)
}

func NewMyClientFilter() filter.Filter {
    return &MyClientFilter{}
}

type MyClientFilter struct {
}

func (f *MyClientFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
    fmt.Println("MyClientFilter Invoke is called, method Name = ", invocation.MethodName())
    invocation.SetAttachment("request-key1", "request-value1")
    invocation.SetAttachment("request-key2", []string{"request-value2.1", "request-value2.2"})
    return invoker.Invoke(ctx, invocation)
}
func (f *MyClientFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, protocol protocol.Invocation) protocol.Result {
    fmt.Println("MyClientFilter OnResponse is called")
    fmt.Println("result attachment = ", result.Attachments())
    return result
}
  • 客户端 main
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
    "context"
)

import (
    "dubbo.apache.org/dubbo-go/v3/common/logger"
    "dubbo.apache.org/dubbo-go/v3/config"
    _ "dubbo.apache.org/dubbo-go/v3/imports"
)

import (
    "github.com/apache/dubbo-go-samples/api"
)

var userProvider = &api.GreeterClientImpl{}

func init() {
    config.SetConsumerService(userProvider)
}

func main() {
    err := config.Load()
    if err != nil {
        panic(err)
    }

    logger.Infof("\n\n\nstart to test")
    user, err := userProvider.SayHello(context.TODO(), &api.HelloRequest{Name: "laurence"})
    if err != nil {
        panic(err)
    }
    logger.Infof("get user = %+v", user)
}
  • 客户端 配置文件
# dubbo client yaml configure filedubbo:registries:demoZK:protocol: zookeeper
      timeout: 3s
      address: 127.0.0.1:2181consumer:filter: myClientFilter
    check:truerequest_timeout: 3s
    connect_timeout: 3s
    references:GreeterClientImpl:protocol: tri

服务端限流

运行服务端代码

# 1. 跳转到项目目录cd /Users/andy/tool/golang/dubbo-go-samples/filter/tpslimit/go-server/cmd

# 2. 导出配置文件exportDUBBO_GO_CONFIG_PATH=/Users/andy/tool/golang/dubbo-go-samples/filter/tpslimit/go-server/conf/dubbogo.yml

# 3. 启动服务
go run .

运行客户端代码

# 1. 跳转到项目目录cd /Users/andy/tool/golang/dubbo-go-samples/filter/tpslimit/go-client/cmd

# 2. 导出配置文件exportDUBBO_GO_CONFIG_PATH=/Users/andy/tool/golang/dubbo-go-samples/filter/tpslimit/go-client/conf/dubbogo.yml

# 3. 启动服务
go run .

在这里插入图片描述

核心代码

  • 客户端代码

package main

import (
    "context"
    "time"
)

import (
    "dubbo.apache.org/dubbo-go/v3/common/logger"
    "dubbo.apache.org/dubbo-go/v3/config"
    _ "dubbo.apache.org/dubbo-go/v3/imports"

    hessian "github.com/apache/dubbo-go-hessian2"
)

import (
    "github.com/apache/dubbo-go-samples/filter/tpslimit/go-client/pkg"
)

var userProvider = &pkg.UserProvider{}

func init() {
    config.SetConsumerService(userProvider)
    hessian.RegisterPOJO(&pkg.User{})
}

func main() {
    err := config.Load()
    if err != nil {
        panic(err)
    }

    var successCount, failCount int64
    logger.Infof("\n\n\nstart to test dubbo")
    for i := 0; i < 60; i++ {
        time.Sleep(200 * time.Millisecond)
        user, err := userProvider.GetUser(context.TODO(), "A001")
        if err != nil {
            failCount++
            logger.Infof("error: %v\n", err)
        } else {
            successCount++
        }
        logger.Infof("response: %v\n", user)
    }
    logger.Infof("successCount=%v, failCount=%v\n", successCount, failCount)
}
  • 服务端配置文件
# dubbo server yaml configure filedubbo:registries:demoZK:protocol: zookeeper
      timeout: 3s
      address: 127.0.0.1:2181protocols:dubbo:name: dubbo
      port:20000provider:services:UserProvider:interface: org.apache.dubbo.UserProvider
        cluster: failover
        loadbalance: random # load balancing strategy, such as random, roundrobin, leastactive or consistenthash.warmup:100# warmup period, in secondstps.limiter: method-service # the Limiter that judge if the TPS overs the threshold, such as method-service or defaulttps.limit.strategy: fixedWindow # the name of limit strategy, such as fixedWindow, slidingWindow, default, threadSafeFixedWindow or the strategy name you customedtps.limit.rejected.handler: DefaultValueHandler
        tps.limit.interval:1000# the interval time unit is mstps.limit.rate:3# the max value in the interval. <0 means that the service will not be limited.
  • 服务端代码

package main

import (
    "os"
    "os/signal"
    "syscall"
    "time"
)

import (
    "dubbo.apache.org/dubbo-go/v3/common/logger"
    "dubbo.apache.org/dubbo-go/v3/config"
    _ "dubbo.apache.org/dubbo-go/v3/filter/tps/strategy"
    _ "dubbo.apache.org/dubbo-go/v3/imports"

    hessian "github.com/apache/dubbo-go-hessian2"
)

import (
    "github.com/apache/dubbo-go-samples/filter/tpslimit/go-server/pkg"
)

var (
    survivalTimeout = int(3e9)
)

func main() {
    config.SetProviderService(new(pkg.UserProvider))
    hessian.RegisterPOJO(&pkg.User{})
    err := config.Load()
    if err != nil {
        panic(err)
    }
    initSignal()
}

func initSignal() {
    signals := make(chan os.Signal, 1)
    // It is not possible to block SIGKILL or syscall.SIGSTOP
    signal.Notify(signals, os.Interrupt, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
    for {
        sig := <-signals
        logger.Infof("get signal %s", sig.String())
        switch sig {
        case syscall.SIGHUP:
            // reload()
        default:
            time.AfterFunc(time.Duration(survivalTimeout), func() {
                logger.Warnf("app exit now by force...")
                os.Exit(1)
            })

            // The program exits normally or timeout forcibly exits.
            logger.Infof("provider app exit now...")
            return
        }
    }
}
标签: 1024程序员节

本文转载自: https://blog.csdn.net/u013433591/article/details/127505129
版权归原作者 u013433591 所有, 如有侵权,请联系我们删除。

“Dubbo go 高级特性”的评论:

还没有评论