0


RabbitMQ(集群相关部署)

RabbitMQ

集群部署

环境准备:阿里云centos8 服务器,3台服务器,分别进行安装;

下载Erlang

Erlang

RabbitMQ

版本对照:https://www.rabbitmq.com/which-erlang.html

  1. 创建yum库配置文件
vim /etc/yum.repos.d/rabbitmq.repo
  1. 内容
# 内容 - 来自官方文档:https://www.rabbitmq.com/docs/install-rpm# In /etc/yum.repos.d/rabbitmq.repo#### Zero dependency Erlang RPM##[modern-erlang]name=modern-erlang-el8
# uses a Cloudsmith mirror @ yum.novemberain.com in addition to its Cloudsmith upstream.# Unlike Cloudsmith, the mirror does not have any traffic quotasbaseurl=https://yum1.novemberain.com/erlang/el/8/$basearch
        https://yum2.novemberain.com/erlang/el/8/$basearch
        https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/$basearchrepo_gpgcheck=1enabled=1gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key
gpgcheck=1sslverify=1sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300pkg_gpgcheck=1autorefresh=1type=rpm-md

[modern-erlang-noarch]name=modern-erlang-el8-noarch
# uses a Cloudsmith mirror @ yum.novemberain.com.# Unlike Cloudsmith, it does not have any traffic quotasbaseurl=https://yum1.novemberain.com/erlang/el/8/noarch
        https://yum2.novemberain.com/erlang/el/8/noarch
        https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/noarch
repo_gpgcheck=1enabled=1gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key
       https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
gpgcheck=1sslverify=1sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300pkg_gpgcheck=1autorefresh=1type=rpm-md

[modern-erlang-source]name=modern-erlang-el8-source
# uses a Cloudsmith mirror @ yum.novemberain.com.# Unlike Cloudsmith, it does not have any traffic quotasbaseurl=https://yum1.novemberain.com/erlang/el/8/SRPMS
        https://yum2.novemberain.com/erlang/el/8/SRPMS
        https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/SRPMS
repo_gpgcheck=1enabled=1gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key
       https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
gpgcheck=1sslverify=1sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300pkg_gpgcheck=1autorefresh=1#### RabbitMQ Server##[rabbitmq-el8]name=rabbitmq-el8
baseurl=https://yum2.novemberain.com/rabbitmq/el/8/$basearch
        https://yum1.novemberain.com/rabbitmq/el/8/$basearch
        https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/$basearchrepo_gpgcheck=1enabled=1# Cloudsmith's repository key and RabbitMQ package signing keygpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key
       https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
gpgcheck=1sslverify=1sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300pkg_gpgcheck=1autorefresh=1type=rpm-md

[rabbitmq-el8-noarch]name=rabbitmq-el8-noarch
baseurl=https://yum2.novemberain.com/rabbitmq/el/8/noarch
        https://yum1.novemberain.com/rabbitmq/el/8/noarch
        https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/noarch
repo_gpgcheck=1enabled=1# Cloudsmith's repository key and RabbitMQ package signing keygpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key
       https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
gpgcheck=1sslverify=1sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300pkg_gpgcheck=1autorefresh=1type=rpm-md

[rabbitmq-el8-source]name=rabbitmq-el8-source
baseurl=https://yum2.novemberain.com/rabbitmq/el/8/SRPMS
        https://yum1.novemberain.com/rabbitmq/el/8/SRPMS
        https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/SRPMS
repo_gpgcheck=1enabled=1gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key
gpgcheck=0sslverify=1sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300pkg_gpgcheck=1autorefresh=1type=rpm-md
  1. 更新yum库
# --nobest表示所需安装包即使不是最佳选择也接受
yum update -y--nobest
  1. 进行安装
yum install-y erlang

安装RabbitMQ

Erlang

RabbitMQ

版本对照:https://www.rabbitmq.com/which-erlang.html

# 导入GPG密钥,如失败多重试几次rpm--import'https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc'rpm--import'https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key'rpm--import'https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key'# 下载 RPM 包wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.0/rabbitmq-server-3.13.0-1.el8.noarch.rpm

# 安装rpm-ivh rabbitmq-server-3.13.0-1.el8.noarch.rpm

