0


【RabbitMQ教程】springboot整合rabbitmq(topic模式)

下面还是模拟注册服务当用户注册成功后,向短信和邮件服务推送消息的场景
搭建SpringBoot环境

创建两个工程 mq-rabbitmq-producer和mq-rabbitmq-consumer,分别配置1、2、3(第三步本例消费者用注解形式,可以不用配)

1、添加AMQP的启动器:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring‐boot‐starter‐test</artifactId>
</dependency>

**2、在

application.yml

中添加RabbitMQ的配置: **

server:
  port: 10086  
spring:
  application:
    name: mq-rabbitmq-producer
  rabbitmq:
    host: 192.168.1.103
    port: 5672
    username: kavito
    password: 123456
    virtualHost: /kavito
    template:
      retry:
        enabled: true
        initial-interval: 10000ms
        max-interval: 300000ms
        multiplier: 2
      exchange: topic.exchange
    publisher-confirms: true

属性说明:

属性说明:

  • template:有关AmqpTemplate的配置 - retry:失败重试 - enabled:开启失败重试- initial-interval:第一次重试的间隔时长- max-interval:最长重试间隔,超过这个间隔将不再重试- multiplier:下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍- exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
  • publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试

当然如果consumer只是接收消息而不发送,就不用配置template相关内容。

** 3、定义RabbitConfig配置类,配置Exchange、Queue、及绑定交换机。**

@Configuration
public class RabbitmqConfig {
    public static final String QUEUE_EMAIL = "queue_email";//email队列
    public static final String QUEUE_SMS = "queue_sms";//sms队列
    public static final String EXCHANGE_NAME="topic.exchange";//topics类型交换机
    public static final String ROUTINGKEY_EMAIL="topic.#.email.#";
    public static final String ROUTINGKEY_SMS="topic.#.sms.#";
 
    //声明交换机
    @Bean(EXCHANGE_NAME)
    public Exchange exchange(){
        //durable(true) 持久化,mq重启之后交换机还在
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
 
    //声明email队列
    /*
     *   new Queue(QUEUE_EMAIL,true,false,false)
     *   durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
     *   auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
     *   exclusive  表示该消息队列是否只在当前connection生效,默认是false
     */
    @Bean(QUEUE_EMAIL)
    public Queue emailQueue(){
        return new Queue(QUEUE_EMAIL);
    }
    //声明sms队列
    @Bean(QUEUE_SMS)
    public Queue smsQueue(){
        return new Queue(QUEUE_SMS);
    }
 
    //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
    @Bean
    public Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue,
                                @Qualifier(EXCHANGE_NAME) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }
    //ROUTINGKEY_SMS队列绑定交换机,指定routingKey
    @Bean
    public Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue,
                              @Qualifier(EXCHANGE_NAME) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }
 
}

生产者(mq-rabbitmq-producer)

为了方便测试,我直接把生产者代码放工程测试类:发送routing key是"topic.sms.email"的消息,那么mq-rabbitmq-consumer下那些监听的(与交换机(topic.exchange)绑定,并且订阅的routingkey中匹配了"topic.sms.email"规则的) 队列就会收到消息。

@SpringBootTest
@RunWith(SpringRunner.class)
public class Send {
 
    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @Test
    public void sendMsgByTopics(){
 
        /**
         * 参数:
         * 1、交换机名称
         * 2、routingKey
         * 3、消息内容
         */
        for (int i=0;i<5;i++){
            String message = "恭喜您,注册成功!userid="+i;
            rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"topic.sms.email",message);
            System.out.println(" [x] Sent '" + message + "'");
        }
 
    }
}

运行测试类发送5条消息:

20190612112420397.png

web管理界面: 可以看到已经创建了交换机以及queue_email、queue_sms 2个队列,并且向这两个队列分别发送了5条消息

20190612125827992.png

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2thdml0bw==,size_16,color_FFFFFF,t_70

消费者(mq-rabbitmq-consumer)

编写一个监听器组件,通过注解配置消费者队列,以及队列与交换机之间绑定关系。(也可以像生产者那样通过配置类配置)

在SpringAmqp中,对消息的消费者进行了封装和抽象。一个JavaBean的方法,只要添加@RabbitListener注解,就可以成为了一个消费者。

@Component
public class ReceiveHandler {
 
    //监听邮件队列
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue_email", durable = "true"),
            exchange = @Exchange(
                    value = "topic.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {"topic.#.email.#","email.*"}))
    public void rece_email(String msg){
        System.out.println(" [邮件服务] received : " + msg + "!");
    }
 
    //监听短信队列
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue_sms", durable = "true"),
            exchange = @Exchange(
                    value = "topic.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {"topic.#.sms.#"}))
    public void rece_sms(String msg){
        System.out.println(" [短信服务] received : " + msg + "!");
    }
}

属性说明:

  • @Componet:类上的注解,注册到Spring容器
  • @RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性: - bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性: - value:这个消费者关联的队列。值是@Queue,代表一个队列- exchange:队列所绑定的交换机,值是@Exchange类型- key:队列和交换机绑定的RoutingKey,可指定多个

启动mq-rabbitmq-comsumer项目

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2thdml0bw==,size_16,color_FFFFFF,t_70

ok,邮件服务和短息服务接收到消息后,就可以各自开展自己的业务了。


本文转载自: https://blog.csdn.net/qq_43783527/article/details/127235589
版权归原作者 @来杯咖啡 所有, 如有侵权,请联系我们删除。

“【RabbitMQ教程】springboot整合rabbitmq(topic模式)”的评论:

还没有评论