0


【MQ 系列】SprigBoot + RabbitMq 消息发送基本使用姿势

本篇内容主要为消息发送,包括以下几点

  • RabbitTemplate 发送消息的基本使用姿势
  • 自定义消息基本属性
  • 自定义消息转换器AbstractMessageConverter
  • 发送 Object 类型消息失败的 case

I. 基本使用姿势

1. 配置

我们借助

SpringBoot 2.2.1.RELEASE

+

rabbitmq 3.7.5

来完整项目搭建与测试

项目 pom.xml 如下

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

application.yml

内容如下

spring:
  rabbitmq:
    virtual-host:/
    username:admin
    password:admin
    port:5672
    host:127.0.0.1

2. 配置类

通过前面 rabbitmq 的知识点学习,我们可以知道发送端,将消息发送给 exchange,然后根据不同的策略分发给对应的 queue

本篇博文主要讨论的是消息发送,所以定义一个 topic 模式的 exchange,并绑定一个的 queue;(对发送端而言,不通过的 exchange 类型,对使用姿势影响不大)

publicclass MqConstants {

    publicstaticfinal String exchange = "topic.e";

    publicstaticfinal String routing = "r";

    publicfinalstatic String queue = "topic.a";

}

@Configuration
publicclass MqConfig {
    @Bean
    public TopicExchange topicExchange() {
        returnnew TopicExchange(MqConstants.exchange);
    }

    @Bean
    public Queue queue() {
        // 创建一个持久化的队列
        returnnew Queue(MqConstants.queue, true);
    }

    @Bean
    public Binding binding(TopicExchange topicExchange, Queue queue) {
        return BindingBuilder.bind(queue).to(topicExchange).with(MqConstants.routing);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        returnnew RabbitTemplate(connectionFactory);
    }
}

3. 消息发送

消息发送,主要借助的是

RabbitTemplate#convertAndSend

方法来实现,通常情况下,我们直接使用即可

@Service
publicclass BasicPublisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 一般的用法,推送消息
     *
     * @param ans
     * @return
     */
    private String publish2mq1(String ans) {
        String msg = "Durable msg = " + ans;
        System.out.println("publish: " + msg);
        rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
        return msg;
    }
}

上面的核心点就一行

rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
  • 表示将 msg 发送给指定的 exchange,并设置消息的路由键

通过上面的方式,发送的消息默认是持久化的,当持久化的消息,分发到持久化的队列时,会有消息的落盘操作;在某些场景下,我们对消息的要求并没有那么严格,反而更在意 mq 的性能,丢失一些数据也可以接收;这个时候我们可能需要定制一下发送的消息属性

下面提供两种姿势,推荐第二种

/**
 * 推送一个非持久化的消息,这个消息推送到持久化的队列时,mq重启,这个消息会丢失;上面的持久化消息不会丢失
 *
 * @param ans
 * @return
 */
private String publish2mq2(String ans) {
    MessageProperties properties = new MessageProperties();
    properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
    Message message = rabbitTemplate.getMessageConverter().toMessage("NonDurable = " + ans, properties);

    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, message);

    System.out.println("publish: " + message);
    return message.toString();
}

private String publish2mq3(String ans) {
    String msg = "Define msg = " + ans;
    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setHeader("ta", "测试");
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        }
    });

    return msg;
}

注意

  • 在实际的项目开发中,推荐使用MessagePostProcessor来定制消息属性,其次不推荐每次发送消息时都创建一个MessagePostProcessor对象,请定义一个通用的对象

4. 非序列化对象发送异常 case

通过查看

rabbitTemplate#convertAndSend

的接口定义,我们知道发送的消息可以是 Object 类型,那么是不是任何对象,都可以推送给 mq 呢?

下面是一个测试 case

private String publish2mq4(String ans) {
    NonSerDO nonSerDO = new NonSerDO(18, ans);
    System.out.println("publish: " + nonSerDO);
    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
    return nonSerDO.toString();
}

@Data
publicstaticclass NonSerDO {
    private Integer age;
    private String name;

    public NonSerDO(int age, String name) {
        this.age = age;
        this.name = name;
    }
}

当我们调用上面的

publish2mq4

方法时,会抛出一个参数类型异常

为什么会出现这个问题呢?从堆栈分析,我们知道 RabbitTemplate 默认是利用

SimpleMessageConverter

来实现封装 Message 逻辑的,核心代码为

