最近处理访客记录所以,来学习下rabbitMQ。之前同事已经写好了,这里只需要进行消费,后续会逐渐完善。
0.介绍
0.1交换机(Exchanges)
rabbitmq中生产者发送的消息都是发送到交换机,再由交换机推入队列。所以生产者不知道队列去了哪里,就靠Exchage来控制,交换机总共有以下几种类型。
0.1.1广播模式(fanout)
扇出所有消息进入队列,类似广播。
0.1.2直接交换(direct)
绑定相关的routerKey分发到不同的队列,简单说就是direct交换机接收了消息后,根据关键词分发队列。
0.1.2主题模式(topic)
direct路由比较单一,所以提升了routerKey的能力,在关键词标记下加上了通配符。
*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
1.公共配置类
spring:rabbitmq:host: 127.0.0.1
port:5672username: guest
password: guest
/**
* 类描述:RabbitMQ公共配置类
*
* @ClassName RabbitMQConfig
* @Author ward
* @Date 2023-08-18 10:28
*/publicclassRabbitMQConfig{/**
* RabbitMQ的队列主题名称
*/publicstaticfinalStringRABBITMQ_TOPIC="rabbitmqTopic";/**
* RabbitMQ的DIRECT交换机名称
*/publicstaticfinalStringRABBITMQ_DIRECT_EXCHANGE="rabbitmqDirectExchange";/**
* RabbitMQ的Direct交换机和队列绑定的匹配键 DirectRouting
*/publicstaticfinalStringRABBITMQ_DIRECT_ROUTING="rabbitmqDirectRouting";}
2.消费消息的两种方式
把记录塞进队列里的时候,只是完成了第一步,那你肯定要对他进行消费。分为两种推模式和拉模式:推模式就是生产者发布消息时,主动推送给消费者;拉模式则是消费者发送请求后才会发送。
2.1
3.监听队列的两种方式
一种是@RabbitListener注解的方式,一种是实现springboot:ChannelAwareMessageListener接口的方式
3.1@RabbitListener
如果demoData想不转换成String直接推,得在这个数据流实现序列化。
innerRabbitTemplate.convertAndSend(InnerMQConfig.TOPIC_EXCHANGE, msgKey,JSONObject.toJSONString(demoData));
@ComponentpublicclassDemoRabbitMQListener{//定义方法进行信息的监听(queues表示队列名称)@RabbitListener(queues ="demo_queue")@RabbitHandlerpublicvoiddemoQueue(Message message){System.out.println("message:"+message.getBody());}}
3.2实现ChannelAwareMessageListener接口
听前辈说直接实现这个接口,就不用管底层是谁的消息队列了,因为是基于Springboot,后续我会逐步求证,做需求只能先用着。这个实现起来有点麻烦,我总结了以下顺序:
3.2.1.创建连接工厂(ConnectionFactory——MQ连接工厂 )
publisherConfirms:消息发送到exchange,返回成功或者失败。
publishReturns:消息从exchange到queue,发送成功或者失败。
后续在DemoRabbitTemplate会演示回调
@Bean(name ="DemoConnectionFactory")@PrimarypublicConnectionFactoryconnectionFactory(){//创建连接CachingConnectionFactory connectionFactory =newCachingConnectionFactory();// 主机地址
connectionFactory.setHost(host);// 连接端口;默认为 5672
connectionFactory.setPort(port);// 连接用户名;默认为guest
connectionFactory.setUsername(username);// 连接密码;默认为guest
connectionFactory.setPassword(password);// 虚拟主机名称;默认为 /
connectionFactory.setVirtualHost(virtualHost);// 开启消息发送至RabbitMQ 的回调
connectionFactory.setPublisherConfirms(true);// 开启消息发送至队列失败的回调
connectionFactory.setPublisherReturns(true);return connectionFactory;}
3.2.2.初始化组件(rabbitAdmin ——对MQ进行初始化的Spring组件)
@Bean(name ="DemoRabbitAdmin")@PrimarypublicRabbitAdminrabbitAdmin(@Qualifier("DemoConnectionFactory")ConnectionFactory connectionFactory){RabbitAdmin rabbitAdmin =newRabbitAdmin(connectionFactory);// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}
3.2.3.创建交换器(exchange)
这里提供了两种等价的方式,喜欢哪种就用哪种。
durable:是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除
autoDelete:是否自动删除,如果没有与之绑定的Queue,直接删除
internal:是否内置的,如果为true,只能通过Exchange到Exchange
arguments:结构化参数
看了源码之后发现默认只有名字的时候,其实持久化是开的的,自动删除默认就是关闭的。
/*创建交换器*/@Bean(DEMO_EXCHANGE)publicTopicExchangeexchange(){returnnewTopicExchange(DEMO_EXCHANGE,true,false);}
/*创建交换器*/@Bean(DEMO_EXCHANGE)publicExchangeexchange(){returnExchangeBuilder.topicExchange(DEMO_EXCHANGE).durable(true).build();}
3.2.4.创建队列(queue)
创建队列主要掌握这几个参数:
name: 队列名称。
durable: 队列是否持久化。 队列默认是存放到内存中的,rabbitmq重启则丢失,若想重启之后还存在则队列要持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库。
exclusive:是否排他的队列。有两个作用:连接关闭时该队列自动删除;该队列只允许一个消费者访问。
autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除。
arguments: 队列中的消息什么时候会自动被删除 (设置死信交换器和死信队列等设置)
/*创建*/@Bean(QUEUE_NAME)publicQueueQUEUE_DEMO(){returnnewQueue(QUEUE_NAME,true,false,false);}
3.2.5.绑定队列到交换机(binding)
//绑定队列到交换机@BeanpublicBindingBINGING_EXCHANGE_QUEUE(@Qualifier(QUEUE_NAME)Queue queue,@Qualifier(DEMO_EXCHANGE)Exchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();}
3.2.6.创建监听容器(SimpleMessageListenerContainer)
//创建监听容器@BeanpublicSimpleMessageListenerContainersimpleMessageListenerContainer(@Qualifier("DemoConnectionFactory")ConnectionFactory connectionFactory,DemoRabbitMQListener demoRabbitMQListener,@Qualifier(QUEUE_NAME)Queue queue
)throwsAmqpException{SimpleMessageListenerContainer listenerContainer =newSimpleMessageListenerContainer(connectionFactory);//消费者个数
listenerContainer.setConcurrentConsumers(listenerSize);
listenerContainer.setQueues(queue);
listenerContainer.setExposeListenerChannel(true);//设置接收方式,AUTO-自动接收,MANUAL-手动接收,NULL-不接收
listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);//监听处理类(自己消费端写的类)
listenerContainer.setMessageListener(demoRabbitMQListener);return listenerContainer;}
3.2.7.创建操作类(RabbitTemplate)
setConfirmCallback的消息回调是在生产者端要把参数丢进去的。
@Bean(name ="DemoRabbitTemplate")@Primary//多个实现类使用该注解publicRabbitTemplaterabbitTemplate(@Qualifier("DemoConnectionFactory")ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);//触发setReturnCallback回调必须设置mandatory=true,否则Exchange没有找到Queue就会丢弃掉消息,而不会触发回调
rabbitTemplate.setMandatory(true);//设置连接工厂
rabbitTemplate.setConnectionFactory(connectionFactory);//消息是否成功发送到Exchange回调
rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){/**
* 确认消息送到交换机(Exchange)回调
* @param correlationData
* @param ack
* @param cause
*/@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){
log.info("确认消息送到交换机(Exchange)结果:");
log.info("相关数据:{}", correlationData);boolean ret =false;if(ack){
log.info("消息发送到交换机成功, 消息 = {}", correlationData.getId());//下面可自定义业务逻辑处理,如入库保存信息等}else{
log.error("消息发送到交换机失败! 消息: {}}; 错误原因:cause: {}", correlationData.getId(), cause);//下面可自定义业务逻辑处理,如入库保存信息等}}});//消息是否从Exchange路由到Queue
rabbitTemplate.setReturnCallback(newRabbitTemplate.ReturnCallback(){/**
* 失败回调:只有消息没有投递给指定的队列
* @param message 投递失败的消息详细信息
* @param replyCode 回复的状态码
* @param replyText 回复的文本内容
* @param exchange 当时这个消息发给那个交换机
* @param routingKey 当时这个消息用那个路由键
*/@OverridepublicvoidreturnedMessage(Message message,int replyCode,String replyText,String exchange,String routingKey){//获取消息idString messageId = message.getMessageProperties().getMessageId();// 内容String result =null;try{
result =newString(message.getBody(),"UTF-8");}catch(Exception e){
log.error("消息发送失败", e);}
log.error("消息发送失败, 消息ID = {}; 消息内容 = {}", messageId, result);//下面可自定义业务逻辑处理,如入库保存信息等}});return rabbitTemplate;}
3.2.8.监听消费(RabbitMQListener)
这个类要注意用@Service或者@Compet注解让他交给IOC
@Service@Slf4jpublicclassDemoRabbitMQListenerimplementsChannelAwareMessageListener{@OverridepublicvoidonMessage(Message message,Channel channel)throwsException{
log.info("message:{}", message.getBody());//todo: 接下来就是各自的业务逻辑,就是消费环节}}
3.子标题
正文
在这里插入代码片
4.子标题
正文
在这里插入代码片
5.子标题
正文
在这里插入代码片
版权归原作者 yeapT 所有, 如有侵权,请联系我们删除。