0


SpringBoot整合ActiveMQ

一、项目配置版本

  • SpringBoot版本:3.3.0
  • JDK版本:21

二、创建项目

在这里插入图片描述

三、依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency>

四、yml配置

配置代码

spring:active-mq:broker-url: tcp://localhost:61616user: admin
    password: admin
#  jms:#    # 默认目标类型是否为主题:true 表示默认目标类型是主题(使用发布/订阅模式),false 表示默认目标类型是队列(使用点对点模式)#    pub-sub-domain: true

默认目标类型是否为主题

可以通过

pub-sub-domain

,配置默认情况下目标类型;如果没有配置,默认使用队列模式发送和接收消息。

队列模式和主题模式的区别:

  1. 队列(使用点对点模式),一个消息,只会被一个消费者消费;
  2. 主题(使用发布/订阅模式),一个消息,可以被订阅该主题的多个消费者消费;

ActiveMQ支持使用Java代码,为每个生产者和消费者单独配置目标类型;

五、目标(Destination)常量

目标(Destination)常量,定义队列或主题的名字。

packagecom.example.activemq.constant;publicclassQueueDestination{publicstaticfinalStringTEST_QUEUE_1="Test.Queue1";publicstaticfinalStringTEST_QUEUE_2="Test.Queue2";}
packagecom.example.activemq.constant;publicclassTopicDestination{publicstaticfinalStringTEST_TOPIC_1="Test.Topic1";publicstaticfinalStringTEST_TOPIC_2="Test.Topic2";}

六、队列(Queue):默认目标类型

队列(Queue)生产者

packagecom.example.activemq.producer;importcom.example.activemq.constant.QueueDestination;importorg.springframework.jms.core.JmsMessagingTemplate;importorg.springframework.stereotype.Component;/**
 * 队列消息生产者
 * <p>
 * 用于发送消息到指定的JMS队列
 */@ComponentpublicclassQueueProducer{/**
     * JmsMessagingTemplate 用于消息的发送,它封装了消息转换和发送的逻辑
     */privatefinalJmsMessagingTemplate jmsMessagingTemplate;publicQueueProducer(JmsMessagingTemplate jmsMessagingTemplate){this.jmsMessagingTemplate = jmsMessagingTemplate;}/**
     * 发送消息到指定的队列
     *
     * @param message 要发送的消息内容
     */publicvoidsend(String message){// 调用 JmsMessagingTemplate 的 convertAndSend 方法,将消息发送到指定队列。
        jmsMessagingTemplate.convertAndSend(QueueDestination.TEST_QUEUE_1, message);}}

生产者发送消息

packagecom.example.activemq.producer;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;/**
 * 队列消息生产者测试
 */@SpringBootTestclassQueueProducerTest{@AutowiredprivateQueueProducer producer;@TestvoidtestSendToQueue(){
        producer.send("Hello ActiveMQ, queue message");}}

队列(Queue)消费者

packagecom.example.activemq.consumer;importcom.example.activemq.constant.QueueDestination;importlombok.extern.slf4j.Slf4j;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassQueueConsumer{@JmsListener(destination =QueueDestination.TEST_QUEUE_1)privatevoidreceive(String message){
        log.info("队列消费者,接收消息: {}", message);}}

消息日志

