0


RabbitMq概述与工作模式(1)

rabbitMq深入浅出

一,Mq概念

1.1 Mq概述

Mq全称 Message Queue,是在消息传输过程中保存消息的容器,多用于分布式系统之间的通信
在这里插入图片描述

1.2 Mq优势

1,应用解耦

如下图,假设a和bcde系统都要实现交互,在不使用mq的情况下,那么就会增加系统之间的耦合性,并且增加系统之间的开销,在使用mq的情况下,a只需将消息发到队列中去即可,bcde直接去mq中取就可以了。
在这里插入图片描述
2,异步提速

如下图,在没使用mq的情况下,用户在点击按钮之后,需要同步按顺序做这些事情,5ms查库存,减库存等,300毫秒得到响应,再5ms调用支付系统,以此类推大概需要 20+300+300+300 = 920ms;但是如果使用异步的话,只需要 20+5 = 25ms,可以大大的提升用户体验以及系统吞吐量。
在这里插入图片描述
3,削峰填谷

有点类似于限流,在消息高峰的时候,将消息的消费速度进行限制,从而使消息堆积在Mq里面,由于消息长时间堆积,而消费速度进行了限制,所以会有一段长时间的平谷期。从而实现提升系统的稳定性。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rLibUvVc-1653327760677)(C:\Users\HULOUBO\AppData\Roaming\Typora\typora-user-images\1653317195811.png)]

1.3 Mq的劣势

1,系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响
2,MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用

二,RabbitMQ 简介

官网:https://www.rabbitmq.com/getstarted.html

2.1 mq的基本组成组件如下

在这里插入图片描述
Publisher:生产者,即消息的发送者,用于对消息的发送
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:直连direct (point-to-point), Topic通配符模式 和 Fanout广播模式,但是交换机并没有持久化的能力,只负责消息的转发,所以如果使用可交换机而并没有绑定队列的情况下,会造成数据的丢失。
Queue:消息最终被送到这里等待 consumer 取走
Consumer:消费者,用于去队列中获取消息,从而实现对消息的消费

2.2 Mq基础架构图如下

在这里插入图片描述
Broker:用于接收和分发消息的应用,里面主要包含虚拟机,每个虚拟机可以包含多个交换机和队列
Virtual host:存储在Broker中,每个虚拟机可以包含多个交换机和队列
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:由于TCP的建立连接需要进行大量的可靠连接以及TCP的创建和销毁,会浪费很多系统资源,从而降低系统的吞吐量,因此采用信道的方式建立连接,从而减少了大量的TCP可靠连接的开销

三,rabbitmq的安装

3.1 使用docker安装

1,下载rabbitMq镜像

docker pull rabbitmq:management

2,后台方式运行:

docker run --name rabbitmq -d -p 15672:15672-p 5672:5672 rabbitmq:management

3,OK了,这样rabbitmq3.7.3-manager就安装好了,一定要把manager加上,不然图形化界面出现不了
在服务器上开放5672和15672端口
在这里插入图片描述
4,可以查看一下正在运行的容器

docker ps

在这里插入图片描述4,测试连接,可以发现是没问题的。(如果有,那就关闭一下防火墙或者看看端口是否开放)

curl 0.0.0.0:5672

浏览器测试:服务器ip:15672
在这里插入图片描述
默认的账号密码都是guest,输入到达以下界面,那么rabbitmq就成功安装了
在这里插入图片描述

3.2 图形化界面相关参数讲解

在这里插入图片描述
Users
可以增加用户,并且为用户设置权限,该用户在登录之后就可以查看到对应创建的mq的信息,如可以看到对应的虚拟机,交换机和队列等信息
Virtual host
相对于mysql来说,这个虚拟机就是对应的数据库,因此在建表的时候,就需要提前建立数据库,这个mq和mysql一样,在建立交换机或者队列之前需要先建立虚拟机,如下图,可以看该虚拟机的相关信息,点击这个Name也可以为这个虚拟机设置哪个用户可以使用
在这里插入图片描述
Exchange
交换机,用于分发消息,在建表的时候需要先建立数据库,因此在建立交换机的时候就需要先建立虚拟机,在建立交换机的时候需要选择具体的虚拟机,并且类型有direct直连模式,topic发布订阅模式和fanout模式。
在这里插入图片描述
Queue
队列,用于存储发送的消息,也需要绑定虚拟机,同时也可以绑定对应的交换机,如下图,可以发现具体绑定了的虚拟机名字,也可以通过 total 查看消息未被消费的个数等。
在这里插入图片描述

