0


goframe开发一个企业网站 rabbitmq队例15

RabbitMQ消息队列封装

在目录internal/pkg/rabbitmq/rabbitmq.go

  1. # 消息队列配置
  2. mq:
  3. # 消息队列类型: rocketmq 或 rabbitmq
  4. type:"rabbitmq"
  5. # 是否启用消息队列
  6. enabled:true
  7. rocketmq:
  8. nameServer:"127.0.0.1:9876"
  9. producerGroup:"myProducerGroup"
  10. consumerGroup:"myConsumerGroup"
  11. brokerAddress:"127.0.0.1:10911" # 添加 broker 地址
  12. rabbitmq:
  13. url:"amqp://wanghaibin:wanghaibin@127.0.0.1:5672/"
  14. exchange:"gf_exchange"
  15. dlx_exchange:"gf_dlx_exchange" # 新增:死信交换机
  16. queue:"gf_queue"
  17. delay_queue:"gf_delay_queue" # 新增:延迟队列
  18. routingKey:"gf_key"
  19. vhost:"/"
  1. package rabbitmq
  2. import("context""fmt""time""github.com/gogf/gf/v2/frame/g"
  3. amqp "github.com/rabbitmq/amqp091-go")var(// conn RabbitMQ连接实例
  4. conn *amqp.Connection
  5. // channel RabbitMQ通道实例
  6. channel *amqp.Channel
  7. )// Initialize 初始化 RabbitMQ 连接和通道// 包括:建立连接、创建通道、声明交换机和队列、建立绑定关系funcInitialize(){var err error
  8. ctx := context.Background()// 从配置文件获取RabbitMQ连接URL
  9. url := g.Cfg().MustGet(ctx,"rabbitmq.url").String()// 建立RabbitMQ连接
  10. conn, err = amqp.Dial(url)if err !=nil{
  11. g.Log().Fatalf(ctx,"Failed to connect to RabbitMQ: %v", err)}// 创建通道
  12. channel, err = conn.Channel()if err !=nil{
  13. g.Log().Fatalf(ctx,"Failed to open channel: %v", err)}// 1. 声明主交换机// 类型:direct,持久化:true,自动删除:false,内部的:false,非阻塞:false
  14. err = channel.ExchangeDeclare(
  15. g.Cfg().MustGet(ctx,"rabbitmq.exchange").String(),"direct",// 交换机类型true,// 持久化false,// 自动删除false,// 内部的false,// 非阻塞nil,// 参数)if err !=nil{
  16. g.Log().Fatalf(ctx,"Failed to declare main exchange: %v", err)}// 2. 声明死信交换机(DLX)// 用于处理无法被正常消费的消息
  17. err = channel.ExchangeDeclare(
  18. g.Cfg().MustGet(ctx,"rabbitmq.dlx_exchange").String(),"direct",true,false,false,false,nil,)if err !=nil{
  19. g.Log().Fatalf(ctx,"Failed to declare DLX exchange: %v", err)}// 3. 声明主队列// 持久化:true,非自动删除,非排他,非阻塞_, err = channel.QueueDeclare(
  20. g.Cfg().MustGet(ctx,"rabbitmq.queue").String(),true,// 持久化false,// 自动删除false,// 排他的false,// 非阻塞nil,// 参数)if err !=nil{
  21. g.Log().Fatalf(ctx,"Failed to declare main queue: %v", err)}// 4. 声明延迟队列// 配置死信交换机参数
  22. args := amqp.Table{"x-dead-letter-exchange": g.Cfg().MustGet(ctx,"rabbitmq.dlx_exchange").String(),"x-dead-letter-routing-key": g.Cfg().MustGet(ctx,"rabbitmq.routingKey").String(),}_, err = channel.QueueDeclare(
  23. g.Cfg().MustGet(ctx,"rabbitmq.delay_queue").String(),true,false,false,false,
  24. args,)if err !=nil{
  25. g.Log().Fatalf(ctx,"Failed to declare delay queue: %v", err)}// 5. 绑定主队列到主交换机
  26. err = channel.QueueBind(
  27. g.Cfg().MustGet(ctx,"rabbitmq.queue").String(),// 队列名
  28. g.Cfg().MustGet(ctx,"rabbitmq.routingKey").String(),// 路由键
  29. g.Cfg().MustGet(ctx,"rabbitmq.exchange").String(),// 交换机名false,nil,)if err !=nil{
  30. g.Log().Fatalf(ctx,"Failed to bind main queue: %v", err)}// 6. 绑定主队列到死信交换机
  31. err = channel.QueueBind(
  32. g.Cfg().MustGet(ctx,"rabbitmq.queue").String(),
  33. g.Cfg().MustGet(ctx,"rabbitmq.routingKey").String(),
  34. g.Cfg().MustGet(ctx,"rabbitmq.dlx_exchange").String(),false,nil,)if err !=nil{
  35. g.Log().Fatalf(ctx,"Failed to bind queue to DLX: %v", err)}
  36. g.Log().Info(ctx,"RabbitMQ initialized successfully")}// PublishMessage 发布消息到RabbitMQ// 参数:// - ctx: 上下文// - message: 要发送的消息内容// 返回:// - error: 发送错误,如果成功则为nilfuncPublishMessage(ctx context.Context, message string)error{// 创建带超时的上下文
  37. ctxTimeout, cancel := context.WithTimeout(ctx,5*time.Second)defercancel()// 发布消息到指定的交换机和路由
  38. err := channel.PublishWithContext(ctxTimeout,
  39. g.Cfg().MustGet(ctx,"rabbitmq.exchange").String(),
  40. g.Cfg().MustGet(ctx,"rabbitmq.routingKey").String(),false,// mandatoryfalse,// immediate
  41. amqp.Publishing{
  42. ContentType:"text/plain",
  43. Body:[]byte(message),},)if err !=nil{return fmt.Errorf("failed to publish message: %v", err)}returnnil}// ConsumeMessages 消费队列中的消息// 参数:// - ctx: 上下文// - handler: 消息处理函数// 返回:// - error: 消费错误,如果成功则为nilfuncConsumeMessages(ctx context.Context, handler func(string)error)error{
  44. messages, err := channel.Consume(
  45. g.Cfg().MustGet(ctx,"rabbitmq.queue").String(),"",// consumerfalse,// auto-ackfalse,// exclusivefalse,// no-localfalse,// no-waitnil,// args)if err !=nil{return fmt.Errorf("failed to register a consumer: %v", err)}// 启动goroutine处理消息gofunc(){for msg :=range messages {
  46. err :=handler(string(msg.Body))if err !=nil{
  47. g.Log().Errorf(ctx,"Error handling message: %v", err)
  48. msg.Nack(false,true)// 处理失败,消息重新入队}else{
  49. msg.Ack(false)// 处理成功,确认消息}}}()returnnil}// Cleanup 清理RabbitMQ连接和通道funcCleanup(){if channel !=nil{
  50. channel.Close()}if conn !=nil{
  51. conn.Close()}}// GetChannel 获取RabbitMQ通道实例funcGetChannel()*amqp.Channel {return channel
  52. }// PurgeQueue 清空指定队列中的所有消息// 参数:// - ctx: 上下文// 返回:// - error: 清空错误,如果成功则为nilfuncPurgeQueue(ctx context.Context)error{_, err := channel.QueuePurge(
  53. g.Cfg().MustGet(ctx,"rabbitmq.queue").String(),false,// no-wait)return err
  54. }// PublishDelayMessage 发送延迟消息// 参数:// - ctx: 上下文// - message: 消息内容// - delaySeconds: 延迟秒数// 返回:// - error: 发送错误,如果成功则为nilfuncPublishDelayMessage(ctx context.Context, message string, delaySeconds int)error{return channel.PublishWithContext(ctx,"",// 默认交换机
  55. g.Cfg().MustGet(ctx,"rabbitmq.delay_queue").String(),// 延迟队列false,false,
  56. amqp.Publishing{
  57. ContentType:"text/plain",
  58. Body:[]byte(message),
  59. Expiration: fmt.Sprintf("%d", delaySeconds*1000),// 转换为毫秒},)}// GetQueueLength 获取队列中的消息数量// 参数:// - ctx: 上下文// 返回:// - int: 消息数量// - error: 获取错误,如果成功则为nilfuncGetQueueLength(ctx context.Context)(int,error){
  60. queue, err := channel.QueueInspect(
  61. g.Cfg().MustGet(ctx,"rabbitmq.queue").String(),)if err !=nil{return0, fmt.Errorf("failed to inspect queue: %v", err)}return queue.Messages,nil}
