0


RabbitMQ

一、rabbitmq安装

1.1、Docker 安装Rabbitmq

下载镜像,

rabbitmq:management

镜像中已经安装了管理界面

docker pull rabbitmq:management

关闭防火墙

systemctl stop firewalld
systemctl disable firewalld
 
# 重启 docker 系统服务
systemctl restart docker

配置管理员用户名和密码

#mkdir创建文件夹
mkdir /etc/rabbitmq
vim /etc/rabbitmq/rabbitmq.conf

# 添加两行配置:
default_user = admin
default_pass = admin

启动Rabbitmq容器

docker run -d --name rabbit \
-p 5672:5672 \
-p 15672:15672 \
-v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
--restart=always \
rabbitmq:management

访问管理控制台 http://192.168.64.140:15672
用户名密码是 admin

设置rabbitmq服务器

# 设置服务,开机自动启动
systemctl enable rabbitmq-server

# 启动服务
systemctl start rabbitmq-server

1.2、rabbitmq管理界面

启用管理界面

# 开启管理界面插件
rabbitmq-plugins enable rabbitmq_management

# 防火墙打开 15672 管理端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload

重启RabbitMQ服务

systemctl restart rabbitmq-server

登录rabbitmq网页

  • 用户管理参考rabbitmq——用户管理 - 孤剑 - 博客园

二、使用rabbitmq消息队列

1.1、前期准备

pom文件添加依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

.yml配置文件

spring:
  rabbitmq:
    host: 192.168.64.136
    username: admin
    password: admin
    listener:
      simple:
        # 每次收1条,处理完之前不收下一条
        # 默认 250
        prefetch: 1
        # acknowledgeMode: NONE # rabbitmq的自动确认
        # acknowledgeMode: AUTO # rabbitmq的手动确认, springboot会自动发送确认回执 (默认)
        # acknowledgeMode: MANUAL # rabbitmq的手动确认, springboot不发送回执, 必须自己编码发送回执

消息队列与通过交换机生成的消息队列属性