四,工作机制的初步描述以及代码实现

如下图,官网以及很明确的告诉这六种使用方式了,接下来主要描述这前五种的方式,通过代码以及具体的案例,使用原生的代码方式来实现
在这里插入图片描述

4.1 公共代码模块

首先需要创建一个用户或者使用默认用户guest,其次的话需要创建一个虚拟机study_mq,相当于建一个数据库一样,队列可以暂时不建立,系统会自动建立

4.1.1,rabbitmq连接工具类

packagecom.zhs.rabbitmq.utils;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;publicclassRabbitUtils{//连接工厂privatestaticConnectionFactory connectionFactory =newConnectionFactory();static{//设置主机
        connectionFactory.setHost("*.*.*.*");//端口号
        connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号//用户名
        connectionFactory.setUsername("123456");//密码
        connectionFactory.setPassword("123456");//设置虚拟机,相当于数据库
        connectionFactory.setVirtualHost("/study_mq");}publicstaticConnectiongetConnection(){Connection conn =null;try{//获取长连接
            conn = connectionFactory.newConnection();return conn;}catch(Exception e){thrownewRuntimeException(e);}}}

4.1.2,参数公共类

packagecom.zhs.rabbitmq.utils;/**
 * 交换机需要手动创建,否者回报异常,队列名称建议手建
 * @author zhenghuisheng
 */publicclassRabbitConstant{//队列名称publicstaticfinalString QUEUE_HELLOWORLD ="zhs_study_mq";publicstaticfinalString QUEUE_SMS ="sms";publicstaticfinalString QUEUE_BAIDU ="baidu";publicstaticfinalString QUEUE_SINA ="sina";//交换机名称publicstaticfinalString EXCHANGE_WEATHER ="weather";publicstaticfinalString EXCHANGE_WEATHER_ROUTING ="weather_routing";publicstaticfinalString EXCHANGE_WEATHER_TOPIC ="weather_topic";}

4.2 hello wrold模式

即简单的工作队列模式,首先需要手动的创建队列,不创建的话会自动创建。主要是实现简单的工作队列模式,实现一对一的方式,生产者发布消息到队列,消费者去队列中获取消息,期间也不需要显示的设置交换机,底层会默认选择使用交换机

4.2.1 消费者

packagecom.zhs.rabbitmq.helloworld;-importcom.zhs.rabbitmq.utils.RabbitConstant;importcom.zhs.rabbitmq.utils.RabbitUtils;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * @author zhenghuisheng
 */publicclassConsumer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//获取TCP长连接Connection conn =RabbitUtils.getConnection();//创建通信“通道”,相当于TCP中的虚拟连接Channel channel = conn.createChannel();//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列//第一个参数:队列名称ID//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列//其他额外的参数, null
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false,false,false,null);//从MQ服务器中获取数据,即消费数据//创建一个消息消费者//第一个参数:队列名//第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法,防止mq出现异常//第三个参数要传入DefaultConsumer的实现类
        channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD,false,newReciver(channel));}}classReciverextendsDefaultConsumer{privateChannel channel;//重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到publicReciver(Channel channel){super(channel);this.channel = channel;}@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//body为获取的信息String message =newString(body);System.out.println("消费者接收到的消息:"+ message);System.out.println("消息的TagId:"+ envelope.getDeliveryTag());//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
        channel.basicAck(envelope.getDeliveryTag(),false);}}

4.2.2 生产者