进行配置

# 启用管理界面插件
rabbitmq-plugins enable rabbitmq_management

# 启动 RabbitMQ 服务:
systemctl start rabbitmq-server

# 将 RabbitMQ 服务设置为开机自动启动
systemctl enable rabbitmq-server

# 新增登录账号密码 - 用户名 密码
rabbitmqctl add_user yan 123456# 设置登录账号权限,该用户设置角色是administrator管理员。再设置权限
rabbitmqctl set_user_tags yan administrator
rabbitmqctl set_permissions -p / yan ".*"".*"".*"# 配置所有稳定功能 flag 启用
rabbitmqctl enable_feature_flag all

# 重启RabbitMQ服务生效
systemctl restart rabbitmq-server

# 停止RabbitMQ服务
rabbitmqctl stop_app

首尾配置

# 删除yum库配置文件,避免下次yum安装在执行一遍rm-rf /etc/yum.repos.d/rabbitmq.repo

# 分别在不同服务器修改主机名称vim /etc/hostname
# 分别起名 node1、node2、node3# 例子:机器1,该文件/etc/hostname 下 填写node1#      机器2,该文件/etc/hostname 下 填写node2#      机器3,该文件/etc/hostname 下 填写node3

集群配置

机器1配置:

  1. 分别在3台机器设置IP 地址到对应主机名称的映射修改文件/etc/hosts,追加如下内容:ip1 node01ip2 node02ip3 node03
  2. 查看该MQ的Cookie值并记录cat /var/lib/rabbitmq/.erlang.cookie# 并记录该值,后续机器2,机器3上的cookie值保持一致
  3. 重置节点应用rabbitmqctl stop_apprabbitmqctl resetrabbitmqctl start_app

机器2配置:

  1. 分别在3台机器设置IP 地址到对应主机名称的映射修改文件/etc/hosts,追加如下内容:ip1 node01ip2 node02ip3 node03
  2. 查看该MQ的Cookie值并记录cat /var/lib/rabbitmq/.erlang.cookie# 把机器1上的cookie值复制到这里,与机器1保持一致
  3. 重置节点应用,并加入集群rabbitmqctl stop_apprabbitmqctl reset# 加入集群,去node01机器上找rabbit的程序,进行加入rabbitmqctl join_cluster rabbit@node01rabbitmqctl start_app

机器3配置:

  1. 分别在3台机器设置IP 地址到对应主机名称的映射修改文件/etc/hosts,追加如下内容:ip1 node01ip2 node02ip3 node03
  2. 查看该MQ的Cookie值并记录cat /var/lib/rabbitmq/.erlang.cookie# 把机器1上的cookie值复制到这里,与机器1保持一致
  3. 重置节点应用,并加入集群rabbitmqctl stop_apprabbitmqctl reset# 加入集群,去node01机器上找rabbit的程序,进行加入rabbitmqctl join_cluster rabbit@node01rabbitmqctl start_app
  4. 查看集群状态rabbitmqctl cluster_status# 也可以在图形化界面上,查看集群节点,在Nodes这里查看集群节点

负载均衡

一般MQ的服务端口5672、UI管理界面端口15672

浏览器访问,某个端口,通过HAProxy工具,进行分流每个节点的15672端口上,进行管理UI上的负载均衡;

服务端访问,某个端口,通过HAProxy工具,进行分流每个节点的5672端口上,进行服务端的负载均衡;

安装HAProxy

yum install-y haproxy
# 查看安装的版本
haproxy -v# 启动该程序
systemctl start haproxy
# 设置该服务开机自动启动
systemctl enable haproxy