Name:队列的名称
Durable:是否持久化(重启rabbitmq之后,队列是否还存在)
Exclusive:是否只被一个客户端连接使用,且当连接关闭后,删除队列
AutoDelete :是否自动删除(当最后一个消费者退订后即被删除)
Arguments:队列的其他属性参数,有如下可选项:
(1)x-message-ttl:消息的过期时间,单位:毫秒;
(2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
(3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
(4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
(5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
(6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
(7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
(8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
(9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
(10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
(11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

交换机的类型

exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符 号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还 有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。

1.2、创建消息队列,声明交换机

package com.example.demo.entity;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {
    //声明队列
    @Bean
    public Queue taskQueue() {
        /*
         * 可用以下形式:
         * new Queue("helloworld") - 持久,非排他,非自动删除
         * new Queue("helloworld",false,false,false,null)
         */
        return new Queue("hello",false);
    }

    //声明交换机
    @Bean
    public FanoutExchange logs(){
        return new FanoutExchange("fanout_logs",false,false);
    }
}

1.3、通过队列发送与接受消息

    //注入生产者
    @Autowired
    private SimpleSender simpleSender;

    public String a(String msg){
        simpleSender.a(msg);
        return msg;
    }

1.31、生产者

package com.example.demo.component;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SimpleSender {
    @Autowired
    private AmqpTemplate template;

    public void a(String msg){
        template.convertAndSend("hello",msg);
    }
}

1.32、消费者

package com.example.demo.component;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SimpleReceiver {

    @RabbitListener(queues = "hello")
    public void receive(String msg){
        System.out.println("消费者消费消息"+msg);
    }
}

1.4、通过交换机发送与接收消息

1.41、生产者

package com.example.demo.component;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SimpleSender {
    @Autowired
    private AmqpTemplate template;

    public void a(String msg){
        //路由可以为空,如果有值则需要对应的消费者消费
        //                         交换机    路由 消息
        template.convertAndSend("fanout_logs",k,msg);
    }
}

1.42、消费者

当交换机绑定的队列hello已存在时

package com.example.demo.component;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SimpleReceiver {

    @RabbitListener(queues = "hello")
    public void receive(String msg){
        System.out.println("消费者消费消息"+msg);
    }
}

当不存在与交换机绑定的队列时

注意

@QueueBinding

注解的三个属性:

  • value: @Queue 注解,用于声明队列,value 为 queueName, durable 表示队列是否持久化, autoDelete 表示没有消费者之后队列是否自动删除
  • exchange: @Exchange 注解,用于声明 exchange, type 指定消息投递策略,我们这里用的 topic 方式
  • key: 在 topic 方式下,这个就是我们熟知的 routingKey(路由),若生产者传递的路由为空则可以不写
    /**
     * 队列不存在时,需要创建一个队列,并且与exchange绑定
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "topic.n1", durable = "false", autoDelete = "true"),
            exchange = @Exchange(value = "fanout_logs", type = ExchangeTypes.TOPIC),
            key = "info"))
    public void consumerNoQueue(String data) {
        System.out.println("consumerNoQueue: " + data);
    }

    @RabbitListener(bindings = @QueueBinding(
            //@Queue注解spring会随机创建队列,参数为 非持久、独占、自动删除
            value = @Queue,
            // declare = "false"  不在这里创建交换机,而是用名字引用存在的交换机
            exchange = @Exchange(name = "fanout_logs",declare = "false"),
            key = {"info","warning","error"}
    ))
    public void receive1(String msg){
        System.out.println("消费者1收到:" +msg);
    }

三、rabbitMq延迟消息队列

3.1下载延迟插件

查看镜像的信息获取版本号

docker inspect rabbitmq:management

根据版本号下载延迟插件

下载地址:RabbitMQ

** **

**将插件文件上传到服务器,我这里是直接上传到

/root

下了**

安装插件并启用

将刚刚上传的插件拷贝到容器内plugins目录下

  • rabbit
    
    是容器的
    name
    
    ,也可以使用容器id*
docker cp /root/rabbitmq_delayed_message_exchange-3.9.0.ez rabbit:/plugins

进入到RabbitMQ容器内部

docker exec -it rabbit /bin/bash

查看插件是否存在

cd plugins
ls |grep delay

启用插件

(注意是在plugins内)

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

退出容器,重启RabbitMQ容器

exit
docker restart rabbit

登录RabbitMQ的管理界面(ip:15672 访问web界面)检查x-delayed-message是否存在

Java SpringBoot 代码部分

配置类

package com.example.demo.entity;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

@Configuration
public class Rabbitmq {
    //交换机
    public static final String DELAYED_EXCHANGE ="delayed_exchange";
    //队列
    public static final String DELAYED_QUEUE ="delayed_queue";
    //routeingKey
    public static final String DELAYED_ROUTINGKEY ="delayed_routingKey";

    //声明延迟交换机
    @Bean
    public CustomExchange delayedExchange(){
        HashMap<String, Object> arguments = new HashMap<>();
        //自定义交换机的类型
        arguments.put("x-delayed-type", "direct");
        /**
         * 交换机名
         * 交换机类型
         * 持久化
         * 自动删除
         */
        return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);
    }

    /**
     * 声明队列
     * @return
     */
    @Bean
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE);
    }

    //延迟交换机和队列绑定
    @Bean
    public Binding delayedQueueBindingDelayedExchange(Queue delayedQueue, CustomExchange delayedExchange){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTINGKEY).noargs();
    }
}

生产者

package com.example.demo.controller;

import com.example.demo.entity.Rabbitmq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;

@Component
@EnableScheduling
public class D {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 
     * @param message       传递的信息
     * @param delayedTime   延迟时间
     */
    public void a(String message,Integer delayedTime){
        rabbitTemplate.convertAndSend(Rabbitmq.DELAYED_EXCHANGE,Rabbitmq.DELAYED_ROUTINGKEY,message,(msg -> {
            //发送消息 并设置delayedTime
            msg.getMessageProperties().setDelay(delayedTime);
            return msg;
        }));
    }
}

消费者

package com.example.demo.controller;

import com.example.demo.entity.Rabbitmq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class C {
    @RabbitListener(queues = Rabbitmq.DELAYED_QUEUE)
    public void receiveMessage(Message message){
        String msg = new String(message.getBody());
        System.out.println(msg);
        System.out.println(new Date());
    }
}

本文转载自: https://blog.csdn.net/cxy_ydj/article/details/125786738
版权归原作者 cxy_ydj 所有, 如有侵权,请联系我们删除。

“RabbitMQ”的评论:

还没有评论