0


RabbitMQ常用Exchange详解

1.Exchange 介绍

在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列再将消息以推送或者拉取方式给消费者进行消费

1.2 路由键(RoutingKey)

生产者将消息发送给交换机的时候,会指定RoutingKey指定路由规则。

1.3 绑定键(BindingKey)

通过绑定键将交换机与队列关联起来,这样RabbitMQ就知道如何正确消息路由到队列。

小结:

生产者将消息发送给哪个E ×change是需要由RoutingKey决定的,生产者需要将E× change与哪个队列绑定时需要由BindingKey决定的

2. 直连交换机:Direct exchange

直连交换机的路由算法非常简单:将消息准送到binding key与该消息的routing key相同的队列

直连交换机x上绑定了两个队列。第一个队列绑定了绑定键orange,第二个队列有两个绑定踺:black和greeno在这种场景下,一个消息在指定了路由键为orange将会只被路由到队列QI,路由键为black和greeno的消息都将被路由到队列 Q2。其他的消息都将被丢失。

示例:

  • provider
package com.wyy.provider;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectQueueConfig {

    /**
     * 创建直连交换机队列
     * @return
     */
    @Bean
    public Queue directQueue(){
        return new Queue("DirectQueue",true);
    }

    /**
     * 创建直连交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }

    /**
     * 进行队列与交换机绑定
     * @return
     */
    @Bean
    public Binding directBindingA(){
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct");
    }
}
package com.wyy.provider.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DirectController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/senderDirect/{msg}")
    public Object senderDirect( @PathVariable String msg){
        rabbitTemplate.convertAndSend("directExchange",msg,"??????");
        return "yes";
    }
}
  • ** consumer**
package com.wyy.consumer.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
@RabbitListener(queues = "DirectQueue")
public class DirectController {

    @RabbitHandler
    public void process(String msg){
        log.warn("DirectQueue接受:"+msg);
    }
}

3. 主题交换机:Topic

发送到主题交换机的消息不能有任意的routing key,必须是由点号分开的一串单词,这些单词可以是任意的,但通常是与消息相关的一些特

比如以下是几个有效的routing key: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",routing key的单词可以有很多,最大限制是 255 byteso

Topic交换机的逻辑与direct交换机有点相似,使用特定路由踺发送的消息将被发送到所有使用匹配绑定键绑定的队列,然而,绑定键有两个特殊的情况:

    • 表示匹配任意一个单词
  • 表示匹配任意一个或多个单词

示例:

  • provider
package com.wyy.provider;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicQueueConfig {
    public static String KEY_A ="*.wyy.*";
    public static String KEY_B ="*.*.wyy";
    public static String KEY_C ="wyy.#";

    /**
     * 创建主题交换机队列
     * @return
     */
    @Bean
    public Queue topicQueue(){
        return new Queue("TopicQueue",true);
    }

    /**
     * 创建主题交换机
     * @return
     */
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }

    /**
     * 进行队列与交换机绑定1
     * @return
     */
    @Bean
    public Binding topicBindingA(){
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(KEY_A);
    }

    /**
     * 进行队列与交换机绑定2
     * @return
     */
    @Bean
    public Binding topicBindingB(){
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(KEY_B);
    }

    /**
     * 进行队列与交换机绑定3
     * @return
     */
    @Bean
    public Binding topicBindingC(){
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(KEY_C);
    }
}
package com.wyy.provider.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TopicController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/senderTopic/{msg}")
    public Object senderDirect( @PathVariable String msg){
        rabbitTemplate.convertAndSend("topicExchange",msg,"~~~~~~~");
        return "yes";
    }
}
  • consumer
package com.wyy.consumer.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
@RabbitListener(queues = "TopicQueue")
public class TopicController {

    @RabbitHandler
    public void process(String msg){
        log.warn("TopicQueue接受:"+msg);
    }
}

4. 扇形交换机:Fanout exchange

扇形交换机是最基本的交换机类型,它所能做的事清非常简单广播消息。

扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要''思考",所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。

示例:

  • provider
package com.wyy.provider;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutQueueConfig {

    /**
     * 创建扇形交换机队列
     * @return
     */
    @Bean
    public Queue fanoutQueue(){
        return new Queue("fanoutQueue",true);
    }

    /**
     * 创建扇形交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * 进行队列与交换机绑定
     * @return
     */
    @Bean
    public Binding fanoutBindingA(){
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
    }
}
package com.wyy.provider.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class FanoutController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/senderFanout")
    public Object senderFanout(){
        rabbitTemplate.convertAndSend("fanoutExchange",null,"#######");
        return "yes";
    }
}
  • consumer
package com.wyy.consumer.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
@RabbitListener(queues = "fanoutQueue")
public class FanoutController {

    @RabbitHandler
    public void process(String msg){
        log.warn("fanoutQueue接受:"+msg);
    }
}
标签: rabbitmq java 分布式

本文转载自: https://blog.csdn.net/weixin_58670730/article/details/123138724
版权归原作者 邂逅于晚风 所有, 如有侵权,请联系我们删除。

“RabbitMQ常用Exchange详解”的评论:

还没有评论