2024-06-05T22:21:24.834+08:00 INFO 15640 — [active-mq-consumer] [ntContainer#0-1] c.e.activemq.consumer.QueueConsumer : 队列消费者,接收消息: Hello ActiveMQ, queue message

ActiveMQ控制台效果

在这里插入图片描述

七、主题(Topic):默认目标类型

配置默认目标类型

将上面配置模块中的

pub-sub-domain

设置为

true

,则默认目标类型使用

发布/订阅

模式。

spring:jms:# 默认目标类型是否为主题:true 表示默认目标类型是主题(使用发布/订阅模式),false 表示默认目标类型是队列(使用点对点模式)pub-sub-domain:true

主题(Topic)生产者

packagecom.example.activemq.producer;importcom.example.activemq.constant.TopicDestination;importorg.springframework.jms.core.JmsMessagingTemplate;importorg.springframework.stereotype.Component;/**
 * 主题消息生产者
 * <p>
 * 用于发送消息到指定的JMS主题
 */@ComponentpublicclassTopicProducer{/**
     * JmsMessagingTemplate 用于消息的发送,它封装了消息转换和发送的逻辑
     */privatefinalJmsMessagingTemplate jmsMessagingTemplate;publicTopicProducer(JmsMessagingTemplate jmsMessagingTemplate){this.jmsMessagingTemplate = jmsMessagingTemplate;}/**
     * 发送消息到指定的主题
     *
     * @param message 要发送的消息内容
     */publicvoidsend(String message){// 调用 JmsMessagingTemplate 的 convertAndSend 方法,将消息发送到指定主题。
        jmsMessagingTemplate.convertAndSend(TopicDestination.TEST_TOPIC_1, message);}}

生产者发送消息

packagecom.example.activemq.producer;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;/**
 * 主题消息生产者测试
 */@SpringBootTestclassTopicProducerTest{@AutowiredprivateTopicProducer producer;@TestvoidtestSendToTopic(){
        producer.send("Hello ActiveMQ, topic message");}}

主题(Topic)消费者

packagecom.example.activemq.consumer;importcom.example.activemq.constant.TopicDestination;importlombok.extern.slf4j.Slf4j;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassTopicConsumer{@JmsListener(destination =TopicDestination.TEST_TOPIC_1)privatevoidreceive(String message){
        log.info("主题消费者,接收消息: {}", message);}}

接收消息的日志

2024-06-05T23:51:46.026+08:00 INFO 8584 — [active-mq-consumer] [ntContainer#3-1] c.e.activemq.consumer.TopicConsumer : 主题消费者,接收消息: Hello ActiveMQ, topic message

ActiveMQ控制台效果

在这里插入图片描述

八、自定义配置队列和主题

SpringBoot 集成 ActiveMQ,不同的

生产者

可以是不同的模式;比如一个生产者是发送队列消息,一个生产者发送主题消息。同样的,不同的

消费者

也可以是不同的模式;比如一个消费者是接收队列消息,一个消费者接收主题消息。

此时,可以使用Java代码配置队列(默认模式)或主题;此时可以不在 yml 中配置

pub-sub-domain

生产者

配置

packagecom.example.activemq.config;importcom.example.activemq.constant.QueueDestination;importcom.example.activemq.constant.TopicDestination;importjakarta.jms.Queue;importjakarta.jms.Topic;importorg.apache.activemq.command.ActiveMQQueue;importorg.apache.activemq.command.ActiveMQTopic;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassJmsConfig{@BeanpublicQueuequeue(){returnnewActiveMQQueue(QueueDestination.TEST_QUEUE_2);}@BeanpublicTopictopic(){returnnewActiveMQTopic(TopicDestination.TEST_TOPIC_2);}}

生产者代码

packagecom.example.activemq.producer;importjakarta.jms.Queue;importorg.springframework.jms.core.JmsMessagingTemplate;importorg.springframework.stereotype.Component;/**
 * 队列消息生产者
 */@ComponentpublicclassQueueMessageProducer{privatefinalQueue queue;privatefinalJmsMessagingTemplate jmsMessagingTemplate;publicQueueMessageProducer(Queue queue,JmsMessagingTemplate jmsMessagingTemplate){this.queue = queue;this.jmsMessagingTemplate = jmsMessagingTemplate;}publicvoidsend(String message){
        jmsMessagingTemplate.convertAndSend(queue, message);}}
packagecom.example.activemq.producer;importjakarta.jms.Topic;importorg.springframework.jms.core.JmsMessagingTemplate;importorg.springframework.stereotype.Component;/**
 * 主题消息生产者
 */@ComponentpublicclassTopicMessageProducer{privatefinalTopic topic;privatefinalJmsMessagingTemplate jmsMessagingTemplate;publicTopicMessageProducer(Topic topic,JmsMessagingTemplate jmsMessagingTemplate){this.topic = topic;this.jmsMessagingTemplate = jmsMessagingTemplate;}publicvoidsend(String message){
        jmsMessagingTemplate.convertAndSend(topic, message);}}

生产者发送消息

packagecom.example.activemq.producer;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;/**
 * 队列消息生产者测试
 */@SpringBootTestclassQueueMessageProducerTest{@AutowiredprivateQueueMessageProducer producer;/**
     * 发送多个队列消息
     */@TestvoidtestSendToQueueMultiple(){for(int i =0; i <10; i++){
            producer.send("Hello ActiveMQ, queue message "+ i);}}}
packagecom.example.activemq.producer;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;/**
 * 主题消息生产者测试
 */@SpringBootTestclassTopicMessageProducerTest{@AutowiredprivateTopicMessageProducer producer;/**
     * 发送多个主题消息
     */@TestvoidtestSendToTopicMultiple(){for(int i =0; i <2; i++){
            producer.send("Hello ActiveMQ, topic message "+ i);}}}

消费者

配置

packagecom.example.activemq.config;importjakarta.jms.ConnectionFactory;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.jms.config.DefaultJmsListenerContainerFactory;importorg.springframework.jms.config.JmsListenerContainerFactory;@ConfigurationpublicclassJmsConfig{@BeanpublicJmsListenerContainerFactory<?>jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory){DefaultJmsListenerContainerFactory bean =newDefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(false);// 设置为队列模式(点对点模式)
        bean.setConnectionFactory(activeMQConnectionFactory);return bean;}@BeanpublicJmsListenerContainerFactory<?>jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory){DefaultJmsListenerContainerFactory bean =newDefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);// 设置为发布/订阅模式(主题模式)
        bean.setConnectionFactory(activeMQConnectionFactory);return bean;}}

