0


RabbitMQ 学习笔记

RabbitMQ学习笔记

一些概念

Broker

:RabbitMQ服务。

virtual host

: 其实就是分组。

Connection

:连接,生产者消费者与Broker之间的TCP连接。

Channel

:网络信道,轻量级的Connection,使用

Channel

可以减少

Connection

的建立,减少开销。

Message

:消息,由

Properties

Body

组成,

Properties

可以对消息的优先级、延迟等特性进行记录,

Body

存储消息体的内容。

Exchange

:交换机,没有消息存储功能,负责分发消息。

Binding

Exchange

Queue

之间的虚拟连接,其中可以包含

Routing Key

Routing Key

:路由规则,用于确定如何分发、接收消息。

Queue

:消息队列,保存消息并将其转发给消费者进行消费。

安装

Windows安装

安装erLang语言

进入官网

image-20220723085850289

下载完之后一直下一步安装即可,安装完成后进入目录,配置环境变量

image-20220723092150573

image-20220723092301127

安装RabbitMQ服务端

Release RabbitMQ 3.7.3 · rabbitmq/rabbitmq-server (github.com)

image-20220723091828280

一直下一步安装即可

安装完成后打开安装目录,进入到这个文件夹打开命令行

image-20220723093324568

输入命令安装插件

rabbitmq-plugins enable rabbitmq_management

完成后双击

rabbitmq-server.bat

打开http://localhost:15672/

用户名密码是

guest/guest

image-20220723093515104

image-20220723093550183

Linux下使用 Docker 安装

直接拉取最新版

docker pull rabbitmq

运行容器

docker run -d--name rabbitmq -p5671:5671 -p5672:5672 -p4369:4369 -p25672:25672 -p15671:15671 -p15672:15672 rabbitmq:management

进入容器

dockerexec-it rabbitmq /bin/bash

开启管理插件

rabbitmq-plugins enable rabbitmq_management

image-20220723103556298

打开管理网站 http://localhost:15672/

4369, 25672 (Erlang发现&集群端口)

5672, 5671 (AMQP端口)

15672 (web管理后台端口)

61613, 61614 (STOMP协议端口)

1883, 8883 (MQTT协议端口)

用户名密码均为

guest

image-20220723103a414689

实操

官网例子

简单模式

11111

配置文件

application-easy.yml

spring:rabbitmq:host: 123.123.123.123
    port:5672username: Gettler
    password:********virtual-host: /
    queue: easy-queue

生产者:

packagecom.gettler.rabbitmq.easy;importcom.gettler.rabbitmq.RabbitmqApplication;importcom.gettler.rabbitmq.config.RabbitMqConnectionFactory;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.ActiveProfiles;importorg.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("easy")@RunWith(SpringRunner.class)@SpringBootTest(classes =RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)publicclassProducerTest{// 队列名称@Value("${spring.rabbitmq.queue}")publicStringQUEUE_NAME;privatestaticfinalLogger logger =LoggerFactory.getLogger(ProducerTest.class);@TestpublicvoidtestProducer()throwsException{// 创建一个connectionConnection connection =RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();/*
          创建一个队列
          1.队列名称
          2.队列里面的消息是否持久化(默认为false,代表消息存储在内存中)
          3.该队列是否只供一个消费者进行消费,是否进行共享(true表示可以多个消费者消费)
          4.表示最后一个消费者断开连接以后,该队列是否自动删除(true表示自动删除)
          5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message ="hello, this is an easy message";/*
          发送一个消息
          1.发送到那个交换机(空代表默认交换机)
          2.路由key
          3.其他的参数信息
          4.发送消息的消息体
         */
        channel.basicPublish("",QUEUE_NAME,null, message.getBytes());
        logger.info("消息发送完毕");}}

消费者:

packagecom.gettler.rabbitmq.easy;importcom.gettler.rabbitmq.RabbitmqApplication;importcom.gettler.rabbitmq.config.RabbitMqConnectionFactory;importcom.rabbitmq.client.CancelCallback;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.DeliverCallback;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.ActiveProfiles;importorg.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("easy")@RunWith(SpringRunner.class)@SpringBootTest(classes =RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)publicclassConsumerTest{// 队列名称@Value("${spring.rabbitmq.queue}")publicStringQUEUE_NAME;privatestaticfinalLogger logger =LoggerFactory.getLogger(ConsumerTest.class);@TestpublicvoidtestConsumer()throwsException{// 创建一个connectionConnection connection =RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 消费消息的回调DeliverCallback deliverCallback =(consumerTag, message)->{
            logger.info("消费消息成功,消息内容为:"+newString(message.getBody()));};// 取消消费的回调CancelCallback cancelCallback =(consumerTag)->{
            logger.info("消息消费被中断");};/*
          消费者消费消息
          1.消费的队列名称
          2.消费成功之后是否要自动应答(true代表自动应答,false代表手动应答)
          3.消费者消费消息的回调(函数式接口)
          4.消费者取消消费的回调(函数式接口)
         */
        channel.basicConsume(QUEUE_NAME,true, deliverCallback, cancelCallback);}}

