RabbitMq 有六种模式(我觉得就是从第一个模式开始不断升级)
1: Hello World HelloWorld模式
2: Work Queues 工作模式
3: Publish/Subscribe 发布订阅模式
4: Routing 路由模式
5: Topics 通配符模式
6: RPC RPC模式
让我为大家依次讲解
原理:
Producer---->Channel---->Exchange---->Queue---->Channel---->Consumer
(这是完整的流程,其中大部分环节都有默认值,需要时可以自定义)
就是生产者通过通道把消息发送给交换机,然后交换机进行筛选,把消息路由到队列里面,消息再经过通道传送给消费者
打个比方吧:这个流程就像是一个快递流水线
Producer就是生产快递的商家
channel就是商家把商品打包成快递的通道
Exchange就是对快递进行区分、判断哪个是你的快递
Queue:就是快递在运输中所处的这个通道
Channel:第二个channel就是快递员把快递送到你手里的通道
Consumer:就是购买这个快递的消费者
重点:重要的说三遍!!!!
根据上面的原理
(根据需求)必须要建立声明通道、交换机、队列
首先是Hello World模式
这个就是一对一的模式
打个比喻:就是(呼唤必有回应)如果你暂时不想接收,消息就会一直给你留着,直到你接收为止(感觉好像有点倒贴)
这个Hello World模式 他是生产者和消费者的关系 中间有一个Queue(队列)
生产者和消费者代码(先看连接参数的含义):
connectionFactory.setHost("127.0.0.1");// broker host; 127.0.0.1 is the local machine
connectionFactory.setPort(5672);// broker port; 5672 is RabbitMQ's default AMQP port (not 5675)
connectionFactory.setUsername("guest");// default username
connectionFactory.setPassword("guest");// default password
connectionFactory.setVirtualHost("/");// virtual host used when connecting to the server; "/" is the default
package com.example.hyzn.demos.TestRabbitMQ.productor;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Hello World producer: publishes one message directly to a named queue
 * through the default ("") exchange.
 */
public class TestMQpProductor {
    /** Queue the message is published to; the consumer declares the same name. */
    private static final String QUEUE = "Hello World";

    /**
     * Connects to the broker on 127.0.0.1:5672, declares the queue and publishes a single message.
     * Connection, publish and close failures are reported with {@code printStackTrace()}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        // try-with-resources closes the channel first and then the connection, even when publishing fails.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the queue (durable, not exclusive, not auto-deleted); it is created only if absent.
            channel.queueDeclare(QUEUE, true, false, false, null);
            String message = "HElloWOrld";
            // With the default exchange the routing key is the queue name.
            // Encode explicitly as UTF-8 because the consumer decodes the body as UTF-8.
            channel.basicPublish("", QUEUE, null, message.getBytes("UTF-8"));
            System.out.println("Message sent: " + message);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
package com.example.hyzn.demos.TestRabbitMQ.consumer;
import com.rabbitmq.client.*;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Hello World consumer: listens on the queue the producer publishes to and prints every message.
 */
public class TestMQConsumer {
    /** Queue this consumer reads from; must match the producer's queue name. */
    private static final String QUEUE = "Hello World";

    /**
     * Connects to the broker on 127.0.0.1:5672 and registers an auto-acknowledging consumer.
     * On success the connection is intentionally left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if closing the connection after a failed setup fails
     * @throws TimeoutException declared for interface compatibility; setup timeouts are caught and printed
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            // Declare the queue with the same arguments as the producer so either side may start first.
            channel.queueDeclare(QUEUE, true, false, false, null);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("已经收到了消息: " + message);
                }
            };
            // autoAck = true: the broker treats a message as handled as soon as it is delivered.
            channel.basicConsume(QUEUE, true, defaultConsumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            // Setup failed: close the connection, otherwise its threads keep the JVM running
            // although nothing is being consumed.
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }
    }
}
Work Queues 工作模式:
工作队列就是一对多的关系
是按生产的数量平均分发给消费者
按轮询的方式分发
打个比喻就像是:有十块糖 分别发送给俩个小孩 一个给A 一个给B
而不是直接一个人五个 (可以理解为见证公平)
这个处理代码就是按照Hello World 的模式 多开几个消费者 只开一个生产者运行就可以了
这个就用到交换机了
1.一个生产者将消息发给交换机
2.与交换机绑定的有多个队列,每个消费者监听自己的队列
3.生产者将消息发给交换机 ,由交换机将消息转发给绑定的每个队列,每个队列都将接到消息
4.如果交换机没有绑定任何队列,那么发给交换机的这条消息就会丢失
生产者代码:
package com.example.hyzn.demos.TestRabbitMQ.productor;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Publish/Subscribe producer: publishes to a fanout exchange so that every bound queue
 * (SMS and e-mail) receives a copy of each message.
 */
public class TestMQpProductor_Public {
    /** Queue consumed by the SMS consumer. */
    private static final String QUEUE_INFORM_SMS = "发短信";
    /** Queue consumed by the e-mail consumer. */
    private static final String QUEUE_INFORM_EMAIL = "发邮箱";
    /** Fanout exchange both queues are bound to. */
    private static final String EXCHANGE_FANOUT_INFORM = "chanel交换机";

