0


RabbitMQ的主题模式

主题模式

image-20230810180351000

Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:# 匹配零个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert

生产者代码
public class TopicProducer {
    public static void main(String[] args) throws Exception {
        //1.创建连接
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("192.168.64.140");
        cf.setUsername("guest");
        cf.setPassword("guest");
        
        Connection nc = cf.newConnection();
        Channel cc = nc.createChannel();
        
        //2.定义交换机
        cc.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
        //3.发送数据
        while(true) {
            Scanner scanner = new Scanner(System.in);
            System.out.println("消息:");
            String s = scanner.nextLine();
            System.out.println("路由键:");
            String key=scanner.nextLine();
            cc.basicPublish("topic_logs", key, null, s.getBytes());
            System.out.println("=======================================");
        }
    }
}
消费者代码
public class TopicConsumer {
    public static void main(String[] args) throws Exception {
        //1.创建连接
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("192.168.64.140");
        cf.setUsername("guest");
        cf.setPassword("guest");
        
        Connection nc = cf.newConnection();
        Channel cc = nc.createChannel();
        //2.定义交换机、队列、绑定
        cc.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
        String queue = cc.queueDeclare().getQueue();
        System.out.print("输入绑定键:");
        String s = new Scanner(System.in).nextLine();
        String[] a = s.split("\\s+");
        for (String key : a) {
            cc.queueBind(queue, "topic_logs", key);
        }
        //3.处理消息
        DeliverCallback deliverCallback = new DeliverCallback() {
            
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                byte[] a = message.getBody();
                String s = new String(a);
                String key = message.getEnvelope().getRoutingKey();
                System.out.println(s+"--"+key);
                System.out.println("========================================");
            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
            }
        };
        cc.basicConsume(queue, true,deliverCallback,cancelCallback);
    }
}

RPC模式

img

如果我们需要在远程电脑上运行一个方法,并且还要等待一个返回结果该怎么办?这和前面的例子不太一样, 这种模式我们通常称为远程过程调用,即RPC。

在本节中,我们将会学习使用RabbitMQ去搭建一个RPC系统:一个客户端和一个可以升级(扩展)的RPC服务器。为了模拟一个耗时任务,我们将创建一个返回斐波那契数列的虚拟的RPC服务。

客户端

在客户端定义一个RPCClient类,并定义一个call()方法,这个方法发送一个RPC请求,并等待接收响应结果

RPCClient client = new RPCClient();
String result = client.call("4");
System.out.println( "第四个斐波那契数是: " + result);
回调队列

使用RabbitMQ去实现RPC很容易。一个客户端发送请求信息,并得到一个服务器端回复的响应信息。为了得到响应信息,我们需要在请求的时候发送一个“回调”队列地址。我们可以使用默认队列。下面是示例代码:

//定义回调队列,
//自动生成对列名,非持久,独占,自动删除
callbackQueueName = ch.queueDeclare().getQueue();

//用来设置回调队列的参数对象
BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();
//发送调用消息
ch.basicPublish("", "rpc_queue", props, message.getBytes());
消息属性 Message Properties
AMQP 0-9-1协议定义了消息的14个属性。大部分属性很少使用,下面是比较常用的4个:
deliveryMode:将消息标记为持久化(值为2)或非持久化(任何其他值)。
contentType:用于描述mime类型。例如,对于经常使用的JSON格式,将此属性设置为:application/json。
replyTo:通常用于指定回调队列。
correlationId:将RPC响应与请求关联起来非常有用。
关联id(correlationId)

在上面的代码中,我们会为每个RPC请求创建一个回调队列。 这是非常低效的,这里还有一个更好的方法:让我们为每个客户端创建一个回调队列。

