主题模式
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:# 匹配零个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert
生产者代码
public class TopicProducer {
public static void main(String[] args) throws Exception {
//1.创建连接
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.64.140");
cf.setUsername("guest");
cf.setPassword("guest");
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//2.定义交换机
cc.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
//3.发送数据
while(true) {
Scanner scanner = new Scanner(System.in);
System.out.println("消息:");
String s = scanner.nextLine();
System.out.println("路由键:");
String key=scanner.nextLine();
cc.basicPublish("topic_logs", key, null, s.getBytes());
System.out.println("=======================================");
}
}
}
消费者代码
public class TopicConsumer {
public static void main(String[] args) throws Exception {
//1.创建连接
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.64.140");
cf.setUsername("guest");
cf.setPassword("guest");
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//2.定义交换机、队列、绑定
cc.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
String queue = cc.queueDeclare().getQueue();
System.out.print("输入绑定键:");
String s = new Scanner(System.in).nextLine();
String[] a = s.split("\\s+");
for (String key : a) {
cc.queueBind(queue, "topic_logs", key);
}
//3.处理消息
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
byte[] a = message.getBody();
String s = new String(a);
String key = message.getEnvelope().getRoutingKey();
System.out.println(s+"--"+key);
System.out.println("========================================");
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
cc.basicConsume(queue, true,deliverCallback,cancelCallback);
}
}
RPC模式
如果我们需要在远程电脑上运行一个方法,并且还要等待一个返回结果该怎么办?这和前面的例子不太一样, 这种模式我们通常称为远程过程调用,即RPC。
在本节中,我们将会学习使用RabbitMQ去搭建一个RPC系统:一个客户端和一个可以升级(扩展)的RPC服务器。为了模拟一个耗时任务,我们将创建一个返回斐波那契数列的虚拟的RPC服务。
客户端
在客户端定义一个RPCClient类,并定义一个call()方法,这个方法发送一个RPC请求,并等待接收响应结果
RPCClient client = new RPCClient();
String result = client.call("4");
System.out.println( "第四个斐波那契数是: " + result);
回调队列
使用RabbitMQ去实现RPC很容易。一个客户端发送请求信息,并得到一个服务器端回复的响应信息。为了得到响应信息,我们需要在请求的时候发送一个“回调”队列地址。我们可以使用默认队列。下面是示例代码:
//定义回调队列,
//自动生成对列名,非持久,独占,自动删除
callbackQueueName = ch.queueDeclare().getQueue();
//用来设置回调队列的参数对象
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
//发送调用消息
ch.basicPublish("", "rpc_queue", props, message.getBytes());
消息属性 Message Properties
AMQP 0-9-1协议定义了消息的14个属性。大部分属性很少使用,下面是比较常用的4个:
deliveryMode:将消息标记为持久化(值为2)或非持久化(任何其他值)。
contentType:用于描述mime类型。例如,对于经常使用的JSON格式,将此属性设置为:application/json。
replyTo:通常用于指定回调队列。
correlationId:将RPC响应与请求关联起来非常有用。
关联id(correlationId)
在上面的代码中,我们会为每个RPC请求创建一个回调队列。 这是非常低效的,这里还有一个更好的方法:让我们为每个客户端创建一个回调队列。
这就提出了一个新的问题,在队列中得到一个响应时,我们不清楚这个响应所对应的是哪一条请求。这时候就需要使用关联id(correlationId)。我们将为每一条请求设置唯一的的id值。稍后,当我们在回调队列里收到一条消息的时候,我们将查看它的id属性,这样我们就可以匹配对应的请求和响应。如果我们发现了一个未知的id值,我们可以安全的丢弃这条消息,因为它不属于我们的请求。
服务端代码
public class RPCServer {
public static void main(String[] args) throws Exception {
//1.接受客户端发送的调用信息(正整数n)
//2.执行算法求第n个斐波那契数的结果
//3.向客户端发送计算结果
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.64.140");
cf.setPort(5672);
cf.setUsername("guest");
cf.setPassword("guest");
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
cc.queueDeclare("rpc_queue", false, false, false, null);
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//从message中取出消息:n,返回队列的队列名,关联id
String s = new String(message.getBody());
String replyTo = message.getProperties().getReplyTo();//返回队列的队列名
String cid = message.getProperties().getCorrelationId();
long fbnq = fbnq(Integer.parseInt(s));
BasicProperties props = new BasicProperties.Builder().correlationId(cid).build();
cc.basicPublish("", replyTo, props, (""+fbnq).getBytes());
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
cc.basicConsume("rpc_queue", true,deliverCallback,cancelCallback);
}
//服务:接受一个整数值n,求第n个斐波那契数
//1 1 2 3 5 8 13 21 34 55 89 144 ......
//递归求斐波那契数,递归的效率是非常低的
//递归效率低,可以用来模拟服务器端的耗时运算
static long fbnq(int n) {
if(n==1 || n==2) {
return 1;
}
return fbnq(n-1)+fbnq(n-2);
}
}
客户端代码
public class RPCClient {
static BlockingQueue<Long> q=new ArrayBlockingQueue<Long>(10);
public static void main(String[] args) throws Exception {
System.out.print("输入求第几个斐波那契数:");
int n = new Scanner(System.in).nextInt();
long fbnq=fbnq(n);
System.out.println(fbnq);
}
//异步调用服务器,从服务器获取结果
private static long fbnq(int n) throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.64.140");
cf.setPort(5672);
cf.setUsername("admin");
cf.setPassword("admin");
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//定义发送调用消息的队列
cc.queueDeclare("rpc_queue", false, false, false, null);
cc.queuePurge("rpc_queue");
//返回队列
String replyTo = cc.queueDeclare().getQueue();
//关联id
String cid = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties.Builder()
.replyTo(replyTo)
.correlationId(cid)
.build();
cc.basicPublish("", "rpc_queue", props, (""+n).getBytes());
//模拟执行其他运算,不等待计算结果
System.out.println("调用消息已发送");
System.out.println("模拟执行其他运算,不立即等待计算结果");
//获取计算结果
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//处理数据之前,先比对关联id
if(cid.equals(message.getProperties().getCorrelationId())) {
String s=new String(message.getBody());
long fbnq = Integer.parseInt(s);
q.offer(fbnq);
ch.getConnection().close();
}
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
cc.basicConsume(replyTo, true,deliverCallback,cancelCallback);
return q.take();
}
}
版权归原作者 小星袁 所有, 如有侵权,请联系我们删除。