0


MAC上RabbitMQ从安装到用GO快速实现

文章目录

前言

最近在跟慕课做一个秒杀商城的小项目,接触了RabbitMQ

虽然平时是在Python中实现消息队列,但是不得不说RabbitMQ香呀

今天也是除夕,在这个祝大家新年快乐,发个小水文章吧QAQ

安装开始

# 基础安装
$ brew install rabbitmq
$ vim ~/.zshrc # 将 export PATH=$PATH:/usr/local/sbin 写入
$ rabbitmq-server # 重新打开终端,开启服务# 开启插件
$ rabbitmq-plugins list # 查看插件
$ rabbitmq-plugins enable rabbitmq_management # 启动管理插件
$ rabbitmq-plugins enable rabbitmq_tracing # 启动日志
$ rabbitmq-plugins disable rabbitmq_tracing # 关闭日志# 额外命令
$ rabbitmq-server -detached # 后台启动
$ rabbitmqctl status # 查看状态
$ rabbitmqctl stop # 关闭

打好基础

这些都是很基本的概念,你得明白什么是什么就好了

因为作为工具,首先要会用起来,会采用囫囵吞枣的模式学习

随着后面的深入,慢慢了解特点吧

概念描述Channel生产者publish或是消费者subscribe一个队列都是通过信道来通信的Exchangeexchange的作用就是类似路由器,服务器会根据路由键将消息从交换器路由到队列上去Queue队列收到的消息将发送给消费者Binding建立链接交换的绑定信息VirtualHost不同的隔离区,防止污染Connection建立的链接

工作模式描述simple最简单的收发模式work资源的竞争publish/subscribe共享资源routing只能匹配上路由key对应的消息队列,对应的消费者才能消费消息topicrouting的一种模糊匹配

代码实现

功能仓库文件