image-20240620161232526

工作模式

在这里插入图片描述

配置文件

application-work.yml

spring:rabbitmq:host: 123.123.123.123
    port:5672username: Gettler
    password:********virtual-host: /
    queue: work-queue

生产者:

packagecom.gettler.rabbitmq.work;importcom.gettler.rabbitmq.RabbitmqApplication;importcom.gettler.rabbitmq.config.RabbitMqConnectionFactory;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.ActiveProfiles;importorg.springframework.test.context.junit4.SpringRunner;importjava.util.Scanner;@ActiveProfiles("work")@RunWith(SpringRunner.class)@SpringBootTest(classes =RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)publicclassProducerTest{// 队列名称@Value("${spring.rabbitmq.queue}")publicStringQUEUE_NAME;privatestaticfinalLogger logger =LoggerFactory.getLogger(ProducerTest.class);@Value("${spring.rabbitmq.host}")privateString host;@Value("${spring.rabbitmq.username}")privateString username;@Value("${spring.rabbitmq.password}")privateString password;@TestpublicvoidtestProducer()throwsException{System.out.println(this.host);// 创建一个connectionConnection connection =RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 声明交换机
        channel.exchangeDeclare("fanout",BuiltinExchangeType.FANOUT);/*
          创建一个队列
          1.队列名称
          2.队列里面的消息是否持久化(默认为false,代表消息存储在内存中)
          3.该队列是否只供一个消费者进行消费,是否进行共享(true表示可以多个消费者消费)
          4.表示最后一个消费者断开连接以后,该队列是否自动删除(true表示自动删除)
          5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);Scanner scanner =newScanner(System.in);while(scanner.hasNext()){String message = scanner.next();/*
              发送一个消息
              1.发送到那个交换机(空代表默认交换机)
              2.路由key
              3.其他的参数信息
              4.发送消息的消息体
             */
            channel.basicPublish("",QUEUE_NAME,null, message.getBytes());
            logger.info("消息发送完毕");}}}

消费者A:

packagecom.gettler.rabbitmq.work;importcom.gettler.rabbitmq.RabbitmqApplication;importcom.gettler.rabbitmq.config.RabbitMqConnectionFactory;importcom.rabbitmq.client.CancelCallback;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.DeliverCallback;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.ActiveProfiles;importorg.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("work")@RunWith(SpringRunner.class)@SpringBootTest(classes =RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)publicclassConsumerATest{// 队列名称@Value("${spring.rabbitmq.queue}")publicStringQUEUE_NAME;privatestaticfinalLogger logger =LoggerFactory.getLogger(ConsumerATest.class);@TestpublicvoidtestConsumerA()throwsException{// 创建一个connectionConnection connection =RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 消费消息的回调DeliverCallback deliverCallback =(consumerTag, message)->{
            logger.info("消费消息成功,消息内容为:"+newString(message.getBody()));};// 取消消费的回调CancelCallback cancelCallback =(consumerTag)->{
            logger.info("消息消费被中断");};/*
          消费者消费消息
          1.消费的队列名称
          2.消费成功之后是否要自动应答(true代表自动应答,false代表手动应答)
          3.消费者消费消息的回调(函数式接口)
          4.消费者取消消费的回调(函数式接口)
         */
        channel.basicConsume(QUEUE_NAME,true, deliverCallback, cancelCallback);}}

消费者B:

packagecom.gettler.rabbitmq.work;importcom.gettler.rabbitmq.RabbitmqApplication;importcom.gettler.rabbitmq.config.RabbitMqConnectionFactory;importcom.rabbitmq.client.CancelCallback;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.DeliverCallback;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.ActiveProfiles;importorg.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("work")@RunWith(SpringRunner.class)@SpringBootTest(classes =RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)publicclassConsumerBTest{// 队列名称@Value("${spring.rabbitmq.queue}")publicStringQUEUE_NAME;privatestaticfinalLogger logger =LoggerFactory.getLogger(ConsumerBTest.class);@TestpublicvoidtestConsumerB()throwsException{// 创建一个connectionConnection connection =RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 消费消息的回调DeliverCallback deliverCallback =(consumerTag, message)->{
            logger.info("消费消息成功,消息内容为:"+newString(message.getBody()));};// 取消消费的回调CancelCallback cancelCallback =(consumerTag)->{
            logger.info("消息消费被中断");};/*
          消费者消费消息
          1.消费的队列名称
          2.消费成功之后是否要自动应答(true代表自动应答,false代表手动应答)
          3.消费者消费消息的回调(函数式接口)
          4.消费者取消消费的回调(函数式接口)
         */
        channel.basicConsume(QUEUE_NAME,true, deliverCallback, cancelCallback);}}

image-20240620161656576

路由模式

配置文件

application-direct.yml

spring:rabbitmq:host: 123.123.123.123
    port:5672username: Gettler
    password:********virtual-host: /

生产者:

packagecom.gettler.rabbitmq.direct;importcom.gettler.rabbitmq.RabbitmqApplication;importcom.gettler.rabbitmq.config.RabbitMqConnectionFactory;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.ActiveProfiles;importorg.springframework.test.context.junit4.SpringRunner;importjava.util.HashMap;importjava.util.Map;@ActiveProfiles("direct")@RunWith(SpringRunner.class)@SpringBootTest(classes =RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)publicclassProducerTest{privatestaticfinalLogger logger =LoggerFactory.getLogger(ProducerTest.class);@TestpublicvoidtestProducer()throwsException{// 创建channelConnection connection =RabbitMqConnectionFactory.getSingleInstanceConnection();Channel channel = connection.createChannel();// 声明交换机
        channel.exchangeDeclare("direct",BuiltinExchangeType.DIRECT);Map<String,String> messageMap =newHashMap<>();
        messageMap.put("info","普通 info 信息");
        messageMap.put("warning","警告 warning 信息");
        messageMap.put("error","错误 error 信息");
        messageMap.put("debug","调试 debug 信息");for(Map.Entry<String,String> mes : messageMap.entrySet()){String routingKey = mes.getKey();String message = mes.getValue();
            channel.basicPublish("direct", routingKey,null, message.getBytes());
            logger.info("消息发送完毕");}}}

消费者A:

packagecom.gettler.rabbitmq.direct;importcom.gettler.rabbitmq.RabbitmqApplication;importcom.gettler.rabbitmq.config.RabbitMqConnectionFactory;importcom.rabbitmq.client.*;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.ActiveProfiles;importorg.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("direct")@RunWith(SpringRunner.class)@SpringBootTest(classes =RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)publicclassConsumerATest{privatestaticfinalLogger logger =LoggerFactory.getLogger(ConsumerATest.class);@TestpublicvoidtestConsumerA()throwsException{// 创建一个connectionConnection connection =RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 创建channel// 声明交换机
        channel.exchangeDeclare("direct",BuiltinExchangeType.DIRECT);// 声明临时队列
        channel.queueDeclare("console",false,false,false,null);// 绑定队列与交换机
        channel.queueBind("console","direct","info");
        channel.queueBind("console","direct","warning");// 消费消息DeliverCallback deliverCallback =(consumerTag, message)->{
            logger.info("获得消息:"+newString(message.getBody()));};CancelCallback cancelCallback =(consumerTag)->{
            logger.info("消息消费被中断");};
        channel.basicConsume("console",true, deliverCallback, cancelCallback);}}

消费者B:

packagecom.gettler.rabbitmq.direct;importcom.gettler.rabbitmq.RabbitmqApplication;importcom.gettler.rabbitmq.config.RabbitMqConnectionFactory;importcom.rabbitmq.client.*;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.ActiveProfiles;importorg.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("direct")@RunWith(SpringRunner.class)@SpringBootTest(classes =RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)publicclassConsumerBTest{privatestaticfinalLogger logger =LoggerFactory.getLogger(ConsumerBTest.class);@TestpublicvoidtestConsumerB()throwsException{// 创建一个connectionConnection connection =RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 声明交换机
        channel.exchangeDeclare("direct",BuiltinExchangeType.DIRECT);// 声明临时队列
        channel.queueDeclare("disk",false,false,false,null);// 绑定队列与交换机
        channel.queueBind("disk","direct","error");// 消费消息DeliverCallback deliverCallback =(consumerTag, message)->{
            logger.info("获得消息:"+newString(message.getBody()));};CancelCallback cancelCallback =(consumerTag)->{
            logger.info("消息消费被中断");};
        channel.basicConsume("disk",true, deliverCallback, cancelCallback);}}