packagecom.zhs.rabbitmq.helloworld;importcom.zhs.rabbitmq.utils.RabbitConstant;importcom.zhs.rabbitmq.utils.RabbitUtils;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importjava.io.IOException;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.util.Random;importjava.util.concurrent.TimeoutException;/**
 * @author zhenghuisheng
 */publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//获取TCP长连接Connection conn =RabbitUtils.getConnection();//创建通信“通道”,相当于TCP中的虚拟连接,即获取通道连接Channel channel = conn.createChannel();//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列//第一个参数:队列名称ID//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列//其他额外的参数, null
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false,false,false,null);//四个参数//exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到//队列名称//额外的设置属性//最后一个参数是要传递的消息字节数组for(int i =0; i <50; i++){//需要发送的数据,发送十条消息String message =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(newDate())+": hello_mq "+newRandom().nextInt(1000);
            channel.basicPublish("",RabbitConstant.QUEUE_HELLOWORLD,null,message.getBytes());}
        channel.close();
        conn.close();System.out.println("===生产者数据发送成功===");}}

4.3 工作队列模式

就比如12306,在春节高并发期间,会使用大量的短信服务器为用户发送消息购买成功或者失败的消息,由于期间并发量特别高,所以可能会选择使用mq将消息发送到mq里面,然后使用服务器客户端去mq中拉取消息,实现消息再发送到客户手中,客户端不够的话可以增加客户端,去mq中拉取消息就行了,从而实现分摊各个服务器的压力,实现方式如下

4.3.1 消息实体类

packagecom.zhs.rabbitmq.workqueue;importlombok.Data;/**
 * @author zhenghuisheng
 */@Data@AllArgsConstructor@NoArgsConstructorpublicclass SMS {//名字privateString name;//电话号码privateString mobile;//消息privateString content;}

4.3.1 消息生产者

/**
 * @author zhenghuisheng
 * 工作队列模式
 * 发送者
 */publicclassOrderSystem{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//获取mq连接Connection connection =RabbitUtils.getConnection();Channel channel = connection.createChannel();//队列名称
        channel.queueDeclare(RabbitConstant.QUEUE_SMS,false,false,false,null);for(int i =1; i <=100; i++){SMS sms =newSMS("乘客"+ i,"13900000"+ i,"您的车票已预订成功");String jsonSMS =newGson().toJson(sms);//交换机,队列
            channel.basicPublish("",RabbitConstant.QUEUE_SMS ,null, jsonSMS.getBytes());}System.out.println("发送数据成功");
        channel.close();
        connection.close();}}

4.3.3 消息接收者1号

publicclassSMSSender1{publicstaticvoidmain(String[] args)throwsIOException{//获取连接Connection connection =RabbitUtils.getConnection();//构建通道finalChannel channel = connection.createChannel();//队列名称,是否持久化,是否私有化,是否自动删除
        channel.queueDeclare(RabbitConstant.QUEUE_SMS,false,false,false,null);//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
        channel.basicQos(1);//处理完一个取一个,没消费完不要去取下一个
        channel.basicConsume(RabbitConstant.QUEUE_SMS ,false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String jsonSMS =newString(body);System.out.println("SMSSender1-短信发送成功:"+ jsonSMS);try{Thread.sleep(10);}catch(InterruptedException e){
                    e.printStackTrace();}
                channel.basicAck(envelope.getDeliveryTag(),false);}});}}

4.4.4 消息接收者2号

publicclassSMSSender2{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitUtils.getConnection();finalChannel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS,false,false,false,null);//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者,即使用轮询的方式//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
        channel.basicQos(1);//处理完一个取一个
        channel.basicConsume(RabbitConstant.QUEUE_SMS ,false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String jsonSMS =newString(body);System.out.println("SMSSender2-短信发送成功:"+ jsonSMS);try{//阻塞,根据服务器的性能设置Thread.sleep(100);}catch(InterruptedException e){
                    e.printStackTrace();}
                channel.basicAck(envelope.getDeliveryTag(),false);}});}}

4.4 发布订阅模式

和工作队列的方式不一样,发送者先将数据发送到交换机里面,然后将数据发送到队列中,每个队列都要发送一样的数据,没有说实现分摊压力的,这样才能保证每个服务都能发送一样的数据,从而实现每个用户都能接收到相同的数据。就像天气预报一样,天气预报发完之后,就会通知传到各个网站上,如百度和新浪都会同时接收到总台发送的消息。

4.4.1 总台发布消息