    /**
     * Declares both queues and the fanout exchange, binds them, then publishes five messages.
     * Failures are reported with {@code printStackTrace()}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        // try-with-resources closes the channel first and then the connection, even on failure.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the queues (created only if absent).
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
            // A fanout exchange ignores the routing key, so the bindings use an empty key.
            channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "");
            channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");
            // Publish to the exchange (not to a queue); the exchange copies each message to both queues.
            for (int i = 0; i < 5; i++) {
                String message = "send inform message to user";
                // Encode explicitly as UTF-8 because the consumers decode the body as UTF-8.
                channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes("UTF-8"));
                System.out.println("Message sent: " + message);
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
消费者代码(绑定不同的队列):
package com.example.hyzn.demos.TestRabbitMQ.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;
/**
 * Publish/Subscribe consumer for the e-mail queue bound to the fanout exchange.
 */
public class TestMQConsumer_Public_email {
    /** Queue this consumer reads from. */
    private static final String QUEUE_INFORM_EMAIL = "发邮箱";
    /** Fanout exchange the queue is bound to; same name as in the producer. */
    private static final String EXCHANGE_FANOUT_INFORM = "chanel交换机";

    /**
     * Declares queue, exchange and binding, then registers an auto-acknowledging consumer.
     * On success the connection is intentionally left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if closing the connection after a failed setup fails
     * @throws TimeoutException declared for interface compatibility; setup timeouts are caught and printed
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            // Same declarations as the producer so the consumer can be started first.
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
            channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("已经收到了消息: " + message);
                }
            };
            // autoAck = true: messages are acknowledged as soon as they are delivered.
            channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            // Setup failed: close the connection, otherwise its threads keep the JVM running.
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }
    }
}
package com.example.hyzn.demos.TestRabbitMQ.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Publish/Subscribe consumer for the SMS queue bound to the fanout exchange.
 */
public class TestMQConsumer_Public_sms {
    /** Queue this consumer reads from. */
    private static final String QUEUE_INFORM_SMS = "发短信";
    /** Fanout exchange the queue is bound to; same name as in the producer. */
    private static final String EXCHANGE_FANOUT_INFORM = "chanel交换机";

    /**
     * Declares queue, exchange and binding, then registers an auto-acknowledging consumer.
     * On success the connection is intentionally left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if closing the connection after a failed setup fails
     * @throws TimeoutException declared for interface compatibility; setup timeouts are caught and printed
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            // Same declarations as the producer so the consumer can be started first.
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
            channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "");
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("已经收到了消息: " + message);
                }
            };
            // autoAck = true: messages are acknowledged as soon as they are delivered.
            channel.basicConsume(QUEUE_INFORM_SMS, true, defaultConsumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            // Setup failed: close the connection, otherwise its threads keep the JVM running.
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }
    }
}
1.每个消费者监听自己的队列,设置RoutingKey 可以设置多个
交换机根据RoutingKey来判断把消息发送给哪个队列
生产者代码:
package com.example.hyzn.demos.TestRabbitMQ.productor;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Routing producer: publishes to a direct exchange; each message reaches only the queue
 * whose binding key equals the message's routing key.
 */
public class TestMQpProductor_Routing {
    private static final String QUEUE_INFORM_SMS = "sms";
    private static final String QUEUE_INFORM_EMAIL = "email";
    /** Direct exchange shared with the two routing consumers. */
    private static final String EXCHANGE_INFORM_ROUTING = "inform_exchange_new_Routing";
    private static final String ROUTING_KEY_INFORM_SMS = "inform_sms_new";
    private static final String ROUTING_KEY_INFORM_EMAIL = "inform_email_new";