image-20240620161838310

广播模式

配置文件

application-fanout.yml

spring:rabbitmq:host: 123.123.123.123
    port:5672username: Gettler
    password:********virtual-host: /

生产者:

packagecom.gettler.rabbitmq.fanout;importcom.gettler.rabbitmq.RabbitmqApplication;importcom.gettler.rabbitmq.config.RabbitMqConnectionFactory;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.ActiveProfiles;importorg.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("fanout")@RunWith(SpringRunner.class)@SpringBootTest(classes =RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)publicclassProducerTest{privatestaticfinalLogger logger =LoggerFactory.getLogger(ProducerTest.class);@TestpublicvoidtestProducer()throwsException{// 创建channelConnection connection =RabbitMqConnectionFactory.getSingleInstanceConnection();Channel channel = connection.createChannel();// 声明交换机
        channel.exchangeDeclare("fanout",BuiltinExchangeType.FANOUT);// 发送10条消息for(int i =0; i <10; i++){String message = i +"";
            channel.basicPublish("fanout","",null, message.getBytes());
            logger.info("消息发送完毕"+ message);}}}

消费者A:

packagecom.gettler.rabbitmq.fanout;importcom.gettler.rabbitmq.RabbitmqApplication;importcom.gettler.rabbitmq.config.RabbitMqConnectionFactory;importcom.rabbitmq.client.*;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.ActiveProfiles;importorg.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("fanout")@RunWith(SpringRunner.class)@SpringBootTest(classes =RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)publicclassConsumerATest{privatestaticfinalLogger logger =LoggerFactory.getLogger(ConsumerATest.class);@TestpublicvoidtestConsumerA()throwsException{// 创建一个connectionConnection connection =RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 声明交换机
        channel.exchangeDeclare("fanout",BuiltinExchangeType.FANOUT);// 声明临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列与交换机
        channel.queueBind(queueName,"fanout","");// 消费消息DeliverCallback deliverCallback =(consumerTag, message)->{
            logger.info("获得消息:"+newString(message.getBody()));};CancelCallback cancelCallback =(consumerTag)->{
            logger.info("消息消费被中断");};
        channel.basicConsume(queueName,true, deliverCallback, cancelCallback);}}

消费者B:

packagecom.gettler.rabbitmq.fanout;importcom.gettler.rabbitmq.RabbitmqApplication;importcom.gettler.rabbitmq.config.RabbitMqConnectionFactory;importcom.rabbitmq.client.*;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.ActiveProfiles;importorg.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("fanout")@RunWith(SpringRunner.class)@SpringBootTest(classes =RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)publicclassConsumerBTest{privatestaticfinalLogger logger =LoggerFactory.getLogger(ConsumerBTest.class);@TestpublicvoidtestConsumerB()throwsException{// 创建一个connectionConnection connection =RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 声明交换机
        channel.exchangeDeclare("fanout",BuiltinExchangeType.FANOUT);// 声明临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列与交换机
        channel.queueBind(queueName,"fanout","");// 消费消息DeliverCallback deliverCallback =(consumerTag, message)->{
            logger.info("获得消息:"+newString(message.getBody()));};CancelCallback cancelCallback =(consumerTag)->{
            logger.info("消息消费被中断");};
        channel.basicConsume(queueName,true, deliverCallback, cancelCallback);}}

image-20240620162526952

主题模式

配置文件

application-topic.yml

spring:rabbitmq:host: 123.123.123.123
    port:5672username: Gettler
    password:********virtual-host: /

生产者:

