0


RabbitMQ的七种工作模式代码介绍

作为MQ初学者的我,写下这篇博客用来加深对MQ代码的认识和了解。后续的MQ都是使用Spring集成进来的,与此文无关。

1.七种工作模式概述
  1. 简单模式(Simple)
  2. 工作队列模式(Work Queue)
  3. 发布订阅模式(Publish/Subscribe)
  4. 路由模式(Routing)
  5. 通配符模式(Topics)
  6. RPC模式(RPC)
  7. 发布确认模式(Publish Confirms)

上述工作模式,其中1-5使用的比较多,6-7较少,代码难度也比较大。

2.简单与工作队列模式

(1)简单模式

简单模式是rabbitmq的入门模式,也是最简单的

1)工作模式图

这种模式下,只有一个生产者和一个消费者,中间使用一个阻塞队列来连接.

特点:一个生产者,一个消费者,消息只能被消费一次。也称为点对点模式

2)代码写法

生产者:

建立连接--开启信道--指定队列(有不创无则创)--发送信息--关闭连接

/**
 * 生产者代码编写
 */
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接Connection
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("8.138.121.41");//主机ip
        factory.setPort(5672);//端口号
        factory.setUsername("study");//用户名
        factory.setPassword("study");//用户密码
        factory.setVirtualHost("test01");//虚拟主机
        Connection connection = factory.newConnection();
        //2.开启信道Channel
        Channel channel = connection.createChannel();
        //3.指定队列
        /*
            queueDeclare(String queue, boolean durable, boolean exclusive,
            boolean autoDelete, Map<String, Object> arguments)
            1.queue: 队列名称
            2.durable: 是否持久化, 当mq重启之后, 消息还在
            3.exclusive:是否独占, 只能有⼀个消费者监听队列,当Connection关闭时, 是否删除队列
            4.autoDelete: 是否⾃动删除, 当没有Consumer时, ⾃动删除掉
            5.arguments: ⼀些参数
         */
        channel.queueDeclare("queue01", true, false, false, null);
        //4.发送信息
        channel.basicPublish("", "queue01", null, "Hello World".getBytes());
        //5.资源释放
        channel.close();
        connection.close();
    }

}

消费者:

建立连接--开启信道--指定队列(有不创无则创)--消费信息--关闭连接

//消费者
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
       //1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("8.138.121.41");//主机ip
        factory.setPort(5672);//端口号
        factory.setUsername("study");//用户名
        factory.setPassword("study");//用户密码
        factory.setVirtualHost("test01");//虚拟主机
        Connection connection = factory.newConnection();
       //2.开启信道
        Channel channel = connection.createChannel();
       //3.指定队列
        channel.queueDeclare("queue01", true, false, false, null);

        //前面三步和第五步跟生产者模型一致

       //4.消费信息

        /*
        回调⽅法, 当收到消息后, 会⾃动执⾏该⽅法
        1. consumerTag: 标识
        2. envelope: 获取⼀些信息, 交换机, 路由key
        3. properties: 配置信息
        4. body: 数据
         */
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的消息后执行的逻辑,打印:"+new String(body));
            }
        };
        /**
         * String basicConsume(String var1, boolean var2, Consumer var3)
         * 参数解释:
         * var1: 队列名称
         * autoAck: 是否自动确认(信息)
         * callback: 接收到消息后,执行的逻辑(这里是打印消息) -- 回调方法
         */
        channel.basicConsume("queue01", true, consumer);//消费消息
        //5.释放连接
        channel.close();
        connection.close();
    }
}

生产者和消费者的代码基本相同,区别就在消费消息和生产消息。

生产消息到指定队列中:

//4.发送信息
channel.basicPublish("", "queue01", null, "Hello World".getBytes());

第一个参数:指定交换机,不写则默认

第二个参数:指定消息存放的队列

第三个参数:是否携带额外的属性(比如优先级等)

第四个参数:消息体

(2)工作队列模式

工作队列模式在简单模式的基础上多加一个消费者,两个消费者共同消费一份信息(先到先得原则)

1)工作模式图

特点:

每个消费者会获得不同的消息,并且不会重复。

适用场景:集群环境中做异步处理

2)代码写法

生产者:和简单模式的一样

/**
 * 生产者代码编写
 */
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接Connection
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("8.138.121.41");//主机ip
        factory.setPort(5672);//端口号
        factory.setUsername("study");//用户名
        factory.setPassword("study");//用户密码
        factory.setVirtualHost("test01");//虚拟主机
        Connection connection = factory.newConnection();
        //2.开启信道Channel
        Channel channel = connection.createChannel();
        //3.指定队列
        channel.queueDeclare("queue01", true, false, false, null);
        //4.发送信息
        for(int i=0;i<10;i++) {
            String msg = "简单队列模式: "+i;
            channel.basicPublish("", "work.queue", null, msg.getBytes());
        }
        //5.资源释放
        channel.close();
        connection.close();
    }

}