消费者代码

packagecom.example.activemq.consumer;importcom.example.activemq.constant.QueueDestination;importlombok.SneakyThrows;importlombok.extern.slf4j.Slf4j;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassQueueMessageConsumer{@SneakyThrows@JmsListener(destination =QueueDestination.TEST_QUEUE_2, containerFactory ="jmsListenerContainerQueue")publicvoidreceive1(String message){
        log.info("队列消费者-1,接收消息: {}", message);Thread.sleep(1000);}@SneakyThrows@JmsListener(destination =QueueDestination.TEST_QUEUE_2, containerFactory ="jmsListenerContainerQueue")publicvoidreceive2(String message){
        log.info("队列消费者-2,接收消息: {}", message);Thread.sleep(1000);}@SneakyThrows@JmsListener(destination =QueueDestination.TEST_QUEUE_2, containerFactory ="jmsListenerContainerQueue")publicvoidreceive3(String message){
        log.info("队列消费者-3,接收消息: {}", message);Thread.sleep(1000);}}
packagecom.example.activemq.consumer;importcom.example.activemq.constant.TopicDestination;importlombok.SneakyThrows;importlombok.extern.slf4j.Slf4j;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassTopicMessageConsumer{@SneakyThrows@JmsListener(destination =TopicDestination.TEST_TOPIC_2, containerFactory ="jmsListenerContainerTopic")publicvoidreceive1(String message){
        log.info("主题消费者-1,接收消息: {}", message);Thread.sleep(1000);}@SneakyThrows@JmsListener(destination =TopicDestination.TEST_TOPIC_2, containerFactory ="jmsListenerContainerTopic")publicvoidreceive2(String message){
        log.info("主题消费者-2,接收消息: {}", message);Thread.sleep(1000);}@SneakyThrows@JmsListener(destination =TopicDestination.TEST_TOPIC_2, containerFactory ="jmsListenerContainerTopic")publicvoidreceive3(String message){
        log.info("主题消费者-3,接收消息: {}", message);Thread.sleep(1000);}}

接收结果