/**
 * @author zhenghuisheng
 * 发布者发布消息
 */publicclassWeatherBureau{publicstaticvoidmain(String[] args)throwsException{//获取TCP长连接Connection connection =RabbitUtils.getConnection();//键盘手动输入信息String input =newBufferedReader(newInputStreamReader(System.in)).readLine();//通过连接获取通道Channel channel = connection.createChannel();//第一个参数交换机名字   其他参数和之前的一样
        channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"",null, input.getBytes());
        channel.close();
        connection.close();}}

4.4.2 百度接收消息

publicclassBiaDu{publicstaticvoidmain(String[] args)throwsIOException{//获取TCP长连接Connection connection =RabbitUtils.getConnection();//获取虚拟连接finalChannel channel = connection.createChannel();//声明队列信息
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false,false,false,null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名  参数三:路由key(暂时用不到)
        channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER,"");//确认一个消息之后再拿下一个消息
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU ,false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("百度天气收到气象信息:"+newString(body));
                channel.basicAck(envelope.getDeliveryTag(),false);}});}}

4.4.3 新浪接收消息

publicclassSina{publicstaticvoidmain(String[] args)throwsIOException{//获取TCP长连接Connection connection =RabbitUtils.getConnection();//获取虚拟连接finalChannel channel = connection.createChannel();//声明队列信息,是否持久化,是否私有化,是否自动删除
        channel.queueDeclare(RabbitConstant.QUEUE_SINA,false,false,false,null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名  参数三:路由key
        channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER,"");//每次消费一条再去取下一条消息
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA ,false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("新浪天气收到气象信息:"+newString(body));
                channel.basicAck(envelope.getDeliveryTag(),false);}});}}

4.5 route路由模式

通过上面的发布订阅模式可知,每个队列都会接收到一样的消息,但是要解决每个队列接收不同的消息问题,还是需要用到这个路由模式的,如发送者想把不同的消息发送到不同的队列里面,从而发送到不同的客户端。这样的话就可以为每个消息设置唯一标识,发送的时候携带一个唯一标识,接收者接收这个唯一标识,即可以用到这个route key来实现。消息发送者在发送到交换机的时候会在消息上携带一个routing key,在交换机将发送到队列时,需要队列也指定一个routing key,这样就能实现不同的数据发送到不同的队列,从而实现不同消息发送给不同的服务接收端。如改变上面的发布订阅模式,百度的信息发给百度,新浪的信息发给新浪,这样就不会每个队列都会接收到全部消息,而是可以接收到队列中想接收到的信息。

4.5.1 天气服务端发送信息

publicclassWeatherBureau{publicstaticvoidmain(String[] args)throwsException{Map<String,String> map =newHashMap<String,String>();
        map.put("china.jiangxi.ganzhou.20220403","中国江西赣州20220403天气数据");
        map.put("china.jiangxi.nanchang.20220403","中国江西南昌20220403天气数据");
        map.put("china.jiangxi.jiujiang.20220404","中国江西九江20220404天气数据");
        map.put("china.beijin.20220403","中国北京20220403天气数据");
        
        map.put("china.guangdong.shenzhen.20220404","中国广东深圳20220404天气数据");
        map.put("china.guangdong.guangzhou.20220403","中国广东广州20220403天气数据");
        map.put("china.guangdong.dongguan.20220404","中国广东东莞20220404天气数据");
        map.put("china.beijin.20220404","中国北京20220404天气数据");Connection connection =RabbitUtils.getConnection();Channel channel = connection.createChannel();Iterator<Map.Entry<String,String>> itr = map.entrySet().iterator();while(itr.hasNext()){Map.Entry<String,String> me = itr.next();//第一个参数交换机名字   第二个参数作为 消息的routing key
            channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey(),null, me.getValue().getBytes());}
        channel.close();
        connection.close();}}

4.5.2 百度客户端接收消息

publicclassBiaDu{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitUtils.getConnection();finalChannel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false,false,false,null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名  参数三:路由key
        channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.beijin.20220404");
        channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.jiangxi.jiujiang.20220404");//确认签收一条消息之后再去拿消息,默认为轮询机制
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU ,false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("百度天气收到气象信息:"+newString(body));
                channel.basicAck(envelope.getDeliveryTag(),false);}});}}

4.2.1 新浪客户端接收消息

