0


docker安装RabbitMQ和延时插件

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)

使用 docker安装 RabbitMQ和延时插件,实现消息延时消费

1、docker 安装 RabbitMQ

docker拉取镜像

docker pull rabbitmq:3.10-management

开启容器

docker run -it --name rabbitmq -p 5672:5672 -p 15672:15672 -d rabbitmq:3.10-management

开启容器后,浏览器访问宿主机器 ip+15672端口,访问RabbitMQ管理页面

笔者的宿主机器 ip 是 192.168.5.25

初始账号密码都是guest

2、安装延时插件

下载插件地址:https://www.rabbitmq.com/community-plugins.html

找到 rabbitmq_delayed_message_exchange

点击进入下载页面

找到对应的版本进行下载

下载插件后,将插件上传到服务器

使用 docker 命令将插件复制到容器内部 plugins目录下

docker cp rabbitmq_delayed_message_exchange-3.10.0.ez  rabbitmq:/plugins

进入容器内部进行查看

docker exec -it rabbitmq bash

进入 plugins 目录查看

cd plugins

执行命令安装插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

通过命令查看已安装的插件

rabbitmq-plugins list

退出容器后,重启 rabbitmq 容器

docker restart rabbitmq

重启完成后,进入管理页面,点击交换机 Exchange,点开 Add a new exchange,查看交换机类型 Type,发现里面新增了 x-delayed-message 类型,则延时插件安装成功

3、测试延时消息

新建 SpringBoot 项目

pom.xml内容

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.wsjzzcbq</groupId>
    <artifactId>rabbitmq-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.7.RELEASE</version>
                <configuration>
                    <mainClass>com.wsjzzcbq.RabbitmqDemoApplication</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

application.properties内容

# 应用名称
spring.application.name=rabbitmq-demo
# 应用服务 WEB 访问端口
server.port=8080

spring.rabbitmq.addresses=192.168.5.25
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.connection-timeout=5000

spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5

启动类 RabbitmqDemoApplication 内容

package com.wsjzzcbq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitmqDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemoApplication.class, args);
    }

}

RabbitConsumer 消费者内容

package com.wsjzzcbq.rabbit;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * RabbitConsumer
 *
 * @author wsjz
 * @date 2022/10/26
 */
@Component
public class RabbitConsumer {

    /**
     * 注解会自动创建交换机队列及其绑定
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(bindings = @QueueBinding(
            value=@Queue(value="queue-poetry",durable="true"),
            exchange=@Exchange(
                    value="exchange-delayed",
                    arguments = {@Argument(name="x-delayed-type", value = "direct")},
                    type = "x-delayed-message",
                    ignoreDeclarationExceptions="true"),
            key="poetry"
    ))
    public void onMessage(Message<String> message, Channel channel) throws IOException {
        String name = (String) message.getHeaders().get("name");
        System.out.println(name);
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工签收
        channel.basicAck(deliveryTag, false);

        String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        System.out.println("消息已接收" + now + ":" + message.getPayload());
    }
}

RabbitProducer 生产者内容

package com.wsjzzcbq.rabbit;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;

/**
 * RabbitProducer
 *
 * @author wsjz
 * @date 2022/10/26
 */
@Component
public class RabbitProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     * @param msg
     * @param properties
     * @param millisecond
     */
    public void sendMessage(Object msg, Map<String, Object> properties, int millisecond) {
        MessageHeaders messageHeaders = new MessageHeaders(properties);
        Message content = MessageBuilder.createMessage(msg, messageHeaders);

        rabbitTemplate.convertAndSend("exchange-delayed", "poetry", content, (message)->{
            message.getMessageProperties().setHeader("x-delay", millisecond);
            return message;
        });

        String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        System.out.println("消息已发送" + now + ":" + msg);
    }

}

DemoController 内容

package com.wsjzzcbq.controller;

import com.wsjzzcbq.rabbit.RabbitProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;

/**
 * DemoController
 *
 * @author wsjz
 * @date 2022/10/26
 */
@RestController
public class DemoController {

    @Autowired
    private RabbitProducer producer;

    @RequestMapping("/send")
    public String send() {
        Map<String, Object> map = new HashMap<>();
        map.put("name", "花月吟.唐伯虎");
        //消息发送,延时5秒钟
        producer.sendMessage("如此好花如此月, 莫将花月作寻常", map, 5 * 1000);
        return "ok";
    }
}

启动项目,会自动创建交换机和队列,进入管理页面查看交换机 exchange-delayed

运行测试

浏览器访问:http://localhost:8080/send

看控制台打印时间间隔

消费者端延时5秒钟收到消息

至此完


本文转载自: https://blog.csdn.net/wsjzzcbq/article/details/127558183
版权归原作者 悟世君子 所有, 如有侵权,请联系我们删除。

“docker安装RabbitMQ和延时插件”的评论:

还没有评论