一、项目配置版本
- SpringBoot版本:3.3.0
- JDK版本:21
二、创建项目
三、依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency>
四、yml配置
配置代码
spring:active-mq:broker-url: tcp://localhost:61616user: admin
password: admin
# jms:# # 默认目标类型是否为主题:true 表示默认目标类型是主题(使用发布/订阅模式),false 表示默认目标类型是队列(使用点对点模式)# pub-sub-domain: true
默认目标类型是否为主题
可以通过
pub-sub-domain
,配置默认情况下目标类型;如果没有配置,默认使用队列模式发送和接收消息。
队列模式和主题模式的区别:
- 队列(使用点对点模式),一个消息,只会被一个消费者消费;
- 主题(使用发布/订阅模式),一个消息,可以被订阅该主题的多个消费者消费;
ActiveMQ支持使用Java代码,为每个生产者和消费者单独配置目标类型;
五、目标(Destination)常量
目标(Destination)常量,定义队列或主题的名字。
packagecom.example.activemq.constant;publicclassQueueDestination{publicstaticfinalStringTEST_QUEUE_1="Test.Queue1";publicstaticfinalStringTEST_QUEUE_2="Test.Queue2";}
packagecom.example.activemq.constant;publicclassTopicDestination{publicstaticfinalStringTEST_TOPIC_1="Test.Topic1";publicstaticfinalStringTEST_TOPIC_2="Test.Topic2";}
六、队列(Queue):默认目标类型
队列(Queue)生产者
packagecom.example.activemq.producer;importcom.example.activemq.constant.QueueDestination;importorg.springframework.jms.core.JmsMessagingTemplate;importorg.springframework.stereotype.Component;/**
* 队列消息生产者
* <p>
* 用于发送消息到指定的JMS队列
*/@ComponentpublicclassQueueProducer{/**
* JmsMessagingTemplate 用于消息的发送,它封装了消息转换和发送的逻辑
*/privatefinalJmsMessagingTemplate jmsMessagingTemplate;publicQueueProducer(JmsMessagingTemplate jmsMessagingTemplate){this.jmsMessagingTemplate = jmsMessagingTemplate;}/**
* 发送消息到指定的队列
*
* @param message 要发送的消息内容
*/publicvoidsend(String message){// 调用 JmsMessagingTemplate 的 convertAndSend 方法,将消息发送到指定队列。
jmsMessagingTemplate.convertAndSend(QueueDestination.TEST_QUEUE_1, message);}}
生产者发送消息
packagecom.example.activemq.producer;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;/**
* 队列消息生产者测试
*/@SpringBootTestclassQueueProducerTest{@AutowiredprivateQueueProducer producer;@TestvoidtestSendToQueue(){
producer.send("Hello ActiveMQ, queue message");}}
队列(Queue)消费者
packagecom.example.activemq.consumer;importcom.example.activemq.constant.QueueDestination;importlombok.extern.slf4j.Slf4j;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassQueueConsumer{@JmsListener(destination =QueueDestination.TEST_QUEUE_1)privatevoidreceive(String message){
log.info("队列消费者,接收消息: {}", message);}}
消息日志
2024-06-05T22:21:24.834+08:00 INFO 15640 — [active-mq-consumer] [ntContainer#0-1] c.e.activemq.consumer.QueueConsumer : 队列消费者,接收消息: Hello ActiveMQ, queue message
ActiveMQ控制台效果
七、主题(Topic):默认目标类型
配置默认目标类型
将上面配置模块中的
pub-sub-domain
设置为
true
,则默认目标类型使用
发布/订阅
模式。
spring:jms:# 默认目标类型是否为主题:true 表示默认目标类型是主题(使用发布/订阅模式),false 表示默认目标类型是队列(使用点对点模式)pub-sub-domain:true
主题(Topic)生产者
packagecom.example.activemq.producer;importcom.example.activemq.constant.TopicDestination;importorg.springframework.jms.core.JmsMessagingTemplate;importorg.springframework.stereotype.Component;/**
* 主题消息生产者
* <p>
* 用于发送消息到指定的JMS主题
*/@ComponentpublicclassTopicProducer{/**
* JmsMessagingTemplate 用于消息的发送,它封装了消息转换和发送的逻辑
*/privatefinalJmsMessagingTemplate jmsMessagingTemplate;publicTopicProducer(JmsMessagingTemplate jmsMessagingTemplate){this.jmsMessagingTemplate = jmsMessagingTemplate;}/**
* 发送消息到指定的主题
*
* @param message 要发送的消息内容
*/publicvoidsend(String message){// 调用 JmsMessagingTemplate 的 convertAndSend 方法,将消息发送到指定主题。
jmsMessagingTemplate.convertAndSend(TopicDestination.TEST_TOPIC_1, message);}}
生产者发送消息
packagecom.example.activemq.producer;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;/**
* 主题消息生产者测试
*/@SpringBootTestclassTopicProducerTest{@AutowiredprivateTopicProducer producer;@TestvoidtestSendToTopic(){
producer.send("Hello ActiveMQ, topic message");}}
主题(Topic)消费者
packagecom.example.activemq.consumer;importcom.example.activemq.constant.TopicDestination;importlombok.extern.slf4j.Slf4j;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassTopicConsumer{@JmsListener(destination =TopicDestination.TEST_TOPIC_1)privatevoidreceive(String message){
log.info("主题消费者,接收消息: {}", message);}}
接收消息的日志
2024-06-05T23:51:46.026+08:00 INFO 8584 — [active-mq-consumer] [ntContainer#3-1] c.e.activemq.consumer.TopicConsumer : 主题消费者,接收消息: Hello ActiveMQ, topic message
ActiveMQ控制台效果
八、自定义配置队列和主题
SpringBoot 集成 ActiveMQ,不同的
生产者
可以是不同的模式;比如一个生产者是发送队列消息,一个生产者发送主题消息。同样的,不同的
消费者
也可以是不同的模式;比如一个消费者是接收队列消息,一个消费者接收主题消息。
此时,可以使用Java代码配置队列(默认模式)或主题;此时可以不在 yml 中配置
pub-sub-domain
。
生产者
配置
packagecom.example.activemq.config;importcom.example.activemq.constant.QueueDestination;importcom.example.activemq.constant.TopicDestination;importjakarta.jms.Queue;importjakarta.jms.Topic;importorg.apache.activemq.command.ActiveMQQueue;importorg.apache.activemq.command.ActiveMQTopic;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassJmsConfig{@BeanpublicQueuequeue(){returnnewActiveMQQueue(QueueDestination.TEST_QUEUE_2);}@BeanpublicTopictopic(){returnnewActiveMQTopic(TopicDestination.TEST_TOPIC_2);}}
生产者代码
packagecom.example.activemq.producer;importjakarta.jms.Queue;importorg.springframework.jms.core.JmsMessagingTemplate;importorg.springframework.stereotype.Component;/**
* 队列消息生产者
*/@ComponentpublicclassQueueMessageProducer{privatefinalQueue queue;privatefinalJmsMessagingTemplate jmsMessagingTemplate;publicQueueMessageProducer(Queue queue,JmsMessagingTemplate jmsMessagingTemplate){this.queue = queue;this.jmsMessagingTemplate = jmsMessagingTemplate;}publicvoidsend(String message){
jmsMessagingTemplate.convertAndSend(queue, message);}}
packagecom.example.activemq.producer;importjakarta.jms.Topic;importorg.springframework.jms.core.JmsMessagingTemplate;importorg.springframework.stereotype.Component;/**
* 主题消息生产者
*/@ComponentpublicclassTopicMessageProducer{privatefinalTopic topic;privatefinalJmsMessagingTemplate jmsMessagingTemplate;publicTopicMessageProducer(Topic topic,JmsMessagingTemplate jmsMessagingTemplate){this.topic = topic;this.jmsMessagingTemplate = jmsMessagingTemplate;}publicvoidsend(String message){
jmsMessagingTemplate.convertAndSend(topic, message);}}
生产者发送消息
packagecom.example.activemq.producer;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;/**
* 队列消息生产者测试
*/@SpringBootTestclassQueueMessageProducerTest{@AutowiredprivateQueueMessageProducer producer;/**
* 发送多个队列消息
*/@TestvoidtestSendToQueueMultiple(){for(int i =0; i <10; i++){
producer.send("Hello ActiveMQ, queue message "+ i);}}}
packagecom.example.activemq.producer;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;/**
* 主题消息生产者测试
*/@SpringBootTestclassTopicMessageProducerTest{@AutowiredprivateTopicMessageProducer producer;/**
* 发送多个主题消息
*/@TestvoidtestSendToTopicMultiple(){for(int i =0; i <2; i++){
producer.send("Hello ActiveMQ, topic message "+ i);}}}
消费者
配置
packagecom.example.activemq.config;importjakarta.jms.ConnectionFactory;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.jms.config.DefaultJmsListenerContainerFactory;importorg.springframework.jms.config.JmsListenerContainerFactory;@ConfigurationpublicclassJmsConfig{@BeanpublicJmsListenerContainerFactory<?>jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory){DefaultJmsListenerContainerFactory bean =newDefaultJmsListenerContainerFactory();
bean.setPubSubDomain(false);// 设置为队列模式(点对点模式)
bean.setConnectionFactory(activeMQConnectionFactory);return bean;}@BeanpublicJmsListenerContainerFactory<?>jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory){DefaultJmsListenerContainerFactory bean =newDefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);// 设置为发布/订阅模式(主题模式)
bean.setConnectionFactory(activeMQConnectionFactory);return bean;}}
消费者代码
packagecom.example.activemq.consumer;importcom.example.activemq.constant.QueueDestination;importlombok.SneakyThrows;importlombok.extern.slf4j.Slf4j;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassQueueMessageConsumer{@SneakyThrows@JmsListener(destination =QueueDestination.TEST_QUEUE_2, containerFactory ="jmsListenerContainerQueue")publicvoidreceive1(String message){
log.info("队列消费者-1,接收消息: {}", message);Thread.sleep(1000);}@SneakyThrows@JmsListener(destination =QueueDestination.TEST_QUEUE_2, containerFactory ="jmsListenerContainerQueue")publicvoidreceive2(String message){
log.info("队列消费者-2,接收消息: {}", message);Thread.sleep(1000);}@SneakyThrows@JmsListener(destination =QueueDestination.TEST_QUEUE_2, containerFactory ="jmsListenerContainerQueue")publicvoidreceive3(String message){
log.info("队列消费者-3,接收消息: {}", message);Thread.sleep(1000);}}
packagecom.example.activemq.consumer;importcom.example.activemq.constant.TopicDestination;importlombok.SneakyThrows;importlombok.extern.slf4j.Slf4j;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassTopicMessageConsumer{@SneakyThrows@JmsListener(destination =TopicDestination.TEST_TOPIC_2, containerFactory ="jmsListenerContainerTopic")publicvoidreceive1(String message){
log.info("主题消费者-1,接收消息: {}", message);Thread.sleep(1000);}@SneakyThrows@JmsListener(destination =TopicDestination.TEST_TOPIC_2, containerFactory ="jmsListenerContainerTopic")publicvoidreceive2(String message){
log.info("主题消费者-2,接收消息: {}", message);Thread.sleep(1000);}@SneakyThrows@JmsListener(destination =TopicDestination.TEST_TOPIC_2, containerFactory ="jmsListenerContainerTopic")publicvoidreceive3(String message){
log.info("主题消费者-3,接收消息: {}", message);Thread.sleep(1000);}}
接收结果
队列接收结果
2024-06-06T19:12:04.256+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#1-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-2,接收消息: Hello ActiveMQ, queue message 0
2024-06-06T19:12:04.259+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#3-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-1,接收消息: Hello ActiveMQ, queue message 1
2024-06-06T19:12:04.284+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#2-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-3,接收消息: Hello ActiveMQ, queue message 2
2024-06-06T19:12:05.278+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#1-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-2,接收消息: Hello ActiveMQ, queue message 3
2024-06-06T19:12:05.278+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#3-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-1,接收消息: Hello ActiveMQ, queue message 4
2024-06-06T19:12:05.291+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#2-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-3,接收消息: Hello ActiveMQ, queue message 5
2024-06-06T19:12:06.293+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#3-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-1,接收消息: Hello ActiveMQ, queue message 7
2024-06-06T19:12:06.293+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#2-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-3,接收消息: Hello ActiveMQ, queue message 8
2024-06-06T19:12:06.294+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#1-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-2,接收消息: Hello ActiveMQ, queue message 6
2024-06-06T19:12:07.310+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#1-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-2,接收消息: Hello ActiveMQ, queue message 9
主题接收结果
2024-06-06T19:57:55.141+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#4-1] c.e.a.consumer.TopicMessageConsumer : 主题消费者-2,接收消息: Hello ActiveMQ, topic message 0
2024-06-06T19:57:55.141+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#6-1] c.e.a.consumer.TopicMessageConsumer : 主题消费者-1,接收消息: Hello ActiveMQ, topic message 0
2024-06-06T19:57:55.141+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#5-1] c.e.a.consumer.TopicMessageConsumer : 主题消费者-3,接收消息: Hello ActiveMQ, topic message 0
2024-06-06T19:57:56.143+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#6-1] c.e.a.consumer.TopicMessageConsumer : 主题消费者-1,接收消息: Hello ActiveMQ, topic message 1
2024-06-06T19:57:56.143+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#4-1] c.e.a.consumer.TopicMessageConsumer : 主题消费者-2,接收消息: Hello ActiveMQ, topic message 1
2024-06-06T19:57:56.150+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#5-1] c.e.a.consumer.TopicMessageConsumer : 主题消费者-3,接收消息: Hello ActiveMQ, topic message 1
ActiveMQ控制台效果
队列- 控制台效果
主题-控制台效果
版权归原作者 宋冠巡 所有, 如有侵权,请联系我们删除。