RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)
使用 docker安装 RabbitMQ和延时插件,实现消息延时消费
1、docker 安装 RabbitMQ
docker拉取镜像
docker pull rabbitmq:3.10-management
开启容器
docker run -it --name rabbitmq -p 5672:5672 -p 15672:15672 -d rabbitmq:3.10-management
开启容器后,浏览器访问宿主机器 ip+15672端口,访问RabbitMQ管理页面
笔者的宿主机器 ip 是 192.168.5.25
初始账号密码都是guest
2、安装延时插件
下载插件地址:https://www.rabbitmq.com/community-plugins.html
找到 rabbitmq_delayed_message_exchange
点击进入下载页面
找到对应的版本进行下载
下载插件后,将插件上传到服务器
使用 docker 命令将插件复制到容器内部 plugins目录下
docker cp rabbitmq_delayed_message_exchange-3.10.0.ez rabbitmq:/plugins
进入容器内部进行查看
docker exec -it rabbitmq bash
进入 plugins 目录查看
cd plugins
执行命令安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
通过命令查看已安装的插件
rabbitmq-plugins list
退出容器后,重启 rabbitmq 容器
docker restart rabbitmq
重启完成后,进入管理页面,点击交换机 Exchange,点开 Add a new exchange,查看交换机类型 Type,发现里面新增了 x-delayed-message 类型,则延时插件安装成功
3、测试延时消息
新建 SpringBoot 项目
pom.xml内容
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wsjzzcbq</groupId>
<artifactId>rabbitmq-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq-demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.7.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.7.RELEASE</version>
<configuration>
<mainClass>com.wsjzzcbq.RabbitmqDemoApplication</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
application.properties内容
# 应用名称
spring.application.name=rabbitmq-demo
# 应用服务 WEB 访问端口
server.port=8080
spring.rabbitmq.addresses=192.168.5.25
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.connection-timeout=5000
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5
启动类 RabbitmqDemoApplication 内容
package com.wsjzzcbq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqDemoApplication.class, args);
}
}
RabbitConsumer 消费者内容
package com.wsjzzcbq.rabbit;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* RabbitConsumer
*
* @author wsjz
* @date 2022/10/26
*/
@Component
public class RabbitConsumer {
/**
* 注解会自动创建交换机队列及其绑定
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(bindings = @QueueBinding(
value=@Queue(value="queue-poetry",durable="true"),
exchange=@Exchange(
value="exchange-delayed",
arguments = {@Argument(name="x-delayed-type", value = "direct")},
type = "x-delayed-message",
ignoreDeclarationExceptions="true"),
key="poetry"
))
public void onMessage(Message<String> message, Channel channel) throws IOException {
String name = (String) message.getHeaders().get("name");
System.out.println(name);
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工签收
channel.basicAck(deliveryTag, false);
String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
System.out.println("消息已接收" + now + ":" + message.getPayload());
}
}
RabbitProducer 生产者内容
package com.wsjzzcbq.rabbit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
/**
* RabbitProducer
*
* @author wsjz
* @date 2022/10/26
*/
@Component
public class RabbitProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
* @param msg
* @param properties
* @param millisecond
*/
public void sendMessage(Object msg, Map<String, Object> properties, int millisecond) {
MessageHeaders messageHeaders = new MessageHeaders(properties);
Message content = MessageBuilder.createMessage(msg, messageHeaders);
rabbitTemplate.convertAndSend("exchange-delayed", "poetry", content, (message)->{
message.getMessageProperties().setHeader("x-delay", millisecond);
return message;
});
String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
System.out.println("消息已发送" + now + ":" + msg);
}
}
DemoController 内容
package com.wsjzzcbq.controller;
import com.wsjzzcbq.rabbit.RabbitProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
* DemoController
*
* @author wsjz
* @date 2022/10/26
*/
@RestController
public class DemoController {
@Autowired
private RabbitProducer producer;
@RequestMapping("/send")
public String send() {
Map<String, Object> map = new HashMap<>();
map.put("name", "花月吟.唐伯虎");
//消息发送,延时5秒钟
producer.sendMessage("如此好花如此月, 莫将花月作寻常", map, 5 * 1000);
return "ok";
}
}
启动项目,会自动创建交换机和队列,进入管理页面查看交换机 exchange-delayed
运行测试
浏览器访问:http://localhost:8080/send
看控制台打印时间间隔
消费者端延时5秒钟收到消息
至此完
版权归原作者 悟世君子 所有, 如有侵权,请联系我们删除。