消费者1:和简单模式一样

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("8.138.121.41");//主机ip
        factory.setPort(5672);//端口号
        factory.setUsername("study");//用户名
        factory.setPassword("study");//用户密码
        factory.setVirtualHost("test01");//虚拟主机
        Connection connection = factory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.指定队列(使用默认的交换机)
        channel.queueDeclare("work.queue",true,false,false,null);
        //4.消费消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的消息:"+new String(body));
            }
        };
        channel.basicConsume("work.queue", true, consumer);//消费消息
        //5.关闭连接
    }
}

另一个消费者也一样,这里就不多余写了

结果:

由此可见,这两个消费者消费同一个队列的消息,彼此获得消息各不相同。

3.发布订阅模式

发布订阅模式,一个生产者生产的消息,只要订阅了队列,就可以拿到消息。每个人都有份切不重复。

(1)工作模式图

(2)代码写法

这是一个存放常量的类:

public class Constants {
    //建立连接需要的常量
    public static final String HOST = "8.138.121.41";//主机
    public static final int PORT = 5672;//端口号
    public static final String USER_NAME = "study";//用户名
    public static final String PASSWORD = "study";//用户密码
    public static final String VIRTUAL_HOST = "test01";//虚拟主机

    //工作队列模式
    public static final String WORK_QUEUE = "work.queue";

    //发布订阅模式
    public static final String FANOUT_EXCHANGE = "fanout.exchange";//交换机
    public static final String FANOUT_QUEUE1 = "fanout.queue1";//队列
    public static final String FANOUT_QUEUE2 = "fanout.queue2";

    //路由模式
    public static final String DIRECT_EXCHANGE = "direct.exchange";//交换机
    public static final String DIRECT_QUEUE1 = "direct.queue1";
    public static final String DIRECT_QUEUE2 = "direct.queue2";

    //通配符模式
    public static final String TOPIC_EXCHANGE = "topic.exchange";//交换机
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";

    //rpc模式
    public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
    public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";

    //发布确认模式
    public static final String PUBLISH_CONFIRMS_QUEUE1 = "publish.confirms.queue1";
    public static final String PUBLISH_CONFIRMS_QUEUE2 = "publish.confirms.queue2";
    public static final String PUBLISH_CONFIRMS_QUEUE3 = "publish.confirms.queue3";

}

1)生产者

public class Produce {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接Connection
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//主机ip
        factory.setPort(Constants.PORT);//端口号
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.PASSWORD);//用户密码
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
        Connection connection = factory.newConnection();
        //2.开启信道Channel
        Channel channel = connection.createChannel();
        //3.指定交换机
        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
        //4.指定队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
        //5.交换机和队列进行绑定(两个队列绑定同一个交换机)
        channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
        channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
        //6.发送信息
        for(int i=0;i<10;i++) {
            String msg = "发布订阅模式: "+i;
            channel.basicPublish(Constants.FANOUT_EXCHANGE,"", null, msg.getBytes());
        }
        //5.资源释放
        channel.close();
        connection.close();
    }

}

这里和简单模式相比,多了指定交换机、交换机和队列进行绑定这两步。

指定交换机:

//3.指定交换机
channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
  1. 第一个参数:指定的交换机
  2. 第二个参数:交换机的类型(有三种,这里是FANOT模式,也就是发布订阅模式)
  3. 第三个参数:数据是否可持久化

交换机和队列绑定:

//5.交换机和队列进行绑定(两个队列绑定同一个交换机)
channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
  • 第一个参数:绑定的队列是谁
  • 第二个参数:要绑定的交换机是谁
  • 第三个参数:routingkey,也就是路由规则。发布订阅模式为null

2)消费者

这里的消费者和简单模式一样。并且生产者已经绑定好了交换机和队列的关系,所以这里无须指定,直接使用队列即可。

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//主机ip
        factory.setPort(Constants.PORT);//端口号
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.PASSWORD);//用户密码
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
        Connection connection = factory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.指定队列(使用默认的交换机)
        channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
        //4.消费消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的消息:"+new String(body));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//消费消息
        //5.关闭连接
        channel.close();
        connection.close();
    }
}

另一个消费者的代码也一样。

结果:

队列中的消息,两个消费者共享,也就是每个消费者都会有一份。

4.路由模式

路由模式就是发布订阅模式的变种,指定路由的方式。

(1)工作模式图

(2)代码写法

1)生产者