    /**
     * Declares queues, exchange and bindings, then publishes five SMS and five e-mail messages.
     * Failures are reported with {@code printStackTrace()}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        // try-with-resources closes the channel first and then the connection, even on failure.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare queues
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            // Declare a direct exchange
            channel.exchangeDeclare(EXCHANGE_INFORM_ROUTING, BuiltinExchangeType.DIRECT);
            // Bind each queue to the exchange with its own routing key
            channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_INFORM_ROUTING, ROUTING_KEY_INFORM_SMS);
            channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_INFORM_ROUTING, ROUTING_KEY_INFORM_EMAIL);
            // Publish messages; bodies are encoded as UTF-8 because the consumers decode UTF-8.
            for (int i = 0; i < 5; i++) {
                String message = "send inform message sms to user";
                channel.basicPublish(EXCHANGE_INFORM_ROUTING, ROUTING_KEY_INFORM_SMS, null,
                        message.getBytes("UTF-8"));
                System.out.println("Message sent: " + message);
            }
            for (int i = 0; i < 5; i++) {
                String message = "send inform message email to user";
                channel.basicPublish(EXCHANGE_INFORM_ROUTING, ROUTING_KEY_INFORM_EMAIL, null,
                        message.getBytes("UTF-8"));
                System.out.println("Message sent: " + message);
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
消费者代码(2个):
package com.example.hyzn.demos.TestRabbitMQ.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Routing consumer for the e-mail queue, bound to the direct exchange with the e-mail routing key.
 */
public class TestMQConsumer_Routing_email {
    private static final String QUEUE_INFORM_EMAIL = "email";
    /** Direct exchange shared with the routing producer. */
    private static final String EXCHANGE_INFORM_ROUTING = "inform_exchange_new_Routing";
    private static final String ROUTING_KEY_INFORM_EMAIL = "inform_email_new";

    /**
     * Declares queue, exchange and binding, then registers an auto-acknowledging consumer.
     * On success the connection is intentionally left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if closing the connection after a failed setup fails
     * @throws TimeoutException declared for interface compatibility; setup timeouts are caught and printed
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            // Same declarations as the producer so the consumer can be started first.
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_INFORM_ROUTING, BuiltinExchangeType.DIRECT);
            channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_INFORM_ROUTING, ROUTING_KEY_INFORM_EMAIL);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("已经收到了消息: " + message);
                }
            };
            // autoAck = true: messages are acknowledged as soon as they are delivered.
            channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            // Setup failed: close the connection, otherwise its threads keep the JVM running.
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }
    }
}
package com.example.hyzn.demos.TestRabbitMQ.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Routing consumer for the SMS queue, bound to the direct exchange with the SMS routing key.
 */
public class TestMQConsumer_Routing_sms {
    private static final String QUEUE_INFORM_SMS = "sms";
    /** Direct exchange shared with the routing producer. */
    private static final String EXCHANGE_INFORM_ROUTING = "inform_exchange_new_Routing";
    private static final String ROUTING_KEY_INFORM_SMS = "inform_sms_new";

