下面还是模拟注册服务当用户注册成功后,向短信和邮件服务推送消息的场景
搭建SpringBoot环境
创建两个工程 mq-rabbitmq-producer和mq-rabbitmq-consumer,分别配置1、2、3(第三步本例消费者用注解形式,可以不用配)
1、添加AMQP的启动器:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐test</artifactId>
</dependency>
**2、在
application.yml
中添加RabbitMQ的配置: **
server:
port: 10086
spring:
application:
name: mq-rabbitmq-producer
rabbitmq:
host: 192.168.1.103
port: 5672
username: kavito
password: 123456
virtualHost: /kavito
template:
retry:
enabled: true
initial-interval: 10000ms
max-interval: 300000ms
multiplier: 2
exchange: topic.exchange
publisher-confirms: true
属性说明:
属性说明:
- template:有关AmqpTemplate的配置 - retry:失败重试 - enabled:开启失败重试- initial-interval:第一次重试的间隔时长- max-interval:最长重试间隔,超过这个间隔将不再重试- multiplier:下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍- exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
- publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试
当然如果consumer只是接收消息而不发送,就不用配置template相关内容。
** 3、定义RabbitConfig配置类,配置Exchange、Queue、及绑定交换机。**
@Configuration
public class RabbitmqConfig {
public static final String QUEUE_EMAIL = "queue_email";//email队列
public static final String QUEUE_SMS = "queue_sms";//sms队列
public static final String EXCHANGE_NAME="topic.exchange";//topics类型交换机
public static final String ROUTINGKEY_EMAIL="topic.#.email.#";
public static final String ROUTINGKEY_SMS="topic.#.sms.#";
//声明交换机
@Bean(EXCHANGE_NAME)
public Exchange exchange(){
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//声明email队列
/*
* new Queue(QUEUE_EMAIL,true,false,false)
* durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
* auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
* exclusive 表示该消息队列是否只在当前connection生效,默认是false
*/
@Bean(QUEUE_EMAIL)
public Queue emailQueue(){
return new Queue(QUEUE_EMAIL);
}
//声明sms队列
@Bean(QUEUE_SMS)
public Queue smsQueue(){
return new Queue(QUEUE_SMS);
}
//ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
@Bean
public Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue,
@Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
//ROUTINGKEY_SMS队列绑定交换机,指定routingKey
@Bean
public Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue,
@Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}
生产者(mq-rabbitmq-producer)
为了方便测试,我直接把生产者代码放工程测试类:发送routing key是"topic.sms.email"的消息,那么mq-rabbitmq-consumer下那些监听的(与交换机(topic.exchange)绑定,并且订阅的routingkey中匹配了"topic.sms.email"规则的) 队列就会收到消息。
@SpringBootTest
@RunWith(SpringRunner.class)
public class Send {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMsgByTopics(){
/**
* 参数:
* 1、交换机名称
* 2、routingKey
* 3、消息内容
*/
for (int i=0;i<5;i++){
String message = "恭喜您,注册成功!userid="+i;
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"topic.sms.email",message);
System.out.println(" [x] Sent '" + message + "'");
}
}
}
运行测试类发送5条消息:
web管理界面: 可以看到已经创建了交换机以及queue_email、queue_sms 2个队列,并且向这两个队列分别发送了5条消息
消费者(mq-rabbitmq-consumer)
编写一个监听器组件,通过注解配置消费者队列,以及队列与交换机之间绑定关系。(也可以像生产者那样通过配置类配置)
在SpringAmqp中,对消息的消费者进行了封装和抽象。一个JavaBean的方法,只要添加@RabbitListener注解,就可以成为了一个消费者。
@Component
public class ReceiveHandler {
//监听邮件队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_email", durable = "true"),
exchange = @Exchange(
value = "topic.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"topic.#.email.#","email.*"}))
public void rece_email(String msg){
System.out.println(" [邮件服务] received : " + msg + "!");
}
//监听短信队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_sms", durable = "true"),
exchange = @Exchange(
value = "topic.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"topic.#.sms.#"}))
public void rece_sms(String msg){
System.out.println(" [短信服务] received : " + msg + "!");
}
}
属性说明:
- @Componet:类上的注解,注册到Spring容器
- @RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性: - bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性: - value:这个消费者关联的队列。值是@Queue,代表一个队列- exchange:队列所绑定的交换机,值是@Exchange类型- key:队列和交换机绑定的RoutingKey,可指定多个
启动mq-rabbitmq-comsumer项目
ok,邮件服务和短息服务接收到消息后,就可以各自开展自己的业务了。
版权归原作者 @来杯咖啡 所有, 如有侵权,请联系我们删除。