一、rabbitmq安装
1.1、Docker 安装Rabbitmq
下载镜像,
rabbitmq:management
镜像中已经安装了管理界面
docker pull rabbitmq:management
关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
# 重启 docker 系统服务
systemctl restart docker
配置管理员用户名和密码
#mkdir创建文件夹
mkdir /etc/rabbitmq
vim /etc/rabbitmq/rabbitmq.conf
# 添加两行配置:
default_user = admin
default_pass = admin
启动Rabbitmq容器
docker run -d --name rabbit \
-p 5672:5672 \
-p 15672:15672 \
-v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
--restart=always \
rabbitmq:management
访问管理控制台 http://192.168.64.140:15672
用户名密码是 admin
设置rabbitmq服务器
# 设置服务,开机自动启动
systemctl enable rabbitmq-server
# 启动服务
systemctl start rabbitmq-server
1.2、rabbitmq管理界面
启用管理界面
# 开启管理界面插件
rabbitmq-plugins enable rabbitmq_management
# 防火墙打开 15672 管理端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload
重启RabbitMQ服务
systemctl restart rabbitmq-server
登录rabbitmq网页
- 用户管理参考rabbitmq——用户管理 - 孤剑 - 博客园
二、使用rabbitmq消息队列
1.1、前期准备
pom文件添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
.yml配置文件
spring:
rabbitmq:
host: 192.168.64.136
username: admin
password: admin
listener:
simple:
# 每次收1条,处理完之前不收下一条
# 默认 250
prefetch: 1
# acknowledgeMode: NONE # rabbitmq的自动确认
# acknowledgeMode: AUTO # rabbitmq的手动确认, springboot会自动发送确认回执 (默认)
# acknowledgeMode: MANUAL # rabbitmq的手动确认, springboot不发送回执, 必须自己编码发送回执
消息队列与通过交换机生成的消息队列属性
Name:队列的名称
Durable:是否持久化(重启rabbitmq之后,队列是否还存在)
Exclusive:是否只被一个客户端连接使用,且当连接关闭后,删除队列
AutoDelete :是否自动删除(当最后一个消费者退订后即被删除)
Arguments:队列的其他属性参数,有如下可选项:
(1)x-message-ttl:消息的过期时间,单位:毫秒;
(2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
(3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
(4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
(5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
(6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
(7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
(8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
(9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
(10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
(11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。
交换机的类型
exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符 号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还 有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
1.2、创建消息队列,声明交换机
package com.example.demo.entity;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
//声明队列
@Bean
public Queue taskQueue() {
/*
* 可用以下形式:
* new Queue("helloworld") - 持久,非排他,非自动删除
* new Queue("helloworld",false,false,false,null)
*/
return new Queue("hello",false);
}
//声明交换机
@Bean
public FanoutExchange logs(){
return new FanoutExchange("fanout_logs",false,false);
}
}
1.3、通过队列发送与接受消息
//注入生产者
@Autowired
private SimpleSender simpleSender;
public String a(String msg){
simpleSender.a(msg);
return msg;
}
1.31、生产者
package com.example.demo.component;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SimpleSender {
@Autowired
private AmqpTemplate template;
public void a(String msg){
template.convertAndSend("hello",msg);
}
}
1.32、消费者
package com.example.demo.component;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SimpleReceiver {
@RabbitListener(queues = "hello")
public void receive(String msg){
System.out.println("消费者消费消息"+msg);
}
}
1.4、通过交换机发送与接收消息
1.41、生产者
package com.example.demo.component;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SimpleSender {
@Autowired
private AmqpTemplate template;
public void a(String msg){
//路由可以为空,如果有值则需要对应的消费者消费
// 交换机 路由 消息
template.convertAndSend("fanout_logs",k,msg);
}
}
1.42、消费者
当交换机绑定的队列hello已存在时
package com.example.demo.component;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SimpleReceiver {
@RabbitListener(queues = "hello")
public void receive(String msg){
System.out.println("消费者消费消息"+msg);
}
}
当不存在与交换机绑定的队列时
注意
@QueueBinding
注解的三个属性:
- value: @Queue 注解,用于声明队列,value 为 queueName, durable 表示队列是否持久化, autoDelete 表示没有消费者之后队列是否自动删除
- exchange: @Exchange 注解,用于声明 exchange, type 指定消息投递策略,我们这里用的 topic 方式
- key: 在 topic 方式下,这个就是我们熟知的 routingKey(路由),若生产者传递的路由为空则可以不写
/**
* 队列不存在时,需要创建一个队列,并且与exchange绑定
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "topic.n1", durable = "false", autoDelete = "true"),
exchange = @Exchange(value = "fanout_logs", type = ExchangeTypes.TOPIC),
key = "info"))
public void consumerNoQueue(String data) {
System.out.println("consumerNoQueue: " + data);
}
@RabbitListener(bindings = @QueueBinding(
//@Queue注解spring会随机创建队列,参数为 非持久、独占、自动删除
value = @Queue,
// declare = "false" 不在这里创建交换机,而是用名字引用存在的交换机
exchange = @Exchange(name = "fanout_logs",declare = "false"),
key = {"info","warning","error"}
))
public void receive1(String msg){
System.out.println("消费者1收到:" +msg);
}
三、rabbitMq延迟消息队列
3.1下载延迟插件
查看镜像的信息获取版本号
docker inspect rabbitmq:management
根据版本号下载延迟插件
下载地址:RabbitMQ
** **
**将插件文件上传到服务器,我这里是直接上传到
/root
下了**
安装插件并启用
将刚刚上传的插件拷贝到容器内plugins目录下
是容器的rabbit
,也可以使用容器id*name
docker cp /root/rabbitmq_delayed_message_exchange-3.9.0.ez rabbit:/plugins
进入到RabbitMQ容器内部
docker exec -it rabbit /bin/bash
查看插件是否存在
cd plugins
ls |grep delay
启用插件
(注意是在plugins内)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
退出容器,重启RabbitMQ容器
exit
docker restart rabbit
登录RabbitMQ的管理界面(ip:15672 访问web界面)检查x-delayed-message是否存在
Java SpringBoot 代码部分
配置类
package com.example.demo.entity;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
@Configuration
public class Rabbitmq {
//交换机
public static final String DELAYED_EXCHANGE ="delayed_exchange";
//队列
public static final String DELAYED_QUEUE ="delayed_queue";
//routeingKey
public static final String DELAYED_ROUTINGKEY ="delayed_routingKey";
//声明延迟交换机
@Bean
public CustomExchange delayedExchange(){
HashMap<String, Object> arguments = new HashMap<>();
//自定义交换机的类型
arguments.put("x-delayed-type", "direct");
/**
* 交换机名
* 交换机类型
* 持久化
* 自动删除
*/
return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);
}
/**
* 声明队列
* @return
*/
@Bean
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE);
}
//延迟交换机和队列绑定
@Bean
public Binding delayedQueueBindingDelayedExchange(Queue delayedQueue, CustomExchange delayedExchange){
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTINGKEY).noargs();
}
}
生产者
package com.example.demo.controller;
import com.example.demo.entity.Rabbitmq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;
@Component
@EnableScheduling
public class D {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
*
* @param message 传递的信息
* @param delayedTime 延迟时间
*/
public void a(String message,Integer delayedTime){
rabbitTemplate.convertAndSend(Rabbitmq.DELAYED_EXCHANGE,Rabbitmq.DELAYED_ROUTINGKEY,message,(msg -> {
//发送消息 并设置delayedTime
msg.getMessageProperties().setDelay(delayedTime);
return msg;
}));
}
}
消费者
package com.example.demo.controller;
import com.example.demo.entity.Rabbitmq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class C {
@RabbitListener(queues = Rabbitmq.DELAYED_QUEUE)
public void receiveMessage(Message message){
String msg = new String(message.getBody());
System.out.println(msg);
System.out.println(new Date());
}
}
版权归原作者 cxy_ydj 所有, 如有侵权,请联系我们删除。