logic逻辑的实现
  1. package rabbitmqmsg
  2. import("context""fmt""gf_new_web/internal/pkg/rabbitmq""gf_new_web/internal/service""github.com/gogf/gf/v2/frame/g")// sRabbitmqMsg RabbitMQ消息服务结构体type sRabbitmqMsg struct{}// New 创建新的RabbitMQ消息服务实例funcNew()*sRabbitmqMsg {return&sRabbitmqMsg{}}// init 初始化函数,在包加载时自动注册RabbitMQ消息服务funcinit(){
  3. service.RegisterRabbitmqMsg(New())}// SendMessage 发送普通消息到RabbitMQ// 参数:// - ctx: 上下文信息// - message: 要发送的消息内容// 返回:// - error: 发送错误,成功则为nilfunc(s *sRabbitmqMsg)SendMessage(ctx context.Context, message string)error{return rabbitmq.PublishMessage(ctx, message)}// SendDelayMessage 发送延迟消息到RabbitMQ// 参数:// - ctx: 上下文信息// - message: 要发送的消息内容// - delaySeconds: 延迟时间(秒)// 返回:// - error: 发送错误,成功则为nilfunc(s *sRabbitmqMsg)SendDelayMessage(ctx context.Context, message string, delaySeconds int)error{return rabbitmq.PublishDelayMessage(ctx, message, delaySeconds)}// SendBatchMessages 批量发送消息到RabbitMQ// 参数:// - ctx: 上下文信息// - messages: 消息内容数组// 返回:// - error: 发送错误,成功则为nil// 注意:任一消息发送失败都会导致整个批次失败func(s *sRabbitmqMsg)SendBatchMessages(ctx context.Context, messages []string)error{for_, msg :=range messages {if err := rabbitmq.PublishMessage(ctx, msg); err !=nil{return err
  4. }}returnnil}// GetQueueLength 获取队列当前的消息数量// 参数:// - ctx: 上下文信息// 返回:// - int: 队列中的消息数量// - error: 获取错误,成功则为nilfunc(s *sRabbitmqMsg)GetQueueLength(ctx context.Context)(int,error){
  5. queue, err := rabbitmq.GetChannel().QueueInspect(
  6. g.Cfg().MustGet(ctx,"rabbitmq.queue").String(),)if err !=nil{return0, fmt.Errorf("failed to inspect queue: %v", err)}return queue.Messages,nil}// PurgeQueue 清空队列中的所有消息// 参数:// - ctx: 上下文信息// 返回:// - error: 清空错误,成功则为nilfunc(s *sRabbitmqMsg)PurgeQueue(ctx context.Context)error{return rabbitmq.PurgeQueue(ctx)}// handleMessage 处理接收到的单条消息// 参数:// - message: 消息内容// 返回:// - error: 处理错误,成功则为nil// 注意:这是内部方法,实现具体的消息处理逻辑func(s *sRabbitmqMsg)handleMessage(message string)error{// 记录接收到的消息
  7. g.Log().Info(context.Background(),"收到消息:", message)// TODO: 在这里添加实际的消息处理逻辑returnnil}// Initialize 初始化消息消费处理// 参数:// - ctx: 上下文信息// 返回:// - error: 初始化错误,成功则为nil// 功能:启动消息消费者,并设置消息处理函数func(s *sRabbitmqMsg)Initialize(ctx context.Context)error{return rabbitmq.ConsumeMessages(ctx,func(msg string)error{return s.handleMessage(msg)})}