    /**
     * Declares queue, exchange and binding, then registers an auto-acknowledging consumer.
     * On success the connection is intentionally left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if closing the connection after a failed setup fails
     * @throws TimeoutException declared for interface compatibility; setup timeouts are caught and printed
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            // Same declarations as the producer so the consumer can be started first.
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_INFORM_ROUTING, BuiltinExchangeType.DIRECT);
            channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_INFORM_ROUTING, ROUTING_KEY_INFORM_SMS);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("已经收到了消息: " + message);
                }
            };
            // autoAck = true: messages are acknowledged as soon as they are delivered.
            channel.basicConsume(QUEUE_INFORM_SMS, true, defaultConsumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            // Setup failed: close the connection, otherwise its threads keep the JVM running.
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }
    }
}
匹配RoutingKey模式
符号#:匹配零个或者多个词 比如hello.# 和他匹配的就是hello、hello.world 、hello.nihao、hello.world.nihao等
符号*:只能匹配一个词 比如hello.*和他匹配的就是hello.world、hello.email
生产者代码:
package com.example.hyzn.demos.TestRabbitMQ.productor;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Topics producer: publishes to a topic exchange; queues are bound with wildcard patterns
 * and messages are published with concrete routing keys.
 */
public class TestMQpProductor_Topic {
    private static final String QUEUE_INFORM_SMS = "sms";
    private static final String QUEUE_INFORM_EMAIL = "email";
    /**
     * Topic exchange. It has its own name instead of reusing the direct exchange of the routing
     * example: re-declaring an existing exchange with a different type is rejected by the broker.
     */
    private static final String EXCHANGE_INFORM_TOPIC = "inform_exchange_topic";
    /** Binding pattern of the SMS queue: any key starting with "inform" that contains the word "sms". */
    private static final String BINDING_PATTERN_SMS = "inform.#.sms.#";
    /** Binding pattern of the e-mail queue: any key starting with "inform" that contains the word "email". */
    private static final String BINDING_PATTERN_EMAIL = "inform.#.email.#";
    /** Concrete routing keys used when publishing; wildcards belong in bindings only. */
    private static final String ROUTING_KEY_SMS = "inform.sms.new";
    private static final String ROUTING_KEY_EMAIL = "inform.email.new";

    /**
     * Declares queues, exchange and bindings, then publishes five SMS and five e-mail messages.
     * Failures are reported with {@code printStackTrace()}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        // try-with-resources closes the channel first and then the connection, even on failure.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare queues
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            // Declare a topic exchange
            channel.exchangeDeclare(EXCHANGE_INFORM_TOPIC, BuiltinExchangeType.TOPIC);
            // Bind each queue with the pattern for its own kind of notification
            channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_INFORM_TOPIC, BINDING_PATTERN_SMS);
            channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_INFORM_TOPIC, BINDING_PATTERN_EMAIL);
            // Publish messages; bodies are encoded as UTF-8 because the consumers decode UTF-8.
            for (int i = 0; i < 5; i++) {
                String message = "send inform message sms to user";
                channel.basicPublish(EXCHANGE_INFORM_TOPIC, ROUTING_KEY_SMS, null,
                        message.getBytes("UTF-8"));
                System.out.println("Message sent: " + message);
            }
            for (int i = 0; i < 5; i++) {
                String message = "send inform message email to user";
                channel.basicPublish(EXCHANGE_INFORM_TOPIC, ROUTING_KEY_EMAIL, null,
                        message.getBytes("UTF-8"));
                System.out.println("Message sent: " + message);
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
消费者代码:
package com.example.hyzn.demos.TestRabbitMQ.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Topics consumer for the e-mail queue.
 */
public class TestMQConsumer_Toptic_eamil {
    private static final String QUEUE_INFORM_EMAIL = "email";
    /**
     * Topic exchange. It has its own name instead of reusing the direct exchange of the routing
     * example: re-declaring an existing exchange with a different type is rejected by the broker.
     */
    private static final String EXCHANGE_INFORM_TOPIC = "inform_exchange_topic";
    /** Binding key of the e-mail queue on the topic exchange. */
    private static final String TOPIC_KEY_INFORM_EMAIL = "inform.email.new";

    /**
     * Declares queue, exchange and binding, then registers an auto-acknowledging consumer.
     * On success the connection is intentionally left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if closing the connection after a failed setup fails
     * @throws TimeoutException declared for interface compatibility; setup timeouts are caught and printed
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            // Declare everything the consumer depends on so it can be started before the producer.
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_INFORM_TOPIC, BuiltinExchangeType.TOPIC);
            channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_INFORM_TOPIC, TOPIC_KEY_INFORM_EMAIL);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("已经收到了消息: " + message);
                }
            };
            // autoAck = true: messages are acknowledged as soon as they are delivered.
            channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            // Setup failed: close the connection, otherwise its threads keep the JVM running.
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }
    }
}
package com.example.hyzn.demos.TestRabbitMQ.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Topics consumer for the SMS queue.
 */
public class TestMQConsumer_Toptic_sms {
    private static final String QUEUE_INFORM_SMS = "sms";
    /**
     * Topic exchange. It has its own name instead of reusing the direct exchange of the routing
     * example: re-declaring an existing exchange with a different type is rejected by the broker.
     */
    private static final String EXCHANGE_INFORM_TOPIC = "inform_exchange_topic";
    /** Binding key of the SMS queue on the topic exchange. */
    private static final String TOPIC_KEY_INFORM_SMS = "inform.sms.new";

