Docker 部署 RabbitMQ
# Pull the image; only the "management" tag ships the management web UI
docker pull rabbitmq:management

# Create and start the container (15672 = management UI, 5672 = AMQP)
docker run \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin \
  -v mq-plugins:/plugins \
  --name mq \
  --hostname mq \
  -p 15672:15672 \
  -p 5672:5672 \
  -itd \
  rabbitmq:management

# Check the container status
docker ps -a
导入RabbitMQ依赖
pom.xml
<!-- AMQP starter; includes RabbitMQ support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Jackson databind, needed by the JSON message converter (Jackson2JsonMessageConverter).
     No <version> here: the Spring Boot dependency management selects a Jackson release
     that matches the rest of the application, avoiding mixed Jackson versions. -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
提供者和消费者的配置
application.yml
spring:
  rabbitmq:
    host: 192.168.137.139 # host name
    port: 5672            # port
    virtual-host: /       # virtual host
    username: admin       # user name
    password: admin       # password
消息转换器
提供者和消费者都可以添加
RabbitMQMessageConverterConfig.java
@ConfigurationpublicclassRabbitMQMessageConverterConfig{@BeanpublicstaticMessageConverterjsonMessageConverter(){returnnewJackson2JsonMessageConverter();}}
注意执行顺序:需要先启动 consumer 监听并创建队列(需要保证队列存在!),publisher 再往队列里发送消息才会有用,否则消息找不到队列,等于白发。
SimpleQueue
提供者:
SimpleQueuePublisher.java
@RunWith(SpringRunner.class)@SpringBootTestpublicclassSimpleQueuePublisher{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidsimpleQueueTest(){String msg ="hello, simple queue";
rabbitTemplate.convertAndSend("simple.queue", msg);}}
消费者:
SimpleQueueConsumer.java
@ComponentpublicclassSimpleQueueConsumer{@RabbitListener(queues ="simple.queue")publicvoidsimpleQueueConsumer(String msg){System.out.println("simpleQueueConsumer: "+ msg);}}
Work Queues
可加配置
application.yml
spring:
  rabbitmq:
    listener:
      simple:        # "simple" listener type
        prefetch: 1  # number of messages a consumer prefetches each time
消费者:
WorkQueueConsumer.java
@ComponentpublicclassWorkQueueConsumer{@RabbitListener(queuesToDeclare ={@Queue(name ="work.queue")})publicvoidworkQueue1Consumer(String msg)throwsInterruptedException{System.out.println("workQueue1Consumer: "+ msg);Thread.sleep(10);}@RabbitListener(queuesToDeclare ={@Queue(name ="work.queue")})publicvoidworkQueue2Consumer(String msg)throwsInterruptedException{System.out.println("workQueue2Consumer: "+ msg);Thread.sleep(90);}}
提供者:
WorkQueuePublisher.java
@RunWith(SpringRunner.class)@SpringBootTestpublicclassWorkQueuePublisher{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidworkQueueTest(){for(int i =1; i <=100; i ++){String msg ="hello, work queue. "+ i;
rabbitTemplate.convertAndSend("work.queue", msg);}}}
发布/订阅
Fanout
消费者:
FanoutQueueConsumer.java
`@Exchange` 和 `@Queue` 注解中的 `declare` 属性默认为 `"true"`,如果 exchange 和 queue 不存在,会自动创建。
@ComponentpublicclassFanoutQueueConsumer{@RabbitListener(bindings =@QueueBinding(
exchange =@Exchange(name ="fanout", type =ExchangeTypes.FANOUT),
value =@Queue(name ="fanout.queue1")))publicvoidfanoutQueue1Consumer(String msg){System.out.println("fanoutQueue1Consumer: "+ msg);}@RabbitListener(bindings =@QueueBinding(
exchange =@Exchange(name ="fanout", type =ExchangeTypes.FANOUT),
value =@Queue(name ="fanout.queue2")))publicvoidfanoutQueue2Consumer(String msg){System.out.println("fanoutQueue2Consumer: "+ msg);}}
提供者:
FanoutQueuePublisher.java
@RunWith(SpringRunner.class)@SpringBootTestpublicclassFanoutQueuePublisher{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidfanoutQueueTest(){String exchangeName ="fanout";String msg ="hello, fanout queue.";
rabbitTemplate.convertAndSend(exchangeName,"", msg);}}
Routing / Direct
消费者:
DirectQueueConsumer.java
@ComponentpublicclassDirectQueueConsumer{@RabbitListener(bindings =@QueueBinding(
exchange =@Exchange(name ="direct", type =ExchangeTypes.DIRECT),
value =@Queue(name ="direct.queue1")))publicvoiddirectQueue1Consumer(String msg){System.out.println("directQueue1Consumer: "+ msg);}@RabbitListener(bindings =@QueueBinding(
exchange =@Exchange(name ="direct", type =ExchangeTypes.DIRECT),
value =@Queue(name ="direct.queue2")))publicvoiddirectQueue2Consumer(String msg){System.out.println("directQueue2Consumer: "+ msg);}}
提供者:
DirectQueuePublisher.java
@RunWith(SpringRunner.class)@SpringBootTestpublicclassDirectQueuePublisher{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoiddirectQueueTest(){String exchangeName ="direct";String key ="error";// String key = "warning";String msg ="hello, direct queue, "+ key;
rabbitTemplate.convertAndSend(exchangeName, key, msg);}}
Topics
`*`:匹配恰好一个单词;`#`:匹配零个或多个单词(单词之间以 `.` 分隔)。
消费者:
TopicQueueConsumer.java
@ConfigurationpublicclassTopicQueueConsumer{@RabbitListener(bindings =@QueueBinding(
exchange =@Exchange(name ="topic", type =ExchangeTypes.TOPIC),
value =@Queue(name ="topic.queue1"),
key ={"*.orange.*"}))publicvoidtopicQueue1Consumer(String msg){System.out.println("topicQueue1Consumer: "+ msg);}@RabbitListener(bindings =@QueueBinding(
exchange =@Exchange(name ="topic", type =ExchangeTypes.TOPIC),
value =@Queue(name ="topic.queue2"),
key ={"*.*.rabbit","lazy.#"}))publicvoidtopicQueue2Consumer(String msg){System.out.println("topicQueue2Consumer: "+ msg);}}
提供者:
TopicQueuePublisher.java
@RunWith(SpringRunner.class)@SpringBootTestpublicclassTopicQueuePublisher{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtopicQueueTest(){String exchangeName ="topic";String key ="lazy.orange.rabbit";String msg ="hello, topic queue. "+ key;
rabbitTemplate.convertAndSend(exchangeName, key, msg);}}
本文转载自: https://blog.csdn.net/m1015422754/article/details/132593013
版权归原作者 Code-Horse 所有, 如有侵权,请联系我们删除。