0


RabbitMQ再回首--往事如梦

这文章你就读吧,越读越🥸,一读一个不吱声

可靠的🐰警官:rabbitMQ,功能全面,不丢数据,体量小,容易堆积

声明exchange

channel.exchangeDeclare(String exchange, String type, boolean durable, boolean

autoDelete,Map<String, Object> arguments) throws IOException;

durable=false 我勒个天才 会丢消息哒,broker重启之后 exchange就从这个世界上消失了,pro还怎么发消息 啊!

声明queue

channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean

autoDelete, Map<String, Object> arguments);

 exclusive排他队列:仅对首次申明它的连接可见,连接断开自动删除

      同一连接不同信道:可以访问同一连接创建的排他队列

      
  1. type=Quorum和Stream,默认持久化:durable=true,exclusive=false
Map<String,Object> params = new HashMap<>();
params.put("x-queue-type","stream");
params.put("x-max-length-bytes", 20_000_000_000L); //日志文件的最大字节数 maximum stream size:20 GB
params.put("x-stream-max-segment-size-bytes", 100_000_000); //每一个日志文件的最大大小 size of segment files: 100 MB
channel.queueDeclare(QUEUE_NAME, true, false, false, params);

queue持久化不代表消息持久化,消息要看消息的:basicPublish

单队列持久化 重启之后 消息会离你而去,单消息持久化 重启之后 消息早跑了 妥妥的

exchange与queue绑定关系

channel.queueBind(String queue, String exchange, String routingKey) throws

IOException; //Producer发送过来的消息将分到哪些Queue

发送到queue

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
//对应页面上的Properties部分,传入一些预定的参数值。
builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode());//持久化
builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority());//持久化
//builder.headers(headers);对应页面上的Headers部分。传入自定义的参数值
builder.build()
AMQP.BasicProperties prop = builder.build();

RabbitMQ持久化机制-CSDN博客

public BasicProperties(
            String contentType,//消息类型如:text/plain
            String contentEncoding,//编码
            Map<String,Object> headers,
            Integer deliveryMode,//1:nonpersistent不持久化 2:persistent持久化
            Integer priority,//优先级
            String correlationId,
            String replyTo,//反馈队列
            String expiration,//expiration到期时间
            String messageId,
            Date timestamp,
            String type,
            String userId,
            String appId,
            String clusterId)

consumer消费消息

  • 被动消费:服务器推送,消费者设置缓冲区缓存,效率是高了但是缓冲区可能溢出 一口胖成🪣(这届奥运会的谐音梗 你懂不懂)

     channel.basicConsume(String queue, boolean autoAck, Consumer callback);#不断推送
    
         每个channel有独立线程,一个channel一个消费比较推荐,做channel最重要的专一
    
String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback
cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws
IOException;
    Callback,可实现handle方法
  • 主动消费:拉取msg,需要时拉取 增加消息延迟/降低系统吞吐量;单条 注意啊 单条

     GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck);
    
      autoAck=true,消息被consumer消费成功后,无法再消费,一次性的 懂?
    
      autoAck=false,需手动调channel.basicAck(),不凋则重复消费 资源多就是这么豪横
    

    Rabbit MQ 消息消费模式_rabbitmq消费模式-CSDN博客

消费场景

他又来了 真的是 大水冲了龙王庙 不是一家人不进一家门

一,pro 送一个 msg 到que,不需要exchange,con按que 消费

二,pro发送msg到que,多con消费同一个队列queue

  • con 的autoAck=false,调channel.basicAck通知服务器消息完成,否则重复消费
  • msg持久,queue不能被多次定义,一旦接受了durable的设定就守身如玉,及时改了 骨子里还是durable
  • channel.basicQos(prefetchCount);consumer同时处理msg个数,发送msg前检查发送但未收到basicAck有几个,如超过prefetchCount则换一个consumer;这样始乱终弃当然有问题了,all节点都超了,怎么办?宝宝就问你怎么办?及时发现 添加节点 再者你就换 下一个更乖

三,发布订阅 type=fanout

pro消息发exchange,将消息路由到绑定的队列,队列对应一个消费者

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");type=fanout,向绑定的队列发送msg

// 创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 声明交换器
String exchangeName = "publishSubscribeExchange";
channel.exchangeDeclare(exchangeName, "fanout");

// 创建随机队列并绑定到交换器
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, "");

// 发送消息
String message = "Hello, RabbitMQ!";
channel.basicPublish(exchangeName, "", null, message.getBytes("UTF-8"));
System.out.println("Sent message: " + message);

// 接收消息
boolean autoAck = true;
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Received message: " + message);
    }
};
channel.basicConsume(queueName, autoAck, consumer);
四,路由 type=direct 


     


五,topic 


    pro通过通配符匹配路由键,交换机将消息 路由到 它绑定的队列 匹配的路由键的队列中,这就话是不是很绕,让我来用中文翻译一下:交换机 把消息发送到 配置了同样路由键的队列中 

RabbitMQ的五种常见消费模型_rabbit mq消费模式-CSDN博客


本文转载自: https://blog.csdn.net/ma15732625261/article/details/141036021
版权归原作者 星辰_mya 所有, 如有侵权,请联系我们删除。

“RabbitMQ再回首--往事如梦”的评论:

还没有评论