public class Produce {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//主机ip
        factory.setPort(Constants.PORT);//端口号
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.PASSWORD);//用户密码
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
        Connection connection = factory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.指定交换机
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT,true);
        //4.指定队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);
        //5.绑定路由关系
        channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");
        channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a");
        channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b");
        channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c");
        //6.发送消息
        String msg = "hello direct, my routingkey is a....";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"a", null, msg.getBytes());

        String msg_b = "hello direct, my routingkey is b....";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"b", null, msg_b.getBytes());

        String msg_c = "hello direct, my routingkey is c....";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"c", null, msg_c.getBytes());
        System.out.println("消息发送成功");

        //7. 释放资源
        channel.close();
        connection.close();

    }
}

根据指定的路由规则,就可以把特定的消息发送到对应的队列中。

2)消费者

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//主机ip
        factory.setPort(Constants.PORT);//端口号
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.PASSWORD);//用户密码
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
        Connection connection = factory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);
        //4.消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的消息:"+new String(body));
            }
        };
        channel.basicConsume(Constants.DIRECT_QUEUE2,true,consumer);
    }
}

3)结果

5.通配符模式

这个也是发布订阅模式的变种

(2)代码写法

1)生产者

public class Produce {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//主机ip
        factory.setPort(Constants.PORT);//端口号
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.PASSWORD);//用户密码
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
        Connection connection = factory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.声明交换机
        channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true);
        //4.声明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);
        //5.绑定交换机和队列
        channel.queueBind(Constants.TOPIC_QUEUE1,Constants.TOPIC_EXCHANGE,"*.a.*");
        channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"*.*.b");
        channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"c.#");
        //6.发送消息
        String msg = "hello topic, my routingkey is ae.a.f....";
        channel.basicPublish(Constants.TOPIC_EXCHANGE,"ae.a.f", null, msg.getBytes());  //转发到Q1

        String msg_b = "hello topic, my routingkey is ef.a.b....";
        channel.basicPublish(Constants.TOPIC_EXCHANGE,"ef.a.b", null, msg_b.getBytes()); //转发到Q1和Q2

        String msg_c = "hello topic, my routingkey is c.ef.d....";
        channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.ef.d", null, msg_c.getBytes());//转发Q2
        System.out.println("消息发送成功");

        //7. 释放资源
        channel.close();
        connection.close();

    }

}

2)消费者

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//主机ip
        factory.setPort(Constants.PORT);//端口号
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.PASSWORD);//用户密码
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
        Connection connection = factory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.声明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);
        //4.消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的消息:"+new String(body));
            }
        };
        channel.basicConsume(Constants.TOPIC_QUEUE1,true,consumer);
    }
}

另一个消费者负责消费队列2中的消息

3)结果

6.RPC模式

这个模式分为一个请求队列和响应队列,客户端把请求发送到请求队列中,服务器会根据请求队列中的请求,把对应的响应(带有编号)放入响应队列中,客户端取出即可。

**(1)工作模式图 **

(2)代码写法

客户端:

public class RpcClient {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//主机ip
        factory.setPort(Constants.PORT);//端口号
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.PASSWORD);//用户密码
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
        Connection connection = factory.newConnection();
        //2.开启信道声明对应
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
        //3.发送请求(放入请求队列)
        //3.1.构造消息体
        String msg = "我是一条rpc请求消息";
        //3.2.消息唯一标识符(序号)
        String correlationID = UUID.randomUUID().toString();
        //3.3.设置消息的属性
        AMQP.BasicProperties props = new AMQP.BasicProperties()
                                    .builder()
                                    .correlationId(correlationID)//消息标识符
                                    .replyTo(Constants.RPC_RESPONSE_QUEUE)//响应放回的队列
                                    .build();
        channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,props,msg.getBytes());
        //4.接收响应(从响应队列获取)
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println("接收到回调消息"+msg);
                //放入阻塞队列中
                if(correlationID.equals(properties.getCorrelationId())) {
                    response.offer(msg);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,true,consumer);
        //5.从阻塞队列中取出响应
        String result = response.take();
        System.out.println("接收到的响应:[]"+result);
    }
}

服务端:

public class RpcServer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//主机ip
        factory.setPort(Constants.PORT);//端口号
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.PASSWORD);//用户密码
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
        Connection connection = factory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.接受请求并返回响应
        channel.basicQos(1);//每次接受一个请求
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String request = new String(body,"UTF-8");
                System.out.println("接收到请求:"+ request);
                String response = "针对request:"+ request +", 响应成功";
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());//返回响应
                channel.basicAck(envelope.getDeliveryTag(), false);//手动确认
            }
        };
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false,consumer);
    }
}

7.发布确认模式

背景:

所以发布确认模式是针对生产者的,确定消息已经发送出去,如果没收到,会让发送方重新发送。