修改配置文件

  1. 修改配置文件vim /etc/haproxy/haproxy.cfg文件内容:# 在最后一行添加该内容,该部分配置UI管理器上的负载均衡# 起MQ的UI的名字(rabbitmq_ui_frontend),前端部分配置frontend rabbitmq_ui_frontend# 绑定一个地址,端口号,外部浏览器访问该IP地址和端口后,然后通过HAProxy进行转发bind192.168.xxx.xxx:22222# http协议mode http# 默认的后端服务名,使用下面的名字default_backend rabbitmq_ui_backend# 起MQ的服务端的名字(rabbitmq_ui_backend)backend rabbitmq_ui_backend# http协议mode http# 启用轮询的负载均衡balance roundrobinoption httpchk GET /# 分别设置,不同的MQ服务节点的IP地址server rabbitmq_ui1 192.168.xxx.1:15672 checkserver rabbitmq_ui2 192.168.xxx.2:15672 checkserver rabbitmq_ui3 192.168.xxx.3:15672 check# 配置服务端(生产者、消费者)的负载均衡# 服务端(生产者、消费者)的名字frontend rabbitmq_frontend# 绑定一个地址,端口号,服务端(生产者、消费者)访问该IP地址和端口后,然后通过HAProxy进行转发bind192.168.xxx.xxx:11111# tcp协议mode tcpdefault_backend rabbitmq_backend# 与上面的名字保持一致backend rabbitmq_backend# tcp协议mode tcp# 启用轮询的负载均衡balance roundrobin# 分别设置,不同的MQ服务节点的IP地址server rabbitmq1 192.168.xxx.1:5672 checkserver rabbitmq2 192.168.xxx.2:5672 checkserver rabbitmq3 192.168.xxx.3:5672 check
  2. 设置SELinux策略,允许HAProxy拥有权限连接任意端口:# 保存上面的配置文件,执行下面setsebool -Phaproxy_connect_any=1SELinux是Linux系统中的安全模块,它可以限制进程的权限以提高系统的安全性。在某些情况下,SELinux可能会阻止HAProxy绑定指定的端口,这就需要通过设置域(domain)的安全策略来解决此问题。通过执行setsebool -P haproxy_connect_any=1命令,您已经为HAProxy设置了一个布尔值,允许HAProxy连接到任意端口。这样,HAProxy就可以成功绑定指定的socket,并正常工作。
  3. 重启HAProxysystemctl restart haproxy

配置成功

# 访问MQ管理UI页面:
http://192.168.xxx.xxx:22222
# 生产者、消费者服务连接MQ
  rabbitmq:
    host: 192.168.xxx.xxx
    port: 11111
    username: guest
    password: 123456
# 可通过生产者发送消息,监听消费者是否接受到消息

仲裁队列

仲裁队列是3.8版本以后才有的新功能,是主从模式,支持主从数据同步,在添加队列,队列类型为Quorum就是仲裁队列。

创建交换机

与之前创建普通交换机一致

创建仲裁队列
在这里插入图片描述

绑定交换机

与之前的一致

验证,停止某个节点

# 停止rabbit应用
rabbitmqctl stop_app
# 我们在某个节点进行停止MQ,然后生产者发送消息,查看消费者是否可以接受到,判断集群的数据一致性# 默认,主节点挂掉,默认重新选举新的主节点

流式队列

流式队列是3.9版本以后才有的新功能,每个消息都分配一个偏移量,该消息被消费掉也不会被删除,可以重复消费;类似Kafka,但没有达到Kafka的效果。

前提准备 - 安装相关插件

# 启用Stream插件
rabbitmq-plugins enable rabbitmq_stream

# 重启rabbit应用
rabbitmqctl stop_app
rabbitmqctl start_app

# 查看插件状态
rabbitmq-plugins list

负载均衡配置

frontend rabbitmq_stream_frontend
bind192.168.xxx.100:33333
mode tcp
default_backend rabbitmq_stream_backend

backend rabbitmq_stream_backend
mode tcp
balance roundrobin
# 它的端口是5552
server rabbitmq1 192.168.xxx.1:5552 check
server rabbitmq2 192.168.xxx.1:5552 check
server rabbitmq3 192.168.xxx.2:5552 check

JAVA相关代码

// 引包
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>stream-client</artifactId><version>0.15.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.30</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.3</version></dependency></dependencies>

创建Stream,不需要创建交换机

// 代码创建Environment environment =Environment.builder().host("192.168.xxx.1").port(33333).username("用户名").password("密码").build();

environment.streamCreator().stream("stream.atguigu.test2").create();
environment.close();

通过管理UI页面创建的话,队列类型选择:Stream

生产者

