RabbitMQ's Six Modes (Just Copy the Code): Use It First, Understand It Later

RabbitMQ has six modes (as I see it, each one builds on the one before, starting from the first)

1: Hello World mode

2: Work Queues mode

3: Publish/Subscribe mode

4: Routing mode

5: Topics (wildcard) mode

6: RPC mode

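Let me walk through them one by one.

How it works:

Producer---->Channel---->Exchange----->Queue---->Channel---->Consumer

(This is the full pipeline; most parts have defaults, and you customize only what you need)

The producer sends a message through a channel to the exchange, the exchange decides which queue the message belongs in and places it there, and the message then travels through another channel to the consumer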

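An analogy: the flow works like a parcel delivery line

Producer: the merchant who produces the parcel

Channel: the lane through which the merchant hands over the packed parcel

Exchange: the sorting step that works out which parcel is yours

Queue: where the parcel waits while it is in transit

Channel: the second channel is the lane through which the courier hands the parcel to you

Consumer: the customer who ordered the parcel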

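Key point (important things are said three times!!!!)

Following the principle above,

you must declare the channel, the exchange, and the queue (whichever your use case needs)

First, Hello World mode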

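This is the one-to-one mode

An analogy: every call gets an answer. If you are not ready to pick up a message, the queue keeps it for you until you do (which feels a little clingy)

In Hello World mode there is one producer and one consumer, with a Queue in between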
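
To make the "queue in between" concrete, here is a minimal in-memory sketch of what publishing through the default exchange (the one named "") amounts to: the routing key is treated as a queue name, and the message waits in that queue until a consumer takes it. This is not RabbitMQ client code. The class DefaultExchangeSketch and its methods are names I made up for illustration, and a real broker adds networking, persistence, and acknowledgements on top.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory stand-in for a broker's default exchange (illustration only).
final class DefaultExchangeSketch {
    // queue name -> messages waiting for a consumer
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // Like queueDeclare: creates the queue only if it does not exist yet.
    void declareQueue(String name) {
        queues.computeIfAbsent(name, n -> new ArrayDeque<>());
    }

    // Default-exchange rule: the routing key is looked up as a queue name.
    // Returns false when no such queue exists, in which case the message is dropped.
    boolean publish(String routingKey, String message) {
        Queue<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.add(message);
        return true;
    }

    // Consumer side: take the oldest waiting message, or null if nothing is waiting.
    String consume(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.declareQueue("Hello World");
        broker.publish("Hello World", "HElloWOrld");
        // The message stayed in the queue until this call picked it up.
        System.out.println(broker.consume("Hello World"));
    }
}
```

The point of the sketch is only the lookup rule: with the default exchange there is nothing to bind, because the queue name itself acts as the routing key.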

Producer and consumer code

connectionFactory.setHost("127.0.0.1");// broker host; here the local machine
connectionFactory.setPort(5672);// AMQP port; the default is 5672
connectionFactory.setUsername("guest");// default username
connectionFactory.setPassword("guest");// default password
connectionFactory.setVirtualHost("/");// virtual host to connect to; "/" is the default
package com.example.hyzn.demos.TestRabbitMQ.productor;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TestMQpProductor {
    private static final String QUEUE = "Hello World";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();

            // Declare the queue; it is created if it does not exist yet
            channel.queueDeclare(QUEUE, true, false, false, null);

            String message = "HElloWOrld";

            // Publish through the default exchange (""), using the queue name as the routing key
            channel.basicPublish("", QUEUE, null, message.getBytes());

            System.out.println("Message sent: " + message);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            try {
                if (channel != null) {
                    channel.close();
                }
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
package com.example.hyzn.demos.TestRabbitMQ.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TestMQConsumer {
    private static final String QUEUE = "Hello World";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();

            // Declare the queue on the consumer side as well, in case the consumer starts first
            channel.queueDeclare(QUEUE, true, false, false, null);

            // Define the consumer
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    String message = new String(body, "utf-8");
                    System.out.println("Message received: " + message);
                }
            };

            channel.basicConsume(QUEUE, true, defaultConsumer);

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
        }
    }
}

Next, Work Queues mode

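A work queue is a one-to-many relationship: one producer, several consumers on the same queue

Messages are shared out evenly among the consumers,

one at a time in round-robin order

An analogy: there are ten candies for two children, handed out one to A, then one to B, and so on

rather than giving one child five in a single go (think of it as fairness in action)

The code is the same as in Hello World mode: start several consumers, and run just one producer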
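
The candy analogy can be checked with a small simulation. This is only a sketch of round-robin dispatch in plain Java, not how the broker is implemented; RoundRobinSketch and dispatch are hypothetical names, and it assumes every consumer is always ready for its next message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of round-robin dispatch across consumers of one queue.
final class RoundRobinSketch {
    // Message number i goes to consumer number (i % consumerCount).
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<String> candies = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            candies.add("candy-" + i);
        }
        List<List<String>> result = dispatch(candies, 2);
        System.out.println("A: " + result.get(0)); // the odd-numbered candies
        System.out.println("B: " + result.get(1)); // the even-numbered candies
    }
}
```

Each child ends up with five candies, but they arrive alternately rather than in one batch of five.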

Next, Publish/Subscribe mode

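This is where the exchange comes in

1. A producer sends messages to the exchange

2. Several queues are bound to the exchange, and each consumer listens on its own queue

3. The exchange forwards every message it receives to each bound queue, so every queue gets a copy

4. If no queue is bound to the exchange, the message is lost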
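
The four rules above can be sketched as a tiny in-memory fanout exchange. This is an illustration only, with made-up names (FanoutSketch, bind, publish, messagesIn), not the RabbitMQ client API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange (illustration only).
final class FanoutSketch {
    // bound queue name -> messages delivered to that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Bind a queue to the exchange; a fanout exchange needs no routing key.
    void bind(String queueName) {
        boundQueues.computeIfAbsent(queueName, n -> new ArrayList<>());
    }

    // Copy the message into every bound queue.
    // Returns how many queues received it; 0 means the message was lost.
    int publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        System.out.println(exchange.publish("nobody is listening")); // no queue bound yet
        exchange.bind("sms");
        exchange.bind("email");
        exchange.publish("send inform message to user");
        System.out.println(exchange.messagesIn("sms"));
        System.out.println(exchange.messagesIn("email"));
    }
}
```

The first publish is dropped because nothing is bound yet, which is why both the producer and the consumers in this article declare and bind the queues before doing anything else.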

Producer code:

package com.example.hyzn.demos.TestRabbitMQ.productor;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TestMQpProductor_Public {
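    private static final String QUEUE_INFOM_sms = "发短信"; // queue name, means "send SMS"
    private static final String QUEUE_INFOM_email = "发邮箱"; // queue name, means "send email"
    private static final String EXCHANFGE_FANOUT_INFORM = "chanel交换机"; // exchange name, means "chanel exchange"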

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();

            // Declare both queues; each is created if it does not exist yet
            channel.queueDeclare(QUEUE_INFOM_sms, true, false, false, null);
            channel.queueDeclare(QUEUE_INFOM_email, true, false, false, null);

            channel.exchangeDeclare(EXCHANFGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);

            channel.queueBind(QUEUE_INFOM_sms, EXCHANFGE_FANOUT_INFORM, "");
            channel.queueBind(QUEUE_INFOM_email, EXCHANFGE_FANOUT_INFORM, "");
            // Publish to the fanout exchange; the routing key is ignored, so it is left empty

            for (int i = 0; i < 5; i++) {
                String message = "send inform message to user";
                channel.basicPublish(EXCHANFGE_FANOUT_INFORM, "", null, message.getBytes());
                System.out.println("Message sent: " + message);
            }

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            try {
                if (channel != null) {
                    channel.close();
                }
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Consumer code (each consumer is bound to a different queue):

package com.example.hyzn.demos.TestRabbitMQ.consumer;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TestMQConsumer_Public_email {
    private static final String QUEUE_INFOM_email = "发邮箱"; // queue name, means "send email"
    private static final String EXCHANFGE_FANOUT_INFORM = "chanel交换机"; // exchange name, means "chanel exchange"
    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();

            // Declare the queue on the consumer side as well, in case the consumer starts first
            channel.queueDeclare(QUEUE_INFOM_email, true, false, false, null);

            // Declare the same fanout exchange the producer uses
            channel.exchangeDeclare(EXCHANFGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);

            // Bind the queue to the exchange; a fanout exchange ignores the routing key
            channel.queueBind(QUEUE_INFOM_email, EXCHANFGE_FANOUT_INFORM, "");

            // Define the consumer
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    String message = new String(body, "utf-8");
                    System.out.println("Message received: " + message);
                }
            };

            channel.basicConsume(QUEUE_INFOM_email, true, defaultConsumer);

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
        }
    }
}
package com.example.hyzn.demos.TestRabbitMQ.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TestMQConsumer_Public_sms {
    private static final String QUEUE_INFOM_sms = "发短信"; // queue name, means "send SMS"
    private static final String EXCHANFGE_FANOUT_INFORM = "chanel交换机"; // exchange name, means "chanel exchange"
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();

            // Declare the queue on the consumer side as well, in case the consumer starts first
            channel.queueDeclare(QUEUE_INFOM_sms, true, false, false, null);

            // Declare the same fanout exchange the producer uses
            channel.exchangeDeclare(EXCHANFGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);

            // Bind the queue to the exchange; a fanout exchange ignores the routing key
            channel.queueBind(QUEUE_INFOM_sms, EXCHANFGE_FANOUT_INFORM, "");

            // Define the consumer
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    String message = new String(body, "utf-8");
                    System.out.println("Message received: " + message);
                }
            };

            channel.basicConsume(QUEUE_INFOM_sms, true, defaultConsumer);

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
        }
    }
}

Next, Routing mode

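1. Each consumer listens on its own queue, and each queue is bound to the exchange with a RoutingKey; one queue can be bound with several keys

The exchange uses the RoutingKey of each message to decide which queue receives it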
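
Here is a small sketch of that decision, assuming exact string comparison between the message's routing key and the binding key, which is what a direct exchange does. The class and method names (DirectRoutingSketch, bind, publish, messagesIn) are hypothetical, and the key "inform_all" is made up to show a queue with two bindings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of a direct exchange (illustration only).
final class DirectRoutingSketch {
    // routing key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    // A queue may be bound several times, once per routing key.
    void bind(String queue, String routingKey) {
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
        bindings.computeIfAbsent(routingKey, k -> new LinkedHashSet<>()).add(queue);
    }

    // Deliver to every queue whose binding key equals the routing key exactly.
    // Returns the number of queues reached; 0 means the message was dropped.
    int publish(String routingKey, String message) {
        Set<String> targets = bindings.get(routingKey);
        if (targets == null) {
            return 0;
        }
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return targets.size();
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, new ArrayList<>());
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("sms", "inform_sms_new");
        exchange.bind("email", "inform_email_new");
        exchange.bind("email", "inform_all"); // second key on the same queue
        exchange.publish("inform_sms_new", "sms text");
        exchange.publish("inform_all", "broadcast text");
        System.out.println("sms: " + exchange.messagesIn("sms"));
        System.out.println("email: " + exchange.messagesIn("email"));
    }
}
```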

Producer code:

package com.example.hyzn.demos.TestRabbitMQ.productor;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TestMQpProductor_Routing {
    private static final String QUEUE_INFOM_sms = "sms";
    private static final String QUEUE_INFOM_email = "email";
    private static final String EXCHANGE_INFORM_Routing = "inform_exchange_new_Routing"; // Updated name for clarity
    private static final String ROUTING_KEY_INFOM_sms = "inform_sms_new";
    private static final String ROUTING_KEY_INFOM_email = "inform_email_new";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();

            // Declare queues
            channel.queueDeclare(QUEUE_INFOM_sms, true, false, false, null);
            channel.queueDeclare(QUEUE_INFOM_email, true, false, false, null);

            // Declare a direct exchange
            channel.exchangeDeclare(EXCHANGE_INFORM_Routing, BuiltinExchangeType.DIRECT);

            // Bind queues to the exchange with routing keys
            channel.queueBind(QUEUE_INFOM_sms, EXCHANGE_INFORM_Routing, ROUTING_KEY_INFOM_sms);
            channel.queueBind(QUEUE_INFOM_email, EXCHANGE_INFORM_Routing, ROUTING_KEY_INFOM_email);

            // Publish messages
            for (int i = 0; i < 5; i++) {
                String message = "send inform message sms to user";
                channel.basicPublish(EXCHANGE_INFORM_Routing, ROUTING_KEY_INFOM_sms, null, message.getBytes());
                System.out.println("Message sent: " + message);
            }
            for (int i = 0; i < 5; i++) {
                String message = "send inform message email to user";
                channel.basicPublish(EXCHANGE_INFORM_Routing, ROUTING_KEY_INFOM_email, null, message.getBytes());
                System.out.println("Message sent: " + message);
            }

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            try {
                if (channel != null) {
                    channel.close();
                }
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Consumer code (two consumers):

package com.example.hyzn.demos.TestRabbitMQ.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TestMQConsumer_Routing_email {
    private static final String QUEUE_INFOM_email = "email";
    private static final String EXCHANGE_INFORM_Routing = "inform_exchange_new_Routing"; // Updated name for clarity
    private static final String ROUTING_KEY_INFOM_email = "inform_email_new";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();

            // Declare the queue on the consumer side as well, in case the consumer starts first
            channel.queueDeclare(QUEUE_INFOM_email, true, false, false, null);

            // Declare the same direct exchange the producer uses
            channel.exchangeDeclare(EXCHANGE_INFORM_Routing, BuiltinExchangeType.DIRECT);

            // Bind the queue with the email routing key
            channel.queueBind(QUEUE_INFOM_email, EXCHANGE_INFORM_Routing, ROUTING_KEY_INFOM_email);

            // Define the consumer
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    String message = new String(body, "utf-8");
                    System.out.println("Message received: " + message);
                }
            };

            channel.basicConsume(QUEUE_INFOM_email, true, defaultConsumer);

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
        }
    }
}
package com.example.hyzn.demos.TestRabbitMQ.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TestMQConsumer_Routing_sms {
    private static final String QUEUE_INFOM_sms = "sms";
    private static final String EXCHANGE_INFORM_Routing = "inform_exchange_new_Routing"; // Updated name for clarity
    private static final String ROUTING_KEY_INFOM_sms = "inform_sms_new";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();

            // Declare the queue on the consumer side as well, in case the consumer starts first
            channel.queueDeclare(QUEUE_INFOM_sms, true, false, false, null);

            // Declare the same direct exchange the producer uses
            channel.exchangeDeclare(EXCHANGE_INFORM_Routing, BuiltinExchangeType.DIRECT);

            // Bind the queue with the SMS routing key
            channel.queueBind(QUEUE_INFOM_sms, EXCHANGE_INFORM_Routing, ROUTING_KEY_INFOM_sms);

            // Define the consumer
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    String message = new String(body, "utf-8");
                    System.out.println("Message received: " + message);
                }
            };

            channel.basicConsume(QUEUE_INFOM_sms, true, defaultConsumer);

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
        }
    }
}

Next, Topics mode

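This mode matches the RoutingKey against wildcard patterns

The # symbol matches zero or more words. For example, hello.# matches hello, hello.world, hello.nihao, hello.world.nihao, and so on

The * symbol matches exactly one word. For example, hello.* matches hello.world and hello.email, but not hello.world.nihao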
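
The two wildcard rules are easy to get wrong, so here is a small matcher to experiment with. It is a sketch that follows the rules as stated (words separated by dots, * is exactly one word, # is zero or more words); TopicMatchSketch and matches are names I made up, and the broker's own matcher is implemented differently.

```java
// Hypothetical matcher for topic binding patterns (illustration only).
final class TopicMatchSketch {
    // True if the dot-separated routing key satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    // Compare pattern words from index i against key words from index j.
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1); // '*' or a literal consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("hello.#", "hello.world.nihao")); // true
        System.out.println(matches("hello.#", "hello"));             // true
        System.out.println(matches("hello.*", "hello.world"));       // true
        System.out.println(matches("hello.*", "hello.world.nihao")); // false
    }
}
```

Note that wildcards only have meaning in the binding pattern; a message is published with a concrete routing key such as hello.world.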

Producer code:

package com.example.hyzn.demos.TestRabbitMQ.productor;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TestMQpProductor_Topic {
    private static final String QUEUE_INFOM_sms = "sms";
    private static final String QUEUE_INFOM_email = "email";
    // Same name as the DIRECT exchange in the Routing example: delete that exchange first,
    // because redeclaring an existing exchange with a different type fails
    private static final String EXCHANGE_INFORM_Toptic= "inform_exchange_new_Routing";
    // Binding patterns (wildcards allowed)
    private static final String Toptic_KEY_INFOM_sms = "inform.#.sms.#";
    private static final String Toptic_KEY_INFOM_email = "inform.#.email.#";
    // Concrete routing keys used when publishing (no wildcards)
    private static final String ROUTING_KEY_sms = "inform.sms.new";
    private static final String ROUTING_KEY_email = "inform.email.new";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();

            // Declare queues
            channel.queueDeclare(QUEUE_INFOM_sms, true, false, false, null);
            channel.queueDeclare(QUEUE_INFOM_email, true, false, false, null);

            // Declare a topic exchange
            channel.exchangeDeclare(EXCHANGE_INFORM_Toptic, BuiltinExchangeType.TOPIC);

            // Bind queues to the exchange with wildcard patterns
            channel.queueBind(QUEUE_INFOM_sms, EXCHANGE_INFORM_Toptic, Toptic_KEY_INFOM_sms);
            channel.queueBind(QUEUE_INFOM_email, EXCHANGE_INFORM_Toptic, Toptic_KEY_INFOM_email);

            // Publish messages with concrete routing keys
            for (int i = 0; i < 5; i++) {
                String message = "send inform message sms to user";
                channel.basicPublish(EXCHANGE_INFORM_Toptic, ROUTING_KEY_sms, null, message.getBytes());
                System.out.println("Message sent: " + message);
            }
            for (int i = 0; i < 5; i++) {
                String message = "send inform message email to user";
                channel.basicPublish(EXCHANGE_INFORM_Toptic, ROUTING_KEY_email, null, message.getBytes());
                System.out.println("Message sent: " + message);
            }

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            try {
                if (channel != null) {
                    channel.close();
                }
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Consumer code:

package com.example.hyzn.demos.TestRabbitMQ.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TestMQConsumer_Toptic_eamil {

    private static final String QUEUE_INFOM_email = "email";
    private static final String EXCHANGE_INFORM_Toptic = "inform_exchange_new_Routing"; // Updated name for clarity
    private static final String Toptic_KEY_INFOM_email = "inform.email.new";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();

            // Declare the queue on the consumer side as well, in case the consumer starts first
            channel.queueDeclare(QUEUE_INFOM_email, true, false, false, null);

            // Declare the same topic exchange the producer uses
            channel.exchangeDeclare(EXCHANGE_INFORM_Toptic, BuiltinExchangeType.TOPIC);

            // Bind with a key that has no wildcards, so it matches only that exact routing key
            channel.queueBind(QUEUE_INFOM_email, EXCHANGE_INFORM_Toptic, Toptic_KEY_INFOM_email);

            // Define the consumer
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    String message = new String(body, "utf-8");
                    System.out.println("Message received: " + message);
                }
            };

            channel.basicConsume(QUEUE_INFOM_email, true, defaultConsumer);

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
        }
    }
}
package com.example.hyzn.demos.TestRabbitMQ.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TestMQConsumer_Toptic_sms {

    private static final String QUEUE_INFOM_sms = "sms";
    private static final String EXCHANGE_INFORM_Toptic = "inform_exchange_new_Routing"; // Updated name for clarity
    private static final String Toptic_KEY_INFOM_sms = "inform.sms.new";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();

            // Declare the queue on the consumer side as well, in case the consumer starts first
            channel.queueDeclare(QUEUE_INFOM_sms, true, false, false, null);

            // Declare the same topic exchange the producer uses
            channel.exchangeDeclare(EXCHANGE_INFORM_Toptic, BuiltinExchangeType.TOPIC);

            // Bind with a key that has no wildcards, so it matches only that exact routing key
            channel.queueBind(QUEUE_INFOM_sms, EXCHANGE_INFORM_Toptic, Toptic_KEY_INFOM_sms);

            // Define the consumer
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    String message = new String(body, "utf-8");
                    System.out.println("Message received: " + message);
                }
            };

            channel.basicConsume(QUEUE_INFOM_sms, true, defaultConsumer);

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
        }
    }
}

Finally, RPC mode

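1. The client is both a producer and a consumer: it publishes a request and consumes the reply, which gives an asynchronous call. It is built on a Direct exchange (the example uses the default one)

2. The server listens on the RPC request queue; when a message arrives it runs the server-side method and gets the result

3. The server sends the result of the RPC method to the RPC reply queue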
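
The part of this pattern that usually needs a second look is how the client knows a reply belongs to its request. Below is a broker-free sketch of that correlation logic, using plain BlockingQueue objects in place of the request and reply queues. Everything in it (RpcCorrelationSketch, Message, serveOne, call, roundTrip, the 2000 ms timeout) is hypothetical and only mirrors the three steps above.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical broker-free model of the RPC request/reply flow (illustration only).
final class RpcCorrelationSketch {
    // A message body plus the two properties the RPC pattern relies on.
    static final class Message {
        final String correlationId;
        final BlockingQueue<Message> replyTo; // null on replies
        final String body;

        Message(String correlationId, BlockingQueue<Message> replyTo, String body) {
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.body = body;
        }
    }

    // Server: take one request, process it, reply on the queue named in replyTo.
    static void serveOne(BlockingQueue<Message> requests) throws InterruptedException {
        Message request = requests.take();
        String result = "Processed: " + request.body;
        request.replyTo.put(new Message(request.correlationId, null, result));
    }

    // Client: send a request with a fresh correlation id, then wait for the matching reply.
    static String call(BlockingQueue<Message> requests, BlockingQueue<Message> replies,
                       String body, long timeoutMillis) throws InterruptedException {
        String corrId = UUID.randomUUID().toString();
        requests.put(new Message(corrId, replies, body));
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            Message reply = replies.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
            if (reply == null) {
                return null; // timed out
            }
            if (corrId.equals(reply.correlationId)) {
                return reply.body;
            }
            // A reply for some other request: ignore it and keep waiting.
        }
    }

    // Runs one server thread and one call; a stale reply is planted to show the filtering.
    static String roundTrip(String body) {
        BlockingQueue<Message> requests = new LinkedBlockingQueue<>();
        BlockingQueue<Message> replies = new LinkedBlockingQueue<>();
        replies.add(new Message("stale-id", null, "left over from an earlier call"));
        Thread server = new Thread(() -> {
            try {
                serveOne(requests);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
        try {
            return call(requests, replies, body, 2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("Hello World"));
    }
}
```

The stale reply is skipped because its correlation id does not match, which is the same check the client code in this article performs in its delivery callback.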

Client code:

package com.example.hyzn.demos.TestRabbitMQ.productor;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] argv) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        try (Connection connection = factory.newConnection(); 
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);

            String response = call(channel, "Hello World");
            System.out.println("Response: " + response);
        }
    }

    private static String call(Channel channel, String message) throws IOException {
        final String corrId = UUID.randomUUID().toString();
        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes());

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        };
        channel.basicConsume(replyQueueName, true, deliverCallback, consumerTag -> { });

        try {
            return response.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return null;
        }
    }
}

Server code:

package com.example.hyzn.demos.TestRabbitMQ.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] argv) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        // Keep the connection and channel open. Closing them here (for example with
        // try-with-resources) would shut the server down before any request arrives.
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        channel.basicQos(1); // at most one unacknowledged request at a time
        System.out.println("Awaiting RPC requests");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String response = "";
            String message = new String(delivery.getBody(), "UTF-8");

            try {
                response += "Processed: " + message; // Your processing logic here
            } catch (Exception e) {
                System.err.println("Error: " + e.getMessage());
            } finally {
                // Echo the correlation id so the client can match the reply to its request
                AMQP.BasicProperties props = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();
                channel.basicPublish("", delivery.getProperties().getReplyTo(), props, response.getBytes("UTF-8"));
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };

        // autoAck is false because the callback acknowledges manually with basicAck
        channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}
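
Tags: rabbitmq, distributed systems

This article is reposted from: https://blog.csdn.net/2201_75521619/article/details/142106653
Copyright belongs to the original author, 程序员Shen_.li. In case of infringement, please contact us for removal.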