// 下面代码来自 org.springframework.amqp.support.converter.SimpleMessageConverter#createMessage
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    byte[] bytes = null;
    if (object instanceofbyte[]) {
        bytes = (byte[]) object;
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
    }
    elseif (object instanceof String) {
        try {
            bytes = ((String) object).getBytes(this.defaultCharset);
        }
        catch (UnsupportedEncodingException e) {
            thrownew MessageConversionException(
                    "failed to convert to Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        messageProperties.setContentEncoding(this.defaultCharset);
    }
    elseif (object instanceof Serializable) {
        try {
            bytes = SerializationUtils.serialize(object);
        }
        catch (IllegalArgumentException e) {
            thrownew MessageConversionException(
                    "failed to convert to serialized Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
    }
    if (bytes != null) {
        messageProperties.setContentLength(bytes.length);
        returnnew Message(bytes, messageProperties);
    }
    thrownew IllegalArgumentException(getClass().getSimpleName()
            + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}

上面逻辑很明确的指出了,只接受 byte 数组,string 字符串,可序列化对象(这里使用的是 jdk 的序列化方式来实现对象和 byte 数组之间的互转)

自然而然的,我们会想有没有其他的

MessageConverter

来友好的支持任何类型的对象

5. 自定义 MessageConverter

接下来我们希望通过自定义一个 json 序列化方式的 MessageConverter 来解决上面的问题

一个比较简单的实现(利用 FastJson 来实现序列化/反序列化)

publicstaticclass SelfConverter extends AbstractMessageConverter {
    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        messageProperties.setContentType("application/json");
        returnnew Message(JSON.toJSONBytes(object), messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return JSON.parse(message.getBody());
    }
}

重新定义一个

rabbitTemplate

,并设置它的消息转换器为自定义的

SelfConverter
@Bean
public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new SelfConverter());
    return rabbitTemplate;
}

然后再次测试一下

@Service
publicclass JsonPublisher {
    @Autowired
    private RabbitTemplate jsonRabbitTemplate;

    private String publish1(String ans) {
        Map<String, Object> msg = new HashMap<>(8);
        msg.put("msg", ans);
        msg.put("type", "json");
        msg.put("version", 123);
        System.out.println("publish: " + msg);
        jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
        return msg.toString();
    }

    private String publish2(String ans) {
        BasicPublisher.NonSerDO nonSerDO = new BasicPublisher.NonSerDO(18, "SELF_JSON" + ans);
        System.out.println("publish: " + nonSerDO);
        jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
        return nonSerDO.toString();
    }
}

mq 内接收到的推送消息如下

6. Jackson2JsonMessageConverter

上面虽然实现了 Json 格式的消息转换,但是比较简陋;而且这么基础通用的功能,按照 Spring 全家桶的一贯作风,肯定是有现成可用的,没错,这就是

Jackson2JsonMessageConverter

所以我们的使用姿势也可以如下

//定义RabbitTemplate
@Bean
public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    return rabbitTemplate;
}

// 测试代码
@Autowired
private RabbitTemplate jacksonRabbitTemplate;
private String publish3(String ans) {
    Map<String, Object> msg = new HashMap<>(8);
    msg.put("msg", ans);
    msg.put("type", "jackson");
    msg.put("version", 456);
    System.out.println("publish: " + msg);
    jacksonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
    return msg.toString();
}

下面是通过 Jackson 序列化消息后的内容,与我们自定义的有一些不同,多了

headers

content_encoding

7. 小结

本篇博文主要的知识点如下

  • 通过RabbitTemplate#convertAndSend来实现消息分发
  • 通过MessagePostProcessor来自定义消息的属性(请注意默认投递的消息是持久化的)
  • 默认的消息封装类为SimpleMessageConverter,只支持分发 byte 数组,字符串和可序列化的对象;不满足上面三个条件的方法调用会抛异常
  • 我们可以通过实现MessageConverter接口,来定义自己的消息封装类,解决上面的问题

在 RabbitMq 的知识点博文中,明确提到了,为了确保消息被 brocker 正确接收,提供了消息确认机制和事务机制两种 case,那么如果需要使用这两种方式,消息生产者需要怎么做呢?

标签: rabbitmq 分布式 java

本文转载自: https://blog.csdn.net/cxyxysam/article/details/135460450
版权归原作者 程序员高级码农. 所有, 如有侵权,请联系我们删除。

“【MQ 系列】SprigBoot + RabbitMq 消息发送基本使用姿势”的评论:

还没有评论