packagecom.gettler.rabbitmq.topic;importcom.gettler.rabbitmq.RabbitmqApplication;importcom.gettler.rabbitmq.config.RabbitMqConnectionFactory;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.ActiveProfiles;importorg.springframework.test.context.junit4.SpringRunner;importjava.util.HashMap;importjava.util.Map;/**
 * @author Gettler
 * @date 2024/06/13
 */@ActiveProfiles("topic")@RunWith(SpringRunner.class)@SpringBootTest(classes =RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)publicclassProducerTest{privatestaticfinalLogger logger =LoggerFactory.getLogger(ProducerTest.class);@TestpublicvoidtestProducer()throwsException{// 创建channelConnection connection =RabbitMqConnectionFactory.getSingleInstanceConnection();Channel channel = connection.createChannel();// 声明交换机
        channel.exchangeDeclare("topic",BuiltinExchangeType.TOPIC);Map<String,String> messageMap =newHashMap<>();
        messageMap.put("class1.DB.exam","一班数据库考试通知");
        messageMap.put("class1.OS.exam","一班操作系统考试通知");
        messageMap.put("class2.DB.exam","二班数据库考试通知");
        messageMap.put("class2.OS.exam","二班操作系统考试通知");for(Map.Entry<String,String> mes : messageMap.entrySet()){String routingKey = mes.getKey();String message = mes.getValue();
            channel.basicPublish("topic", routingKey,null, message.getBytes());
            logger.info("消息发送完毕");}}}

消费者A(模拟一班的学生):

packagecom.gettler.rabbitmq.topic;importcom.gettler.rabbitmq.RabbitmqApplication;importcom.gettler.rabbitmq.config.RabbitMqConnectionFactory;importcom.rabbitmq.client.*;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.ActiveProfiles;importorg.springframework.test.context.junit4.SpringRunner;/**
 * @author Gettler
 * @date 2024/06/13
 */@ActiveProfiles("topic")@RunWith(SpringRunner.class)@SpringBootTest(classes =RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)publicclassStudentOfClass1Consumer{privatestaticfinalLogger logger =LoggerFactory.getLogger(StudentOfClass1Consumer.class);@TestpublicvoidtestStudentOfClass1Consumer()throwsException{// 创建一个connectionConnection connection =RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 声明交换机
        channel.exchangeDeclare("topic",BuiltinExchangeType.TOPIC);// 创建Q1队列
        channel.queueDeclare("student_of_class1",false,false,false,null);// 绑定队列与交换机
        channel.queueBind("student_of_class1","topic","class1.#");// 消费消息DeliverCallback deliverCallback =(consumerTag, message)->{
            logger.info("获得消息:"+newString(message.getBody()));};CancelCallback cancelCallback =(consumerTag)->{
            logger.info("消息消费被中断");};
        channel.basicConsume("student_of_class1",true, deliverCallback, cancelCallback);}}

消费者B(模拟操作系统老师):

packagecom.gettler.rabbitmq.topic;importcom.gettler.rabbitmq.RabbitmqApplication;importcom.gettler.rabbitmq.config.RabbitMqConnectionFactory;importcom.rabbitmq.client.*;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.ActiveProfiles;importorg.springframework.test.context.junit4.SpringRunner;/**
 * @author Gettler
 * @date 2024/06/13
 */@ActiveProfiles("topic")@RunWith(SpringRunner.class)@SpringBootTest(classes =RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)publicclassTeacherConsumer{privatestaticfinalLogger logger =LoggerFactory.getLogger(TeacherConsumer.class);@TestpublicvoidtestTeacherConsumer()throwsException{// 创建一个connectionConnection connection =RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 声明交换机
        channel.exchangeDeclare("topic",BuiltinExchangeType.TOPIC);// 创建Q1队列
        channel.queueDeclare("teacher_of_OS",false,false,false,null);// 绑定队列与交换机
        channel.queueBind("teacher_of_OS","topic","#.OS.#");// 消费消息DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("获得消息:"+newString(message.getBody()));};CancelCallback cancelCallback =(consumerTag)->{System.out.println("消息消费被中断");};
        channel.basicConsume("teacher_of_OS",true, deliverCallback, cancelCallback);}}

image-20240620162754734

谷粒商城 RabbitMQ 学习笔记

新建Maven项目

添加依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.0.0</version></dependency>

编写发送端

packageorg.example;importjava.util.concurrent.TimeoutException;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;publicclassSend{//队列名称  privatefinalstaticStringQUEUE_NAME="helloMQ";publicstaticvoidmain(String[] argv)throwsjava.io.IOException,TimeoutException{/** 
         * 创建连接连接到MabbitMQ 
         */ConnectionFactory factory =newConnectionFactory();//设置MabbitMQ所在主机ip或者主机名  
        factory.setHost("localhost");//创建一个连接  Connection connection = factory.newConnection();//创建一个频道  Channel channel = connection.createChannel();//指定一个队列  
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);//发送的消息  String message ="hello world!";//往队列中发出一条消息  
        channel.basicPublish("",QUEUE_NAME,null, message.getBytes());System.out.println(" [x] Sent '"+ message +"'");//关闭频道和连接  
        channel.close();  
        connection.close();}}