    /**
     * Declares queue, exchange and binding, then registers an auto-acknowledging consumer.
     * On success the connection is intentionally left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if closing the connection after a failed setup fails
     * @throws TimeoutException declared for interface compatibility; setup timeouts are caught and printed
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            // Declare everything the consumer depends on so it can be started before the producer.
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_INFORM_TOPIC, BuiltinExchangeType.TOPIC);
            channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_INFORM_TOPIC, TOPIC_KEY_INFORM_SMS);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("已经收到了消息: " + message);
                }
            };
            // autoAck = true: messages are acknowledged as soon as they are delivered.
            channel.basicConsume(QUEUE_INFORM_SMS, true, defaultConsumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            // Setup failed: close the connection, otherwise its threads keep the JVM running.
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }
    }
}
1.客户端既是生产者也是消费者,实现异步调用,基于(默认的)Direct交换机实现
2.服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到返回的结果
3.服务端将RPC的方法的结果发送到RPC响应的队列
客户端代码
package com.example.hyzn.demos.TestRabbitMQ.productor;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
/**
 * RPC client: publishes a request to the RPC queue and waits for the reply that carries
 * the same correlation id on a private, server-named reply queue.
 */
public class RPCClient {
    /** Queue the RPC server consumes requests from. */
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    /**
     * Sends one request ("Hello World") and prints the response.
     *
     * @param argv unused
     * @throws IOException on broker communication failure
     * @throws TimeoutException if connecting or closing times out
     */
    public static void main(String[] argv) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);
            String response = call(channel, "Hello World");
            System.out.println("Response: " + response);
        }
    }

    /**
     * Performs one blocking RPC round trip.
     *
     * @param channel open channel used for both the request and the reply
     * @param message request payload, sent as UTF-8
     * @return the reply body decoded as UTF-8, or {@code null} if the waiting thread was interrupted
     * @throws IOException on broker communication failure
     */
    private static String call(Channel channel, String message) throws IOException {
        final String corrId = UUID.randomUUID().toString();
        // Server-named queue that only this client reads replies from.
        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // corrId is never null, so comparing from this side is safe for replies without an id.
            if (corrId.equals(delivery.getProperties().getCorrelationId())) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        };
        String replyConsumerTag =
                channel.basicConsume(replyQueueName, true, deliverCallback, consumerTag -> { });
        try {
            return response.take();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            return null;
        } finally {
            // One reply per call: stop consuming so repeated calls do not pile up consumers.
            channel.basicCancel(replyConsumerTag);
        }
    }
}
服务端代码:
package com.example.hyzn.demos.TestRabbitMQ.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * RPC server: consumes requests from the RPC queue, processes them and publishes the result
 * to the reply queue named in each request, echoing the request's correlation id.
 */
public class RPCServer {
    /** Queue the server consumes requests from; the client publishes to the same name. */
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    /**
     * Starts the server. On success the connection is deliberately left open: closing it
     * (for example with try-with-resources) would cancel the consumer as soon as this method
     * returns, and the server would never handle a request.
     *
     * @param argv unused
     * @throws IOException on broker communication failure during setup
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] argv) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        Connection connection = factory.newConnection();
        try {
            final Channel channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            // Hand this server at most one unacknowledged request at a time.
            channel.basicQos(1);
            System.out.println("Awaiting RPC requests");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String response = "";
                String message = new String(delivery.getBody(), "UTF-8");
                try {
                    response += "Processed: " + message; // Your processing logic here
                } catch (Exception e) {
                    System.err.println("Error: " + e.getMessage());
                } finally {
                    // Always reply (possibly with an empty body) and acknowledge the request.
                    AMQP.BasicProperties props = new AMQP.BasicProperties
                            .Builder()
                            .correlationId(delivery.getProperties().getCorrelationId())
                            .build();
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), props,
                            response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            // autoAck must be false because the callback acknowledges manually; acknowledging
            // an auto-acked delivery is a protocol error that closes the channel.
            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
        } catch (IOException e) {
            // Setup failed: release the connection before propagating the error.
            if (connection.isOpen()) {
                connection.close();
            }
            throw e;
        }
    }
}
版权归原作者 程序员Shen_.li 所有, 如有侵权,请联系我们删除。