队列接收结果
2024-06-06T19:12:04.256+08:00  INFO 12064 --- [active-mq-consumer] [ntContainer#1-1] c.e.a.consumer.QueueMessageConsumer      : 队列消费者-2,接收消息: Hello ActiveMQ, queue message 0
2024-06-06T19:12:04.259+08:00  INFO 12064 --- [active-mq-consumer] [ntContainer#3-1] c.e.a.consumer.QueueMessageConsumer      : 队列消费者-1,接收消息: Hello ActiveMQ, queue message 1
2024-06-06T19:12:04.284+08:00  INFO 12064 --- [active-mq-consumer] [ntContainer#2-1] c.e.a.consumer.QueueMessageConsumer      : 队列消费者-3,接收消息: Hello ActiveMQ, queue message 2
2024-06-06T19:12:05.278+08:00  INFO 12064 --- [active-mq-consumer] [ntContainer#1-1] c.e.a.consumer.QueueMessageConsumer      : 队列消费者-2,接收消息: Hello ActiveMQ, queue message 3
2024-06-06T19:12:05.278+08:00  INFO 12064 --- [active-mq-consumer] [ntContainer#3-1] c.e.a.consumer.QueueMessageConsumer      : 队列消费者-1,接收消息: Hello ActiveMQ, queue message 4
2024-06-06T19:12:05.291+08:00  INFO 12064 --- [active-mq-consumer] [ntContainer#2-1] c.e.a.consumer.QueueMessageConsumer      : 队列消费者-3,接收消息: Hello ActiveMQ, queue message 5
2024-06-06T19:12:06.293+08:00  INFO 12064 --- [active-mq-consumer] [ntContainer#3-1] c.e.a.consumer.QueueMessageConsumer      : 队列消费者-1,接收消息: Hello ActiveMQ, queue message 7
2024-06-06T19:12:06.293+08:00  INFO 12064 --- [active-mq-consumer] [ntContainer#2-1] c.e.a.consumer.QueueMessageConsumer      : 队列消费者-3,接收消息: Hello ActiveMQ, queue message 8
2024-06-06T19:12:06.294+08:00  INFO 12064 --- [active-mq-consumer] [ntContainer#1-1] c.e.a.consumer.QueueMessageConsumer      : 队列消费者-2,接收消息: Hello ActiveMQ, queue message 6
2024-06-06T19:12:07.310+08:00  INFO 12064 --- [active-mq-consumer] [ntContainer#1-1] c.e.a.consumer.QueueMessageConsumer      : 队列消费者-2,接收消息: Hello ActiveMQ, queue message 9
主题接收结果
2024-06-06T19:57:55.141+08:00  INFO 12064 --- [active-mq-consumer] [ntContainer#4-1] c.e.a.consumer.TopicMessageConsumer      : 主题消费者-2,接收消息: Hello ActiveMQ, topic message 0
2024-06-06T19:57:55.141+08:00  INFO 12064 --- [active-mq-consumer] [ntContainer#6-1] c.e.a.consumer.TopicMessageConsumer      : 主题消费者-1,接收消息: Hello ActiveMQ, topic message 0
2024-06-06T19:57:55.141+08:00  INFO 12064 --- [active-mq-consumer] [ntContainer#5-1] c.e.a.consumer.TopicMessageConsumer      : 主题消费者-3,接收消息: Hello ActiveMQ, topic message 0
2024-06-06T19:57:56.143+08:00  INFO 12064 --- [active-mq-consumer] [ntContainer#6-1] c.e.a.consumer.TopicMessageConsumer      : 主题消费者-1,接收消息: Hello ActiveMQ, topic message 1
2024-06-06T19:57:56.143+08:00  INFO 12064 --- [active-mq-consumer] [ntContainer#4-1] c.e.a.consumer.TopicMessageConsumer      : 主题消费者-2,接收消息: Hello ActiveMQ, topic message 1
2024-06-06T19:57:56.150+08:00  INFO 12064 --- [active-mq-consumer] [ntContainer#5-1] c.e.a.consumer.TopicMessageConsumer      : 主题消费者-3,接收消息: Hello ActiveMQ, topic message 1

ActiveMQ控制台效果

队列- 控制台效果

在这里插入图片描述

主题-控制台效果

在这里插入图片描述


本文转载自: https://blog.csdn.net/sgx1825192/article/details/139483008
版权归原作者 宋冠巡 所有, 如有侵权,请联系我们删除。

“SpringBoot整合ActiveMQ”的评论:

还没有评论