Java Client操作RabbitMQ
文章目录
1.pom依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version></dependency>
2.连接工具类
/**
* rabbitmq连接工具类
* @author moshangshang
*/@Slf4jpublicclassRabbitMQUtil{privatestaticfinalStringHOST_ADDRESS="192.168.1.102";privatestaticfinalIntegerPORT=5672;privatestaticfinalStringVIRTUAL_HOST="my_vhost";privatestaticfinalStringUSER_NAME="root";privatestaticfinalStringPASSWORD="root";publicstaticConnectiongetConnection()throwsException{com.rabbitmq.client.ConnectionFactory factory=newcom.rabbitmq.client.ConnectionFactory();
factory.setHost(HOST_ADDRESS);
factory.setPort(PORT);
factory.setVirtualHost(VIRTUAL_HOST);
factory.setUsername(USER_NAME);
factory.setPassword(PASSWORD);return factory.newConnection();}publicstaticvoidmain(String[] args){Connection connection =null;try{
connection =getConnection();}catch(Exception e){
log.error("get rabbitmq connection exception....",e);}finally{try{if(connection!=null){
connection.close();}}catch(IOException e){
log.error("close rabbitmq connection exception....",e);}}}}
3.简单模式
生产者投递消费到队列进行消费
消息发送
publicclassSend{privatefinalstaticStringQUEUE_NAME="hello";publicstaticvoidmain(String[] argv)throwsException{Connection connection =RabbitMQUtil.getConnection();try(Channel channel = connection.createChannel()){//声明队列/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message ="Hello World!";
channel.basicPublish("",QUEUE_NAME,null, message.getBytes());System.out.println(" [x] Sent '"+ message +"'");}}}
消息接收
因为希望在消费者异步监听消息到达时,当前程序能够继续执行,而不是退出。
因为提供了一个
DeliverCallback
回调,该回调将缓冲消息,直到准备使用它们。
publicclassRecv{privatefinalstaticStringQUEUE_NAME="hello";publicstaticvoidmain(String[] argv)throwsException{Connection connection =RabbitMQUtil.getConnection();Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),StandardCharsets.UTF_8);System.out.println(" [x] Received '"+ message +"'");};
channel.basicConsume(QUEUE_NAME,true, deliverCallback, consumerTag ->{});}}
4.工作队列模式(work)
生产者直接投递消息到队列,存在多个消费者情况
- 创建一个工作队列,用于在多个工作人员之间分配耗时的任务。
- 工作队列(又名:任务队列)背后的主要思想是
避免立即执行资源密集型任务
,并必须等待其完成。相反,我们把任务安排在以后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当你运行多个worker时,任务将在它们之间共享。 - 这个概念在web应用程序中特别有用,因为在短的HTTP请求窗口内无法处理复杂的任务。
- 默认情况下,消费者会进行轮询调度
- RabbitMQ支持消息确认。消费者发送回一个确认,告诉RabbitMQ已经收到、处理了一条特定的消息,RabbitMQ可以自由删除它。
- 如果一个消费者在没有发送ack的情况下死亡(其通道关闭、连接关闭或TCP连接丢失),RabbitMQ将理解消息未完全处理,并将其重新排队。如果同时有其他消费者在线,它将迅速将其重新传递给另一个消费者。这样,即使worker偶尔挂掉,也可以确保没有信息丢失。
- 消费者交付确认时强制执行超时(默认为30分钟)。这有助于检测一直没有确认的消费者。
- 默认情况下,手动消息确认已打开。在前面的示例中,我们通过autoAck=true标志明确地关闭了它们。一旦我们完成了一项任务,是时候将此标志设置为false并从worker发送适当的确认了。
公平调度
由于默认轮询调度,有些任务执行时间长,有些短,所以会导致部分worker压力大
使用预取计数=1设置的
basicQos
方法。这条消息告诉RabbitMQ一次不要给一个worker发送多条消息。在处理并确认前一条消息之前,不要向worker发送新消息。相反,它会将其发送给下一个不忙的worker。
int prefetchCount =1;
channel.basicQos(prefetchCount);
示例
publicclassWorkProvider{privatestaticfinalStringTASK_QUEUE_NAME="work_queue";publicstaticvoidmain(String[] argv)throwsException{Connection connection =RabbitMQUtil.getConnection();try(Channel channel = connection.createChannel()){
channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);for(int i =0; i <8; i++){String message =String.valueOf(i);
channel.basicPublish("",TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes(StandardCharsets.UTF_8));System.out.println(" 消息发送 :'"+ i +"'");}}}}
publicclassWorkerConsumer1{privatestaticfinalStringTASK_QUEUE_NAME="work_queue";publicstaticvoidmain(String[] argv)throwsException{finalConnection connection =RabbitMQUtil.getConnection();finalChannel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);System.out.println(" 消息监听中。。。。。。");//控制ack流速,表示每次进行ack确认前只会处理一条消息//channel.basicQos(1);DeliverCallback deliverCallback =(consumerTag, delivery)->{//获取消息String message =newString(delivery.getBody(),StandardCharsets.UTF_8);System.out.println("worker1 消息消费:'"+ message +"'");try{doWork(message);}finally{System.out.println(" 执行结束。。");//消息确认,根据消息序号(false只确认当前一个消息收到,true确认所有比当前序号小的消息(成功消费,消息从队列中删除 ))
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}};//设置自动应答
channel.basicConsume(TASK_QUEUE_NAME,false, deliverCallback, consumerTag ->{});}}
消费者1的方法处理
privatestaticvoiddoWork(String task){try{Thread.sleep(1000);}catch(InterruptedException ignored){Thread.currentThread().interrupt();}}
消费者2的方法处理
privatestaticvoiddoWork(String task){return;}
启动两个worker消费者,执行结果如下(轮询):
若设置公平调度
channel.basicQos(1);
测试结果:
5.发布/订阅模式(fanout)
- 不同于工作队列,同一消息在所有消费者共享,但只能有一个消费者消费,而发布订阅则会将同一消息发送给多个消费者,则将消息广播给所有订阅者
- 在之前的模式中,都是直接将消息发送给队列,然后从队列消费,事实上之前使用了一个默认的交换机,即
“”
空字符串的 - RabbitMQ消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上,很多时候,生产者甚至根本不知道消息是否会被传递到任何队列。
- 相反,生产者只能向
exchange
发送消息。exchange
是一件非常简单的事情。它一方面接收来自生产者的消息,另一方面将它们推送到队列。exchange
必须确切地知道如何处理它收到的消息。
将消息生产投递到exchange,由交换机去投递消息到队列
交换机
有几种
exchange
类型可供选择:
direct
、
topic
、
headers
和
fanout
。我们将专注于最后一个
fanout
。
//创建交换机名称为logs
channel.exchangeDeclare("logs","fanout");/**
* exchange:交换机的名称
* type:交换机的类型
* durable 队列是否持久化
* autoDelete:是否自动删除,(当该交换机上绑定的最后一个队列解除绑定后,该交换机自动删除)
* internal:是否是内置的,true表示内置交换器。(则无法直接发消息给内置交换机,只能通过其他交换机路由到该交换机)
* argument:其他一些参数
*/
channel.exchangeDeclare(EXCHANGE_NAME,"fanout",false,false,false,null);
第一个参数是
exchange
的名称。空字符串表示默认或未命名的交换:消息将被路由到
routingKey
指定名称的队列(如果存在)。
channel.basicPublish("logs","",null, message.getBytes());
绑定
我们已经创建了一个
fanout交换机
和一个队列。现在我们需要告诉
exchange
向我们的队列发送消息。交换和队列之间的关系称为绑定。
交换机会向绑定的队列通过路由key将消息路由到指定的队列中,
fanout分发不需要路由key
//其中,第一个参数为绑定的队列,第二个参数为绑定的交换机,第三个参数为路由key
channel.queueBind(queueName,"logs","");
#列出所有得绑定
rabbitmqctl list_bindings
示例代码
publicclassFanoutProvider{//声明交换机publicstaticfinalStringEXCHANGE_NAME="fanoutTest";//声明队列publicstaticfinalStringQUEUE_NAME1="queue_name1";publicstaticfinalStringQUEUE_NAME2="queue_name2";publicstaticvoidmain(String[] argv)throwsException{Connection connection =RabbitMQUtil.getConnection();try(Channel channel = connection.createChannel()){//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout",false,false,false,null);//声明队列
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
channel.queueDeclare(QUEUE_NAME2,false,false,false,null);//进行队列绑定
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");String message ="fanout模式消息推送。。。。。";//消息推送//参数说明:交换机,路由key/队列,消息属性,消息体
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));System.out.println(" 消息发送 :'"+message +"'");}}}
publicclassFanoutConsumer1{publicstaticfinalStringEXCHANGE_NAME="fanoutTest";publicstaticfinalStringQUEUE_NAME1="queue_name1";publicstaticvoidmain(String[] argv)throwsException{Connection connection =RabbitMQUtil.getConnection();//创建信道Channel channel = connection.createChannel();//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//绑定队列
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),StandardCharsets.UTF_8);System.out.println(" fanout 消费者1:'"+ message +"'");};
channel.basicConsume(QUEUE_NAME1,true, deliverCallback, consumerTag ->{});}}
publicclassFanoutConsumer2{publicstaticfinalStringEXCHANGE_NAME="fanoutTest";publicstaticfinalStringQUEUE_NAME2="queue_name2";publicstaticvoidmain(String[] argv)throwsException{Connection connection =RabbitMQUtil.getConnection();//创建信道Channel channel = connection.createChannel();//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//绑定队列
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),StandardCharsets.UTF_8);System.out.println(" fanout 消费者2:'"+ message +"'");};
channel.basicConsume(QUEUE_NAME2,true, deliverCallback, consumerTag ->{});}}
6.路由模式(direct)
由交换机通过
路由key
和
绑定key
进行消息推送,也可以将同一个路由key绑定到多个队列或所有队列,此时相当于fanout
如果推送消息的
路由key不存在,则该消息会丢弃
publicclassDirectProvider{publicstaticfinalStringEXCHANGE_NAME="direct-exchange";publicstaticfinalStringQUEUE_NAME1="direct-queue";publicstaticfinalStringROUTING_KEY="change:direct";publicstaticvoidmain(String[] argv)throwsException{Connection connection =RabbitMQUtil.getConnection();try(Channel channel = connection.createChannel()){//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,false,false,null);//声明队列
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);String message ="direct模式消息推送。。。。。";//参数说明:交换机,路由key/队列,消息属性,消息体
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));System.out.println(" 消息发送 :'"+message +"'");}}}
publicclassDirectConsumer{publicstaticfinalStringEXCHANGE_NAME="direct-exchange";publicstaticfinalStringQUEUE_NAME1="direct-queue";publicstaticfinalStringBINDING_KEY="change:direct";publicstaticvoidmain(String[] argv)throwsException{Connection connection =RabbitMQUtil.getConnection();//创建信道Channel channel = connection.createChannel();//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");//绑定队列
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,BINDING_KEY);DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),StandardCharsets.UTF_8);System.out.println(" direct 消费者1:'"+ message +"'");};
channel.basicConsume(QUEUE_NAME1,true, deliverCallback, consumerTag ->{});}}
7.Topic匹配模式
- 发送到主题交换的消息不能有任意的路由key,
它必须是一个由点分隔的单词列表
。单词可以是任何东西,但通常它们指定了与消息相关的一些特征。一些有效的路由key示例:stock.usd.nyse、nyse.vmw、quick.orange.rabbit。路由密钥中可以有任意多的单词,最多255个字节。 - 绑定key也必须采用相同的形式。topic交换机背后的逻辑类似于direct交换机,使用特定路由key发送的消息将被传递到所有使用绑定key绑定的所有队列。但是,绑定密钥有两个重要的特殊情况:
*(星号)只能代替一个单词
。#(hash)可以替代零个或多个单词
。
publicclassTopicProvider{publicstaticfinalStringEXCHANGE_NAME="topic-exchange";publicstaticfinalStringQUEUE_NAME1="topic-queue";publicstaticfinalStringROUTING_KEY="com.orange.test";publicstaticfinalStringROUTING_KEY2="com.orange.test.aaa";publicstaticvoidmain(String[] argv)throwsException{Connection connection =RabbitMQUtil.getConnection();try(Channel channel = connection.createChannel()){//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC,false,false,false,null);//声明队列
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);String message1 ="topic test模式消息推送。。。。。";String message2 ="topic test.aaa模式消息推送。。。。。";//参数说明:交换机,路由key/队列,消息属性,消息体
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN, message1.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY2,MessageProperties.PERSISTENT_TEXT_PLAIN, message2.getBytes(StandardCharsets.UTF_8));System.out.println(" 消息发送 :'"+message1 +"'");System.out.println(" 消息发送 :'"+message2 +"'");}}}
publicclassTopicConsumer{publicstaticfinalStringEXCHANGE_NAME="topic-exchange";publicstaticfinalStringQUEUE_NAME1="topic-queue";publicstaticfinalStringBINDING_KEY="*.orange.#";publicstaticvoidmain(String[] argv)throwsException{Connection connection =RabbitMQUtil.getConnection();//创建信道Channel channel = connection.createChannel();//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);//绑定队列
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,BINDING_KEY);DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),StandardCharsets.UTF_8);System.out.println(" topic 消费者1:'"+ message +"'");};
channel.basicConsume(QUEUE_NAME1,true, deliverCallback, consumerTag ->{});}}
版权归原作者 陌殇殇 所有, 如有侵权,请联系我们删除。