// MQURL 连接信息 amqp://账号:密码@ip:host/vhostconst MQURL ="amqp://guest:[email protected]:5672/"// RabbitMQ rabbitMQ结构体type RabbitMQ struct{
    conn      *amqp.Connection // 链接
    channel   *amqp.Channel    // 通道
    QueueName string//队列名称
    Exchange  string//交换机名称
    Key       string//bind Key 名称
    Mqurl     string//连接信息}// NewRabbitMQ 创建结构体实例funcNewRabbitMQ(queueName string, exchange string, key string)*RabbitMQ {return&RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL}}// Destroy 断开 channel 和 connectionfunc(r *RabbitMQ)Destroy(){
    r.channel.Close()// 断开 channel
    r.conn.Close()// 断开 conn}// 错误处理函数func(r *RabbitMQ)failOnErr(err error, message string){if err !=nil{
        log.Printf("%s:%s", message, err)// 打印错误panic(fmt.Sprintf("%s:%s", message, err))// 抛出错误}}// NewRabbitMQSimple 创建简单模式下RabbitMQ实例// 在Simple模式下唯一不同的是 queueNamefuncNewRabbitMQSimple(queueName string)*RabbitMQ {// todo 创建RabbitMQ实例
    rabbitmq :=NewRabbitMQ(queueName,"","")var err error// todo 补上conn与channel//获取connection
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    rabbitmq.failOnErr(err,"failed to connect rabbitmq!")//获取channel
    rabbitmq.channel, err = rabbitmq.conn.Channel()
    rabbitmq.failOnErr(err,"failed to open a channel")return rabbitmq
}// PublishSimple 简单模式下队列生产func(r *RabbitMQ)PublishSimple(message string){// todo 申请队列,如果队列不存在会自动创建,存在则跳过创建_, err := r.channel.QueueDeclare(
        r.QueueName,// 首先放入名称false,//是否持久化false,//是否自动删除false,//是否具有排他性false,//是否阻塞处理nil,//额外的属性)if err !=nil{
        fmt.Println(err)}//todo 调用channel 发送消息到队列中
    r.channel.Publish(
        r.Exchange,// 此处为空
        r.QueueName,false,//如果为true,根据自身exchange类型和routeKey规则;无法找到符合条件的队列会把消息返还给发送者false,//如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
        amqp.Publishing{
            ContentType:"text/plain",
            Body:[]byte(message),})}// ConsumeSimple 简单模式下消费者func(r *RabbitMQ)ConsumeSimple(){//todo 申请队列,如果队列不存在会自动创建,存在则跳过创建
    q, err := r.channel.QueueDeclare(
        r.QueueName,false,//是否持久化false,//是否自动删除false,//是否具有排他性false,//是否阻塞处理nil,//额外的属性)if err !=nil{
        fmt.Println(err)}//todo 接收消息
    msg, err := r.channel.Consume(
        q.Name,// queue"",//用来区分多个消费者 此处不区分true,//是否自动应答false,//是否独有false,//设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者false,// 是否阻塞处理nil,// 额外的属性)if err !=nil{
        fmt.Println(err)}//todo 启用协程处理消息// 此处使用forever的意思为因为协程会始终监听消息(除非手动结束)// 手动结束才会进行 <-forever 有协程且一直尝试读取数据
    forever :=make(chanbool)gofunc(){for d :=range msg {// 消息逻辑处理
            log.Printf("Received a message: %s", d.Body)}}()
    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever
}// NewRabbitMQPubSub 订阅模式创建RabbitMQ实例就要设置路由器了funcNewRabbitMQPubSub(exchangeName string)*RabbitMQ {//todo 创建RabbitMQ实例
    rabbitmq :=NewRabbitMQ("", exchangeName,"")var err error//todo 获取connection和获取channel
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    rabbitmq.failOnErr(err,"failed to connect rabbitmq!")
    rabbitmq.channel, err = rabbitmq.conn.Channel()
    rabbitmq.failOnErr(err,"failed to open a channel")return rabbitmq
}// PublishPub 订阅模式生产func(r *RabbitMQ)PublishPub(message string){//todo 尝试创建交换机
    err := r.channel.ExchangeDeclare(
        r.Exchange,"fanout",true,false,false,//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定false,nil,)

    r.failOnErr(err,"Failed to declare an exchange")//todo 发送消息
    err = r.channel.Publish(
        r.Exchange,"",false,false,
        amqp.Publishing{
            ContentType:"text/plain",
            Body:[]byte(message),})}// ReceiveSub 订阅模式消费端代码func(r *RabbitMQ)ReceiveSub(){//todo 试探性创建交换机
    err := r.channel.ExchangeDeclare(
        r.Exchange,"fanout",//交换机类型true,false,false,//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定false,nil,)
    r.failOnErr(err,"Failed to declare an exchange")//todo 试探性创建队列,这里注意队列名称不要写
    q, err := r.channel.QueueDeclare("",//随机生产队列名称false,false,true,false,nil,)
    r.failOnErr(err,"Failed to declare a queue")//todo 绑定队列到 exchange 中
    err = r.channel.QueueBind(
        q.Name,"",//在pub/sub模式下,这里的key要为空
        r.Exchange,false,nil)//todo 消费消息
    message, err := r.channel.Consume(
        q.Name,"",true,false,false,false,nil,)
    forever :=make(chanbool)gofunc(){for d :=range message {
            log.Printf("Received a message: %s", d.Body)}}()
    fmt.Println("退出请按 CTRL+C\n")<-forever
}// NewRabbitMQRouting 路由模式funcNewRabbitMQRouting(exchangeName string, routingKey string)*RabbitMQ {//todo 创建RabbitMQ实例
    rabbitmq :=NewRabbitMQ("", exchangeName, routingKey)var err error//todo 获取connection 获取channel
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    rabbitmq.failOnErr(err,"failed to connect rabbitmq!")
    rabbitmq.channel, err = rabbitmq.conn.Channel()
    rabbitmq.failOnErr(err,"failed to open a channel")return rabbitmq
}// PublishRouting 路由模式发送消息func(r *RabbitMQ)PublishRouting(message string){//1.尝试创建交换机
    err := r.channel.ExchangeDeclare(
        r.Exchange,"direct",true,false,false,false,nil,)

    r.failOnErr(err,"Failed to declare an exchange")//2.发送消息
    err = r.channel.Publish(
        r.Exchange,//要设置
        r.Key,false,false,
        amqp.Publishing{
            ContentType:"text/plain",
            Body:[]byte(message),})}// ReceiveRouting 路由模式接受消息func(r *RabbitMQ)ReceiveRouting(){// todo 试探性创建交换机
    err := r.channel.ExchangeDeclare(
        r.Exchange,//交换机类型"direct",true,false,false,false,nil,)
    r.failOnErr(err,"Failed to declare an exchange")// todo 试探性创建队列,这里注意队列名称不要写
    q, err := r.channel.QueueDeclare("",//随机生产队列名称false,false,true,false,nil,)
    r.failOnErr(err,"Failed to declare a queue")//绑定队列到 exchange 中
    err = r.channel.QueueBind(
        q.Name,//需要绑定key
        r.Key,
        r.Exchange,false,nil)// todo 消费消息
    message, err := r.channel.Consume(
        q.Name,"",true,false,false,false,nil,)
    forever :=make(chanbool)gofunc(){for d :=range message {
            log.Printf("Received a message: %s", d.Body)}}()
    fmt.Println("退出请按 CTRL+C\n")<-forever
}// NewRabbitMQTopic 话题模式funcNewRabbitMQTopic(exchangeName string, routingKey string)*RabbitMQ {// todo 创建RabbitMQ实例
    rabbitmq :=NewRabbitMQ("", exchangeName, routingKey)var err error// todo 获取connection与获取channel
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    rabbitmq.failOnErr(err,"failed to connect rabbitmq!")
    rabbitmq.channel, err = rabbitmq.conn.Channel()
    rabbitmq.failOnErr(err,"failed to open a channel")return rabbitmq
}// PublishTopic 话题模式发送消息func(r *RabbitMQ)PublishTopic(message string){// todo 尝试创建交换机
    err := r.channel.ExchangeDeclare(
        r.Exchange,"topic",true,false,false,false,nil,)

    r.failOnErr(err,"Failed to declare an exchange")// todo 发送消息
    err = r.channel.Publish(
        r.Exchange,//要设置
        r.Key,false,false,
        amqp.Publishing{
            ContentType:"text/plain",
            Body:[]byte(message),})}// ReceiveTopic 话题模式接受消息//要注意key,规则//其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)//匹配 xx.* 表示匹配 xx.hello, 但是 xx.hello.one需要用 xx.#才能匹配到func(r *RabbitMQ)ReceiveTopic(){//1.试探性创建交换机
    err := r.channel.ExchangeDeclare(
        r.Exchange,//交换机类型"topic",true,false,false,false,nil,)
    r.failOnErr(err,"Failed to declare an exch"+"ange")//2.试探性创建队列,这里注意队列名称不要写
    q, err := r.channel.QueueDeclare("",//随机生产队列名称false,false,true,false,nil,)
    r.failOnErr(err,"Failed to declare a queue")//绑定队列到 exchange 中
    err = r.channel.QueueBind(
        q.Name,//在pub/sub模式下,这里的key要为空
        r.Key,
        r.Exchange,false,nil)//消费消息
    message, err := r.channel.Consume(
        q.Name,"",true,false,false,false,nil,)

    forever :=make(chanbool)gofunc(){for d :=range message {
            log.Printf("Received a message: %s", d.Body)}}()

    fmt.Println("退出请按 CTRL+C\n")<-forever
}

各个模式实现

因为不确定包的位置

所以报红简单写一下引入上面的仓库文件就好了

  1. 简单模式
// 发布者funcmain(){// todo 创建实例并发送消息
    rabbitmq := RabbitMQ.NewRabbitMQSimple("Simple")
    rabbitmq.PublishSimple("Hello world!")
    fmt.Println("发送成功!")}// 接受者funcmain(){
    rabbitmq := RabbitMQ.NewRabbitMQSimple("Simple")// 名字要一样
    rabbitmq.ConsumeSimple()}
  1. 工作模式