编写接收端

packageorg.example;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassRecv{// 队列名称privatefinalstaticStringQUEUE_NAME="helloMQ";publicstaticvoidmain(String[] argv)throwsException{// 打开连接和创建频道,与发送端一样ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//创建消费者Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String message =newString(body,"UTF-8");System.out.println(" [x] Received '"+ message +"'");}};
        channel.basicConsume(QUEUE_NAME,true, consumer);}}

运行接收端

image-20220723101156639

运行发送端,每运行一次发送一次消息

image-20220723101246973

管理网站上有接收端的连接(发送端发送后便断开连接了)

image-20220723101256826

添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
编写配置文件
spring.rabbitmq.host=192.168.3.200
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
创建Exchange
publicvoidcreateExchange(){DirectExchange directExchange =newDirectExchange("hello-java-exchange",true,false);
    amqpAdmin.declareExchange(directExchange);}
创建Queue
publicvoidcreateQueue(){Queue queue =newQueue("hello-java-queue",true,false,false);
    amqpAdmin.declareQueue(queue);}
连接Queue和Exchange
publicvoidcreateBinding(){Binding binding =newBinding("hello-java-queue",Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null);
    amqpAdmin.declareBinding(binding);}
发送消息
publicvoidsendMessage(){String msg ="hello world";List<String> s =newArrayList<>();
    s.add(msg);
    s.add("List");
    rabbitTemplate.convertAndSend("hello-java-exchange","hello.java", s,newCorrelationData(UUID.randomUUID().toString()));}
接收消息

想要接受对象消息,需使用JSON序列化机制,进行消息转换

编写MyRabbitConfig配置类

@ConfigurationpublicclassMyRabbitConfig{@AutowiredRabbitTemplate rabbitTemplate;/**
     * 使用JSON序列化机制,进行消息转换
     * @return
     */@BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}}

使用RabbitListener注解监听队列,该注解参数可以是Object content, Message message, Channel channel。

@RabbitListener(queues ={"hello-java-queue"})publicvoidreceiveMessage(Object message){System.out.println("接受到消息内容:"+ message);}
可靠抵达

编写配置文件

# 开启发送端确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
# 抵达队列后以异步发送优先回调抵达队列后的回调returnconfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual

将MyRabbitConfig修改为

@ConfigurationpublicclassMyRabbitConfig{@AutowiredRabbitTemplate rabbitTemplate;/**
     * 使用JSON序列化机制,进行消息转换
     *
     * @return
     */@BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}@PostConstruct// MyRabbitConfig对象创建完成后执行该方法publicvoidinitRabbitTemplate(){
        rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){/**
             * 消息抵达节点的话ack就为true
             * @param correlationData   当前消息的唯一关联数据(消息唯一ID)
             * @param ack 消息是否成功收到
             * @param cause 失败原因
             */@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){System.out.println("confirming...correlationData{"+ correlationData +"},ack{"+ ack +"},cause{"+ cause +"}");}});
        rabbitTemplate.setReturnCallback(newRabbitTemplate.ReturnCallback(){/**
             * 没抵达队列,触发这个失败回调函数
             * @param message
             * @param replyCode
             * @param replyText
             * @param exchange
             * @param routingKey
             */@OverridepublicvoidreturnedMessage(Message message,int replyCode,String replyText,String exchange,String routingKey){System.out.println("Unreachable...message{"+ message +"},replyCode{"+ replyText +"},exchange{"+ exchange +"},routingKey{"+ routingKey +"}");}});}}

监听队列方法修改为

@RabbitListener(queues ={"hello-java-queue"})publicvoidreceiveMessage(Message message,List list,Channel channel)throwsIOException{System.out.println("接受到消息内容:"+ list);// channel内按顺序递增long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println(deliveryTag);// 签收try{
        channel.basicAck(deliveryTag,false);// 是否批量签收}catch(Exception e){// 网络中断// b1 = false 丢弃, b1 = true 发回服务器,服务器重新入队。
        channel.basicNack(deliveryTag,false,false);}}
标签: rabbitmq 学习 笔记

本文转载自: https://blog.csdn.net/qq_46039856/article/details/139836562
版权归原作者 Gettler•Main 所有, 如有侵权,请联系我们删除。

“RabbitMQ 学习笔记”的评论:

还没有评论