Environment environment =Environment.builder().host("192.168.xxx.1").port(33333).username("用户名").password("密码").build();Producer producer = environment.producerBuilder()// 队列名称.stream("stream.test").build();// 消息内容byte[] messagePayload ="hello rabbit stream".getBytes(StandardCharsets.UTF_8);CountDownLatch countDownLatch =newCountDownLatch(1);
producer.send(
        producer.messageBuilder().addData(messagePayload).build(),
        confirmationStatus ->{if(confirmationStatus.isConfirmed()){System.out.println("[生产者端]the message made it to the broker");}else{System.out.println("[生产者端]the message did not make it to the broker");}
            countDownLatch.countDown();});

countDownLatch.await();
producer.close();
environment.close();

消费者

Environment environment =Environment.builder().host("192.168.xxx.1").port(33333).username("用户名").password("密码").build();
environment.consumerBuilder()// 队列名称.stream("stream.test")// 这个是消费端起名字.name("stream.test.consumer").autoTrackingStrategy().builder().messageHandler((offset, message)->{// 接受消息byte[] bodyAsBinary = message.getBodyAsBinary();String messageContent =newString(bodyAsBinary);System.out.println("[消费者端]messageContent = "+ messageContent +" Offset="+ offset.offset());}).build();

指定偏移量消费

// 指定Offset消费Environment environment =Environment.builder().host("192.168.xxx.1").port(33333).username("用户名").password("密码").build();CountDownLatch countDownLatch =newCountDownLatch(1);Consumer consumer = environment.consumerBuilder().stream("stream.test").offset(OffsetSpecification.first()).messageHandler((offset, message)->{byte[] bodyAsBinary = message.getBodyAsBinary();String messageContent =newString(bodyAsBinary);System.out.println("[消费者端]messageContent = "+ messageContent);
            countDownLatch.countDown();}).build();

countDownLatch.await();
consumer.close();

Federation插件

它是MQ在不同的Broker节点之间进行消息传递而无须建立集群,可以实现跨集群的数据同步。

Federation交换机

  1. 前提准备:启用该插件、分为上游和下游,创建2个独立的MQ服务;# 使用docker进行安装MQ1服务docker run -d\--name rabbitmq-shenzhen \-p51000:5672 \-p52000:15672 \-v rabbitmq-plugin:/plugins \-eRABBITMQ_DEFAULT_USER=guest \-eRABBITMQ_DEFAULT_PASS=123456\rabbitmq:3.13-management# 使用docker进行安装MQ2服务docker run -d\--name rabbitmq-shanghai \-p61000:5672 \-p62000:15672 \-v rabbitmq-plugin:/plugins \-eRABBITMQ_DEFAULT_USER=guest \-eRABBITMQ_DEFAULT_PASS=123456\rabbitmq:3.13-management
  2. 启用插件# 分别在2台MQ服务,进行安装该插件rabbitmq-plugins enable rabbitmq_federationrabbitmq-plugins enable rabbitmq_federation_management安装完插件,在右侧菜单栏可看到
  3. 添加上游连接端点在下游的MQ服务,进行操作,点击右侧Federation Upstreams,进行添加,需要起一个上游的连接点名字,(federation-upstreams),URL填写:amqp://用户名:密码@访问ip:端口 (是上游的可访问的地址)在这里插入图片描述
  4. 创建控制策略进行配置,在下游的MQ服务,进行操作在这里插入图片描述
  5. 验证# 需要满足:# 普通交换机和联邦交换机名称要一致# 交换机名称要能够和策略正则表达式匹配上# 发送消息时,两边使用的路由键也要一致# 队列名称不要求一致在这里插入图片描述 生产者(上游)发送消息、消息会被上游(MQ)的队列接收到,然后下游(MQ)的队列也会接受到消息;满足不同集群进行数据同步;

Federation队列

Federation队列和Federation交换机的最核心区别就是:

  • Federation Police作用在交换机上,就是Federation交换机
  • Federation Police作用在队列上,就是Federation队列
标签: 开发语言 后端

本文转载自: https://blog.csdn.net/yanshengren520/article/details/140247959
版权归原作者 优秀的颜 所有, 如有侵权,请联系我们删除。

“RabbitMQ(集群相关部署)”的评论:

还没有评论