和前面的模式大同小异,这里需要将信道设置为确认模式,也需要指定队列。

发布确认模式有三种策略:单独确认、批量确认、异步确认。

(1)单独确认

 private static final Integer MAX_COUNT = 100;
    /**
     * 建立连接
     *
     * @return 连接
     */
    static Connection createConnection() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//主机ip
        factory.setPort(Constants.PORT);//端口号
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.PASSWORD);//用户密码
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
        return factory.newConnection();
    }

/**
     * 单独确认
     */
private static void publishingMessageIndividually() throws Exception {
            try(Connection connection = createConnection()) {
                //1.开启信道
                Channel channel = connection.createChannel();
                //2.设置信道类型(确认模式)
                channel.confirmSelect();
                //3.声明队列
                channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE1,true,false,false,null);
                //4.发送消息
                long start  = System.currentTimeMillis();
                for(int i=0;i<MAX_COUNT;i++) {
                    String msg = "Message #" + i;
                    channel.basicPublish("",Constants.PUBLISH_CONFIRMS_QUEUE1,null,msg.getBytes());//发送消息
                    channel.waitForConfirmsOrDie(5000);//等待确认,超过5000ms报错
                }
                long end = System.currentTimeMillis();
                System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MAX_COUNT, end-start);
            }
    }

(2)批量确认

 private static final Integer MAX_COUNT = 100;
    /**
     * 建立连接
     *
     * @return 连接
     */
    static Connection createConnection() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//主机ip
        factory.setPort(Constants.PORT);//端口号
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.PASSWORD);//用户密码
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
        return factory.newConnection();
    }

/**
     * 批量确认
     */
    private static void publishingMessageInBatches() throws Exception {
        try(Connection connection = createConnection()) {
            //1.开启信道
            Channel channel = connection.createChannel();
            //2.设置信道类型(确认模式)
            channel.confirmSelect();
            //3.声明队列
            channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE2,true,false,false,null);
            //4.发送消息
            long start  = System.currentTimeMillis();
            int size = 100;//确定消息的个数
            int outstandingMessageCount = 0;//计数
            for(int i=0;i<MAX_COUNT;i++) {
                String msg = "Message #" + i;
                channel.basicPublish("",Constants.PUBLISH_CONFIRMS_QUEUE2,null,msg.getBytes());//发送消息
                outstandingMessageCount++;
                if(outstandingMessageCount == size) {
                    channel.waitForConfirmsOrDie(5000);
                    outstandingMessageCount = 0;
                }
            }
            if(outstandingMessageCount > 0) {
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MAX_COUNT, end-start);
        }
    }

(3)异步确认

 private static final Integer MAX_COUNT = 100;
    /**
     * 建立连接
     *
     * @return 连接
     */
    static Connection createConnection() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);//主机ip
        factory.setPort(Constants.PORT);//端口号
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.PASSWORD);//用户密码
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
        return factory.newConnection();
    }
 /**
     * 异步确认
     */
    private static void handlingPublisherConfirmsAsynchronously() throws Exception {
        //可以一遍发送消息一边进行确认
        try(Connection connection = createConnection()) {
            //1.开启信道
            Channel channel = connection.createChannel();
            //2.设置确认模式
            channel.confirmSelect();
            //3.指定队列
            channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE3,true,false,false,null);
            //4.监听队列
            long start  = System.currentTimeMillis();

            SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());//用来存储未被确认的消息序号
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long l, boolean b) throws IOException {
                    //收到ack
                    if(b) {
                        //批量确认
                        confirmSeqNo.headSet(l+1).clear();
                    }else {
                        //单独确认
                        confirmSeqNo.remove(l);//移除id为l的消息
                    }
                }

                @Override
                public void handleNack(long l, boolean b) throws IOException {
                    //未收到ack
                    if(b) {
                        confirmSeqNo.headSet(l+1).clear();
                    }else {
                        confirmSeqNo.remove(l);
                    }
                    //
                }
            });
            //5.发送消息
            for(int i=0;i<MAX_COUNT;i++) {
                String msg = "Message #" + i;
                long seqNo = channel.getNextPublishSeqNo();//获取序号
                channel.basicPublish("",Constants.PUBLISH_CONFIRMS_QUEUE3,null,msg.getBytes());
                confirmSeqNo.add(seqNo);//存储序号
            }
            long end = System.currentTimeMillis();
            System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MAX_COUNT, end-start);
        }
    }

本文转载自: https://blog.csdn.net/2301_77053417/article/details/143039795
版权归原作者 代码小娥 所有, 如有侵权,请联系我们删除。

“RabbitMQ的七种工作模式代码介绍”的评论:

还没有评论