publicclassSina{publicstaticvoidmain(String[] args)throwsIOException{//获取TCP长连接Connection connection =RabbitUtils.getConnection();//获取虚拟连接finalChannel channel = connection.createChannel();//声明队列信息
        channel.queueDeclare(RabbitConstant.QUEUE_SINA,false,false,false,null);//指定队列与交换机以及routing key之间的关系,value就是要获取的信息//允许队列和交换机之间存在多个路由key,即存在多个路由key的消费者消费同一个队列里面的消息
        channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.jiangxi.ganzhou.20220403");
        channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.jiangxi.nanchang.20220403");
        channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.guangdong.guangzhou.20220403");
        channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.beijin.20220403");//设置确认签收一条信息之后再拿下一条信息
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA ,false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("新浪天气收到气象信息:"+newString(body));
                channel.basicAck(envelope.getDeliveryTag(),false);}});}}

4.6 topic通配符模式

再如上面的天气,如果是天气预报发送了江西的全部市以及县的消息,江西的气象台要去获取中央获取的消息,不需要精确的获取到每个县的天气了,直接用江西的前缀获取到全部的信息,从而在江西的天气预报里面去分类以及播报,大大的减少服务器的压力以及大大增加系统的吞吐量以及速度。主要通过 * 和 # 来表示实现, * 可以匹配一个数据, # 可以匹配多个数据。
china.# :china开头都能匹配
#.street :street结尾的都能匹配
china.* :可以匹配china携带的一个数据,如china.jiangxi,china.guangdong
china.* .*:可以携带两个数据
以此类推…

4.6.1 天气服务端发送信息

publicclassWeatherBureau{publicstaticvoidmain(String[] args)throwsException{Map<String,String> map =newHashMap<String,String>();
        map.put("china.jiangxi.ganzhou.20220403","中国江西赣州20220403天气数据");
        map.put("china.jiangxi.nanchang.20220403","中国江西南昌20220403天气数据");
        map.put("china.jiangxi.jiujiang.20220404","中国江西九江20220404天气数据");
        map.put("china.beijin.20220403","中国北京20220403天气数据");

        map.put("china.guangdong.shenzhen.20220404","中国广东深圳20220404天气数据");
        map.put("china.guangdong.guangzhou.20220403","中国广东广州20220403天气数据");
        map.put("china.guangdong.dongguan.20220404","中国广东东莞20220404天气数据");
        map.put("china.beijin.20220404","中国北京20220404天气数据");Connection connection =RabbitUtils.getConnection();Channel channel = connection.createChannel();Iterator<Map.Entry<String,String>> itr = map.entrySet().iterator();while(itr.hasNext()){Map.Entry<String,String> me = itr.next();//第一个参数交换机名字   第二个参数作为 消息的routing key
            channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, me.getKey(),null, me.getValue().getBytes());}
        channel.close();
        connection.close();}}

4.6.2 百度客户端接收消息

publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitUtils.getConnection();finalChannel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false,false,false,null);//queueBind用于将队列与交换机绑定//*最多只能匹配一个词 , #匹配一个或者多个词//参数1:队列名 参数2:交互机名  参数三:路由key
        channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"*.*.*.20220403");
        channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"*.*.20220403");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU ,false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("百度天气收到气象信息:"+newString(body));
                channel.basicAck(envelope.getDeliveryTag(),false);}});}

4.6.3 新浪客户端接收消息

publicclassSina{publicstaticvoidmain(String[] args)throwsIOException{//获取TCP长连接Connection connection =RabbitUtils.getConnection();//获取虚拟连接finalChannel channel = connection.createChannel();//声明队列信息
        channel.queueDeclare(RabbitConstant.QUEUE_SINA,false,false,false,null);//指定队列与交换机以及routing key之间的关系
        channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"china.#");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA ,false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("新浪天气收到气象信息:"+newString(body));
                channel.basicAck(envelope.getDeliveryTag(),false);}});}}

初步的代码描述就基本完成了

标签: rabbitmq 分布式 java

本文转载自: https://blog.csdn.net/zhenghuishengq/article/details/124938580
版权归原作者 huisheng_qaq 所有, 如有侵权,请联系我们删除。

“RabbitMq概述与工作模式(1)”的评论:

还没有评论