// 发布者funcmain(){
    rabbitmq := RabbitMQ.NewRabbitMQSimple("Simple")for i :=0; i <=100; i++{
        rabbitmq.PublishSimple("Hello world!"+ strconv.Itoa(i))
        time.Sleep(1* time.Second)
        fmt.Println(i)}}// 接受者1funcmain(){
    rabbitmq := RabbitMQ.NewRabbitMQSimple("Simple")
    rabbitmq.ConsumeSimple()}// 接受者2funcmain(){
    rabbitmq := RabbitMQ.NewRabbitMQSimple("Simple")
    rabbitmq.ConsumeSimple()}
  1. 发布模式
// 发布者funcmain(){
    rabbitmq := RabbitMQ.NewRabbitMQPubSub("newProduct")for i :=0; i <100; i++{
        rabbitmq.PublishPub("订阅模式生产第"+ strconv.Itoa(i)+"条"+"数据")
        fmt.Println("订阅模式生产第"+ strconv.Itoa(i)+"条"+"数据")
        time.Sleep(1* time.Second)}}// 订阅者1funcmain(){
    rabbitmq := RabbitMQ.NewRabbitMQPubSub("newProduct")
    rabbitmq.ReceiveSub()}// 订阅者2funcmain(){
    rabbitmq := RabbitMQ.NewRabbitMQPubSub("newProduct")
    rabbitmq.ReceiveSub()}
  1. 路由模式
// 发布者funcmain(){
    One := RabbitMQ.NewRabbitMQRouting("ex","one")
    Two := RabbitMQ.NewRabbitMQRouting("ex","two")for i :=0; i <=10; i++{
        One.PublishRouting("Hello one!"+ strconv.Itoa(i))
        Two.PublishRouting("Hello Two!"+ strconv.Itoa(i))
        time.Sleep(1* time.Second)
        fmt.Println(i)}}// 接受者funcmain(){
    One := RabbitMQ.NewRabbitMQRouting("ex","one")
    One.ReceiveRouting()}// 接受者funcmain(){
    Two := RabbitMQ.NewRabbitMQRouting("ex","two")
    Two.ReceiveRouting()}
  1. 话题模式
// 发布者funcmain(){
    One := RabbitMQ.NewRabbitMQTopic("exTopic","topic.one")
    Two := RabbitMQ.NewRabbitMQTopic("exTopic","topic.two")for i :=0; i <=10; i++{
        One.PublishTopic("Hello topic one!"+ strconv.Itoa(i))
        Two.PublishTopic("Hello topic Two!"+ strconv.Itoa(i))
        time.Sleep(1* time.Second)
        fmt.Println(i)}}// 接受者1funcmain(){
    One := RabbitMQ.NewRabbitMQTopic("exTopic","#")// # 表示一个或者多个词语
    One.ReceiveTopic()}// 接受者2funcmain(){
    Two := RabbitMQ.NewRabbitMQTopic("exTopic","*.two")// 表示多个词语
    Two.ReceiveTopic()}
标签: rabbitmq macos golang

本文转载自: https://blog.csdn.net/weixin_51485807/article/details/122761910
版权归原作者 望向天空的恒毅 所有, 如有侵权,请联系我们删除。

“MAC上RabbitMQ从安装到用GO快速实现”的评论:

还没有评论