生成service ,不再写上

controller代码

  1. package front
  2. import("fmt""gf_new_web/internal/service""time""github.com/gogf/gf/v2/frame/g""github.com/gogf/gf/v2/net/ghttp")var(
  3. RabbitMsg = cRabbitMsg{})type cRabbitMsg struct{}// SendMessage 处理发送普通消息的HTTP请求// 请求参数:// - message: 消息内容// 响应格式:// 成功:{"code": 0, "msg": "消息发送成功"}// 失败:{"code": -1, "msg": "错误信息"}func(c *cRabbitMsg)SendMessage(r *ghttp.Request){
  4. message := r.Get("message").String()
  5. err := service.RabbitmqMsg().SendMessage(r.GetCtx(), message)if err !=nil{
  6. g.Log().Error(r.GetCtx(), err)
  7. r.Response.WriteJson(g.Map{"code":-1,"msg": err.Error(),})return}
  8. r.Response.WriteJson(g.Map{"code":0,"msg":"消息发送成功",})}// SendDelayMessage 处理发送延迟消息的HTTP请求// 请求参数:// - message: 消息内容// - delay: 延迟时间(秒)// 响应格式:// 成功:{"code": 0, "msg": "延迟消息发送成功"}// 失败:{"code": -1, "msg": "错误信息"}func(c *cRabbitMsg)SendDelayMessage(r *ghttp.Request){
  9. message := r.Get("message").String()
  10. delaySeconds := r.Get("delay").Int()
  11. err := service.RabbitmqMsg().SendDelayMessage(r.GetCtx(), message, delaySeconds)if err !=nil{
  12. g.Log().Error(r.GetCtx(), err)
  13. r.Response.WriteJson(g.Map{"code":-1,"msg": err.Error(),})return}
  14. r.Response.WriteJson(g.Map{"code":0,"msg":"延迟消息发送成功",})}// SendBatchMessages 处理批量发送消息的HTTP请求// 请求参数:// - messages: 消息内容数组// 响应格式:// 成功:{"code": 0, "msg": "批量消息发送成功"}// 失败:{"code": -1, "msg": "错误信息"}func(c *cRabbitMsg)SendBatchMessages(r *ghttp.Request){
  15. messages := r.Get("messages").Strings()
  16. err := service.RabbitmqMsg().SendBatchMessages(r.GetCtx(), messages)if err !=nil{
  17. g.Log().Error(r.GetCtx(), err)
  18. r.Response.WriteJson(g.Map{"code":-1,"msg": err.Error(),})return}
  19. r.Response.WriteJson(g.Map{"code":0,"msg":"批量消息发送成功",})}// GetQueueLength 处理获取队列长度的HTTP请求// 响应格式:// 成功:{"code": 0, "msg": "获取队列长度成功", "data": 队列长度}// 失败:{"code": -1, "msg": "错误信息"}func(c *cRabbitMsg)GetQueueLength(r *ghttp.Request){
  20. length, err := service.RabbitmqMsg().GetQueueLength(r.GetCtx())if err !=nil{
  21. g.Log().Error(r.GetCtx(), err)
  22. r.Response.WriteJson(g.Map{"code":-1,"msg": err.Error(),})return}
  23. r.Response.WriteJson(g.Map{"code":0,"msg":"获取队列长度成功","data": length,})}// PurgeQueue 处理清空队列的HTTP请求// 响应格式:// 成功:{"code": 0, "msg": "清空队列成功"}// 失败:{"code": -1, "msg": "错误信息"}func(c *cRabbitMsg)PurgeQueue(r *ghttp.Request){
  24. err := service.RabbitmqMsg().PurgeQueue(r.GetCtx())if err !=nil{
  25. g.Log().Error(r.GetCtx(), err)
  26. r.Response.WriteJson(g.Map{"code":-1,"msg": err.Error(),})return}
  27. r.Response.WriteJson(g.Map{"code":0,"msg":"清空队列成功",})}// ConsumeMessages 处理消费消息的HTTP请求// 特点:异步处理,非阻塞// 响应格式:// 成功:{"code": 0, "msg": "消息消费已开始,请查看服务器日志获取消费详情"}// 失败:{"code": -1, "msg": "错误信息"}func(c *cRabbitMsg)ConsumeMessages(r *ghttp.Request){
  28. g.Log().Info(r.GetCtx(),"开始消费消息...")
  29. done :=make(chanbool)gofunc(){
  30. err := service.RabbitmqMsg().Initialize(r.GetCtx())if err !=nil{
  31. g.Log().Error(r.GetCtx(),"消费消息出错:", err)
  32. r.Response.WriteJson(g.Map{"code":-1,"msg": fmt.Sprintf("消费消息失败: %v", err),})
  33. done <-truereturn}}()select{case<-done:returncase<-time.After(5* time.Second):
  34. g.Log().Info(r.GetCtx(),"消息消费进行中...")
  35. r.Response.WriteJson(g.Map{"code":0,"msg":"消息消费已开始,请查看服务器日志获取消费详情",})}}
标签: rabbitmq ruby 分布式

本文转载自: https://blog.csdn.net/hzether/article/details/143663580
版权归原作者 老大白菜 所有, 如有侵权,请联系我们删除。

“goframe开发一个企业网站 rabbitmq队例15”的评论:

还没有评论