这就提出了一个新的问题,在队列中得到一个响应时,我们不清楚这个响应所对应的是哪一条请求。这时候就需要使用关联id(correlationId)。我们将为每一条请求设置唯一的的id值。稍后,当我们在回调队列里收到一条消息的时候,我们将查看它的id属性,这样我们就可以匹配对应的请求和响应。如果我们发现了一个未知的id值,我们可以安全的丢弃这条消息,因为它不属于我们的请求。

服务端代码
public class RPCServer {
    public static void main(String[] args) throws Exception {
        //1.接受客户端发送的调用信息(正整数n)
        //2.执行算法求第n个斐波那契数的结果
        //3.向客户端发送计算结果
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("192.168.64.140");
        cf.setPort(5672);
        cf.setUsername("guest");
        cf.setPassword("guest");
        Connection nc = cf.newConnection();
        Channel cc = nc.createChannel();
        
        cc.queueDeclare("rpc_queue", false, false, false, null);
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                //从message中取出消息:n,返回队列的队列名,关联id
                String s = new String(message.getBody());
                String replyTo = message.getProperties().getReplyTo();//返回队列的队列名
                String cid = message.getProperties().getCorrelationId();
                long fbnq = fbnq(Integer.parseInt(s));
                BasicProperties props = new BasicProperties.Builder().correlationId(cid).build();
                cc.basicPublish("", replyTo, props, (""+fbnq).getBytes());
            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
            }
        };
        cc.basicConsume("rpc_queue", true,deliverCallback,cancelCallback);
    }
    
    //服务:接受一个整数值n,求第n个斐波那契数
    //1 1 2 3 5 8 13 21 34 55 89 144 ......
    //递归求斐波那契数,递归的效率是非常低的
    //递归效率低,可以用来模拟服务器端的耗时运算
    static long fbnq(int n) {
        if(n==1 || n==2) {
            return 1;
        }
        return fbnq(n-1)+fbnq(n-2);
    }
}
客户端代码
public class RPCClient {
    static BlockingQueue<Long> q=new ArrayBlockingQueue<Long>(10);
    public static void main(String[] args) throws Exception {
        System.out.print("输入求第几个斐波那契数:");
        int n = new Scanner(System.in).nextInt();
        long fbnq=fbnq(n);
        System.out.println(fbnq);
    }
    //异步调用服务器,从服务器获取结果
    private static long fbnq(int n) throws Exception {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("192.168.64.140");
        cf.setPort(5672);
        cf.setUsername("admin");
        cf.setPassword("admin");
        Connection nc = cf.newConnection();
        Channel cc = nc.createChannel();
        //定义发送调用消息的队列
        cc.queueDeclare("rpc_queue", false, false, false, null);
        cc.queuePurge("rpc_queue");
        //返回队列
        String replyTo = cc.queueDeclare().getQueue();
        //关联id
        String cid = UUID.randomUUID().toString();
         BasicProperties props = new BasicProperties.Builder()
                 .replyTo(replyTo)
                 .correlationId(cid)
                 .build();
        cc.basicPublish("", "rpc_queue", props, (""+n).getBytes());
        //模拟执行其他运算,不等待计算结果
        System.out.println("调用消息已发送");
        System.out.println("模拟执行其他运算,不立即等待计算结果");
        //获取计算结果
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                //处理数据之前,先比对关联id
                if(cid.equals(message.getProperties().getCorrelationId())) {
                    String s=new String(message.getBody());
                    long fbnq = Integer.parseInt(s);
                    q.offer(fbnq);
                    ch.getConnection().close();
                }
            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
            }
        };
        cc.basicConsume(replyTo, true,deliverCallback,cancelCallback);
        return q.take();
    }
}

上一篇文章:https://blog.csdn.net/Z0412_J0103/article/details/143355002https://blog.csdn.net/Z0412_J0103/article/details/143355002**下一篇文章: **


本文转载自: https://blog.csdn.net/Z0412_J0103/article/details/143355068
版权归原作者 小星袁 所有, 如有侵权,请联系我们删除。

“RabbitMQ的主题模式”的评论:

还没有评论