写在前面
最近稍微重构了之前写的 micro-todolist 模块
项目地址:https://github.com/CocaineCong/micro-todoList
本次升级将原有的micro v2升级到了micro v4版本,v5 still deving,所以可能不太稳定,所以选择了v4版本。
micro相对于grpc,区别就是
grpc比较原始
,什么都要自己封装,比如
服务的注册与发现
,
熔断降级
等等… 而micro这些都帮忙做好了,
只需要调用对于的方法函数就可以了
。
所以其实如果是对于rpc的学习,我还是推荐grpc这种比较原始的框架,可以更好的体验实现一些方法和函数。
1. 项目结构改变
与之前的目录有很大的区别,与先前的grpc的todolist的问题是一样的,
之前micro-todolist的目录
micro-todolist/
├── gateway // 网关
├── mq-server // mq消息队列
├── task // task 任务模块
└── user // user 用户莫OK
与之前v1版本的 grpc-todolist 一样,这种结构会有大量重复的代码!不利于代码的重复利用。
下面是代码结构的变化
1.micro_todolist 项目总体
micro-todolist/
├── app // 各个微服务
│ ├── gateway // 网关
│ ├── task // 任务模块微服务
│ └── user // 用户模块微服务
├── bin // 编译后的二进制文件模块
├── config // 配置文件
├── consts // 定义的常量
├── doc // 接口文档
├── idl // protoc文件
│ └── pb // 放置生成的pb文件
├── logs // 放置打印日志模块
├── pkg // 各种包
│ ├── ctl // 用户操作
│ ├── e // 统一错误状态码
│ ├── logger // 日志
│ └── util // 各种工具、JWT等等..
└── types // 定义各种结构体
2.gateway 网关部分
gateway/
├── cmd // 启动入口
├── http // HTTP请求头
├── handler // 视图层
├── logs // 放置打印日志模块
├── middleware // 中间件
├── router // http 路由模块
├── rpc // rpc 调用
└── wrappers // 熔断
注意一点:网关这边http请求就是我们网关的路由,而rpc就是定义的rpc请求,就是对各个微服务的调用。
3.user && task 用户与任务模块
task/
├── cmd // 启动入口
├── service // 业务服务
├── repository // 持久层
│ ├── db // 视图层
│ │ ├── dao // 对数据库进行操作
│ │ └── model // 定义数据库的模型
│ └── mq // 放置 mq
├── script // 监听 mq 的脚本
└── service // 服务
这里是用task模块作为例子,task模块中的用到了mq,所以就多一个对mq监听的脚本,来进行对mq的内容进行消费。
2. 代码层面的改变
2.1 rpc的请求
2.1.1 v1 改变前
之前的发起请求,我们都是网关启动的时候,将微服务注册到中间件中,然后再将取这个key对应的中间件。
- 将微服务注册到中间件中
// 接受服务实例,并存到gin.Key中funcInitMiddleware(service []interface{}) gin.HandlerFunc {returnfunc(context *gin.Context){// 将实例存在gin.Keys中
context.Keys =make(map[string]interface{})
context.Keys["userService"]= service[0]
context.Keys["taskService"]= service[1]
context.Next()}}
- 从context中读取这个 key 的service,然后再调用rpc函数
funcGetTaskList(ginCtx *gin.Context){var taskReq services.TaskRequest
PanicIfTaskError(ginCtx.Bind(&taskReq))
taskService := ginCtx.Keys["taskService"].(services.TaskService)
claim,_:= utils.ParseToken(ginCtx.GetHeader("Authorization"))// 拿到的是当前访问的用户的id,拿到用户自己的备忘录信息
taskReq.Uid =uint64(claim.Id)// 调用服务端的函数
taskResp, err := taskService.GetTasksList(context.Background(),&taskReq)if err !=nil{PanicIfTaskError(err)}
ginCtx.JSON(200, gin.H{"data": gin.H{"task": taskResp.TaskList,"count": taskResp.Count,},})}
这种写法问题很大!
- 当我们某个子微服务更新业务的时候,网关还是读取一开始注册
context中的旧的子微服务
,这导致,网关必须重新启动才能读取子微服务的更新!这样肯定是不行的!! - 鉴权应该放在中间件middleware中做!
2.1.2 v2 改变后
我们先对参数进行绑定
var taskReq pb.TaskRequest
if err := ctx.Bind(&taskReq); err !=nil{
ctx.JSON(http.StatusBadRequest, ctl.RespError(ctx, err,"绑定参数失败"))return}
然后再从 context 中获取用户信息,原因是,我们一开始在middleware中注册进去了用户信息,具体看后面代码。
user, err := ctl.GetUserInfo(ctx.Request.Context())if err !=nil{
ctx.JSON(http.StatusInternalServerError, ctl.RespError(ctx, err,"获取用户信息错误"))return}
再从rpc包中调用对于的rpc函数。
taskResp, err := rpc.TaskList(ctx,&taskReq)if err !=nil{
ctx.JSON(http.StatusInternalServerError, ctl.RespError(ctx, err,"taskResp RPC 调用失败"))return}
完整代码:
funcListTaskHandler(ctx *gin.Context){var taskReq pb.TaskRequest
if err := ctx.Bind(&taskReq); err !=nil{
ctx.JSON(http.StatusBadRequest, ctl.RespError(ctx, err,"绑定参数失败"))return}
user, err := ctl.GetUserInfo(ctx.Request.Context())if err !=nil{
ctx.JSON(http.StatusInternalServerError, ctl.RespError(ctx, err,"获取用户信息错误"))return}
taskReq.Uid =uint64(user.Id)// 调用服务端的函数
taskResp, err := rpc.TaskList(ctx,&taskReq)if err !=nil{
ctx.JSON(http.StatusInternalServerError, ctl.RespError(ctx, err,"taskResp RPC 调用失败"))return}
ctx.JSON(http.StatusOK, ctl.RespSuccess(ctx, taskResp))}
2.2 rabbitmq的监听
我们设定 task 创建的时候,task 先把数据送到 mq ,然后再从 mq 落库消费。
service 编写
CreateTask 创建备忘录,将备忘录信息生产,放到rabbitMQ消息队列中
func(t *TaskSrv)CreateTask(ctx context.Context, req *pb.TaskRequest, resp *pb.TaskDetailResponse)(err error){
body,_:= json.Marshal(req)// title,content
resp.Code = e.SUCCESS
err = mq.SendMessage2MQ(body)if err !=nil{
resp.Code = e.ERROR
return}return}
将消息发到mq
// SendMessage2MQ 发送消息到mqfuncSendMessage2MQ(body []byte)(err error){
ch, err := RabbitMq.Channel()if err !=nil{return}
q,_:= ch.QueueDeclare(consts.RabbitMqTaskQueue,true,false,false,false,nil)
err = ch.Publish("", q.Name,false,false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType:"application/json",
Body: body,})if err !=nil{return}
fmt.Println("发送MQ成功...")return}
我们在服务启动的时候进行监听消费
funcloadingScript(){
ctx := context.Background()go script.TaskCreateSync(ctx)}
mq 监听进行消费落库。
type SyncTask struct{}func(s *SyncTask)RunTaskCreate(ctx context.Context)error{
rabbitMqQueue := consts.RabbitMqTaskQueue
msgs, err := mq.ConsumeMessage(ctx, rabbitMqQueue)if err !=nil{return err
}var forever chanstruct{}gofunc(){for d :=range msgs {
log.LogrusObj.Infof("Received run Task: %s", d.Body)// 落库
reqRabbitMQ :=new(pb.TaskRequest)
err = json.Unmarshal(d.Body, reqRabbitMQ)if err !=nil{
log.LogrusObj.Infof("Received run Task: %s", err)}
err = service.TaskMQ2MySQL(ctx, reqRabbitMQ)// 具体落库操作if err !=nil{
log.LogrusObj.Infof("Received run Task: %s", err)}}}()
log.LogrusObj.Infoln(err)<-forever
returnnil}
3. 坑点
3.1 protoc-gen-micro
原来这个protoc-gen-micro工具是在 github.com/micro/protoc-gen-micro 这里的,后来变了,移动了位置。
现在到了github.com/go-micro/generator/cmd/protoc-gen-micro这个地方。
所以大家下载protoc-gen-micro的时候,就在新的地址下载就好啦。
go get -u github.com/go-micro/generator/cmd/protoc-gen-micro@latest
3.2 resp 的返回
当我们在实现micro的rpc服务接口的时候,我们会有这种情况
func(t *TaskSrv)GetTask(ctx context.Context, req *pb.TaskRequest, resp *pb.TaskDetailResponse)(err error){}
req 和 resp 都是传参进来的,对于resp我们就不需要再new一次了,如果再new一次,会覆盖掉原来的resp。因为
这个传进来的resp已经在上层new过了
。
已经在 task.pb.micro.go 文件中 new 过了,所以我们
后续就不用再new
,否则就会出错
func(c *taskService)GetTask(ctx context.Context, in *TaskRequest, opts ...client.CallOption)(*TaskDetailResponse,error){
req := c.c.NewRequest(c.name,"TaskService.GetTask", in)
out :=new(TaskDetailResponse)
err := c.c.Call(ctx, req, out, opts...)if err !=nil{returnnil, err
}return out,nil}
版权归原作者 小生凡一 所有, 如有侵权,请联系我们删除。