0


RabbitMQ基础组件封装—整体结构(总篇)

一、父项目 rabbit-parent

使用 idea 创建 maven 项目,命名为 rabbit-parent,作为 最外围的父项目,在其下创建四个 Module :rabbit-api、rabbit-core-producer、rabbit-common、rabbit-task,然后将父项目 rabbit-parent 的 src 目录删除,只保留 pom.xml,用于添加 依赖项,如下:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>8</java.version>
        <fasterxml.uuid.version>3.1.4</fasterxml.uuid.version>
        <org.codehaus.jackson.version>1.9.13</org.codehaus.jackson.version>
        <druid.version>1.0.24</druid.version>
        <elastic-job.version>2.1.4</elastic-job.version>
        <guava.version>20.0</guava.version>
        <commons-lang3.version>3.3.1</commons-lang3.version>
        <commons-io.version>2.4</commons-io.version>
        <commons-collections.version>3.2.2</commons-collections.version>
        <curator.version>2.11.0</curator.version>
        <fastjson.version>1.1.26</fastjson.version>        
    </properties>  
      <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>      
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>        
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>${commons-io.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>        
        <!--对json格式的支持 -->
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>${org.codehaus.jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>   
        <dependency>
            <groupId>com.fasterxml.uuid</groupId>
            <artifactId>java-uuid-generator</artifactId>
            <version>${fasterxml.uuid.version}</version>
        </dependency>  
      </dependencies>

二、rabbit-api 实现接口定义

1. rabbit-api 所要做的事

首先应该把该项目的整体功能进行抽象,形成一系列接口。这些接口都定义在 rabbit-api 中。

该项目的接口可归类为发送迅速、延迟、可靠三种类型的消息。需要对这三类消息做一个封装。

2. 自定义 Message(放于 rabbit-api 模块下)

既然我们是对消息进行封装,很显然,我们要自己做一个message的实体类,主要把我们定义的消息属性做一个规划,其中一些比较关键的属性如下:

  • messageId 唯一消息id
  • topic 代表exchange主机名
  • routingKey 路由键
  • Map<String,Object> attribates 消息附加属性
  • delayMills 消息延迟时间
  • messageType 消息类型
public class Message implements Serializable {

    private static final long serialVersionUID = 841277940410721237L;

    /*     消息的唯一ID    */
    private String messageId;
    
    /*    消息的主题        */
    private String topic;
    
    /*    消息的路由规则    */
    private String routingKey = "";
    
    /*    消息的附加属性    */
    private Map<String, Object> attributes = new HashMap<String, Object>();
    
    /*    延迟消息的参数配置    */
    private int delayMills;
    
    /*    消息类型:默认为confirm消息类型    */
    private String messageType = MessageType.CONFIRM;

    public Message() {
    }
    
    public Message(String messageId, String topic, String routingKey, Map<String, Object> attributes, int delayMills) {
        this.messageId = messageId;
        this.topic = topic;
        this.routingKey = routingKey;
        this.attributes = attributes;
        this.delayMills = delayMills;
    }
    
    public Message(String messageId, String topic, String routingKey, Map<String, Object> attributes, int delayMills,
            String messageType) {
        this.messageId = messageId;
        this.topic = topic;
        this.routingKey = routingKey;
        this.attributes = attributes;
        this.delayMills = delayMills;
        this.messageType = messageType;
    }
    
}

其中 messageType 应该包括的类型为:

  • 迅速消息 :不需要保障消息的可靠性,也不需要做confirm确认
    
  • 确认消息 :不需要保障消息的可靠性,但是做消息confirm确认
    
  • 可靠消息 :保障消息100%可靠投递,不允许有任何消息丢失。(保障数据所发的消息是原子性的)
    
public final class MessageType {

    /**
     *     迅速消息:不需要保障消息的可靠性, 也不需要做confirm确认
     */
    public static final String RAPID = "0";
    
    /**
     *     确认消息:不需要保障消息的可靠性,但是会做消息的confirm确认
     */
    public static final String CONFIRM = "1";
    
    /**
     *     可靠性消息: 一定要保障消息的100%可靠性投递,不允许有任何消息的丢失
     *     PS: 保障数据库和所发的消息是原子性的(最终一致的)
     */
    public static final String RELIANT = "2";
    
}

对于 Message 对象的创建,这里使用建造者模式,对应的类如下:

/**
 *     $MessageBuilder 建造者模式
 *
 */
public class MessageBuilder {

    /*
     * Message 中的属性全部拷贝一份
     */
    private String messageId;
    private String topic;
    private String routingKey = "";
    private Map<String, Object> attributes = new HashMap<String, Object>();
    private int delayMills;
    private String messageType = MessageType.CONFIRM;
    
    private MessageBuilder() {
    }

    public static MessageBuilder create() {
        return new MessageBuilder();
    }

    public MessageBuilder withMessageId(String messageId) {
        this.messageId = messageId;
        return this;
    }
    
    public MessageBuilder withTopic(String topic) {
        this.topic = topic;
        return this;
    }
    
    public MessageBuilder withRoutingKey(String routingKey) {
        this.routingKey = routingKey;
        return this;
    }

    public MessageBuilder withAttributes(Map<String, Object> attributes) {
        this.attributes = attributes;
        return this;
    }
    
    public MessageBuilder withAttribute(String key, Object value) {
        this.attributes.put(key, value);
        return this;
    }
    
    public MessageBuilder withDelayMills(int delayMills) {
        this.delayMills = delayMills;
        return this;
    }
    
    public MessageBuilder withMessageType(String messageType) {
        this.messageType = messageType;
        return this;
    }

    public Message build() {
        
        // 1. check messageId 
        if(messageId == null) {
            messageId = UUID.randomUUID().toString();
        }
        // 2. topic is null ,这时应该终止运行
        if(topic == null) {
            throw new MessageRunTimeException("this topic is null");
        }
        Message message = new Message(messageId, topic, routingKey, attributes, delayMills, messageType);
        return message;
    }
    
}

3. 在Spring提供的一场的基础上扩展自己的异常(放于 rabbit-api 模块下)

(1)扩展一般异常:MessageException.java

public class MessageException extends Exception {

    private static final long serialVersionUID = 6347951066190728758L;

    public MessageException() {
        super();
    }
    
    public MessageException(String message) {
        super(message);
    }
    
    public MessageException(String message, Throwable cause) {
        super(message, cause);
    }
    
    public MessageException(Throwable cause) {
        super(cause);
    }
    
}

(2)扩展运行时异常类:MessageRunTimeException.java

public class MessageRunTimeException extends RuntimeException {

    private static final long serialVersionUID = 8651828913888663267L;

    public MessageRunTimeException() {
        super();
    }
    
    public MessageRunTimeException(String message) {
        super(message);
    }
    
    public MessageRunTimeException(String message, Throwable cause) {
        super(message, cause);
    }
    
    public MessageRunTimeException(Throwable cause) {
        super(cause);
    }
}

4. 生产者发送消息的接口api抽象出来(放于 rabbit-api 模块下):

public interface MessageProducer {

    /**
     *     $send消息的发送 附带SendCallback回调执行响应的业务逻辑处理
     * @param message
     * @param sendCallback
     * @throws MessageRunTimeException
     */
    void send(Message message, SendCallback sendCallback) throws MessageRunTimeException;
    
    /**
     *     
     * @param message消息的发送
     * @throws MessageRunTimeException
     */
    void send(Message message) throws MessageRunTimeException;
    
    /**
     *     $send 消息的批量发送
     * @param messages
     * @throws MessageRunTimeException
     */
    void send(List<Message> messages) throws MessageRunTimeException;
    
}

其中的回调函数 SendCallback.java 定义如下:

/**
 *     $SendCallback 回调函数处理
 *
 */
public interface SendCallback {

    void onSuccess();
    
    void onFailure();
    
}

5. 消费者监听的接口抽象出来(放于 rabbit-api 模块下)

接口如下:(不过这个接口将来也不打算实现,只会实现生产者发送消息的接口)

/**
 *     $MessageListener 消费者监听消息
 *
 */
public interface MessageListener {

    void onMessage(Message message);
    
}

三、对 rabbit-api 所定义的接口进行实现

1. 整体架构实现

整体架构的实现需要用到其他几个子模块:rabbit-core-producer、rabbit-common、rabbit-task

(1)添加依赖项

首先是在 rabbit-common 中添加依赖项:

        <!-- 这个rabbit-api就是上面编写的module:rabbit-api-->
        <dependency>
            <groupId>com.didiok.base.rabbit</groupId>
            <artifactId>rabbit-api</artifactId>
            <version>0.0.1-SNAPSHOT</version>              
          </dependency>
        <!-- mq的依赖 -->
        <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-amqp</artifactId>
          </dependency>

然后再将 rabbit-common 项目作为依赖项添加到 rabbit-core-producer中的 pom.xml里:

        <!-- rabbit-common作为依赖项添加进 rabbit-core-producer中-->
        <dependency>
            <groupId>com.didiok.base.rabbit</groupId>
            <artifactId>rabbit-common</artifactId> 
            <version>0.0.1-SNAPSHOT</version>            
          </dependency>

(2)实现自动装配(放于 rabbit-core-producer 模块下)

首先定义一个类,用于 spring 的自动装配。
RabbitProducerAutoConfiguration.java

/**
 *     $RabbitProducerAutoConfiguration 自动装配 
 *
 */
@Configuration
public class RabbitProducerAutoConfiguration {
​​​​​​​
}

配置一下,将该类加入到自动装配的扫描中,需要在 src/main/resources 下创建 META-INF 文件夹,并在该文件夹中创建 spring.factories文件:

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.bfxy.rabbit.producer.autoconfigure.RabbitProducerAutoConfiguration

这样如果将该项目作为maven依赖引入到某个Spring 应用,在 该应用启动的时候就会自动加载该类。

(3)实现 rabbit-api 中的接口类 MessageProducer.java(发送消息的接口,放于 rabbit-core-producer 模块下)

ProducerClient.java

/**
 *     $ProducerClient 发送消息的实际实现类
 *
 */
@Component
public class ProducerClient implements MessageProducer {

    @Override
    public void send(Message message) throws MessageRunTimeException {
    
    }

    @Override
    public void send(List<Message> messages) throws MessageRunTimeException {
        // TODO Auto-generated method stub
    }
    
    @Override
    public void send(Message message, SendCallback sendCallback) throws MessageRunTimeException {
        // TODO Auto-generated method stub
        
    }
}

ProducerClient.java在包 com.didiok.rabbit.producer 下面,所以需要在 RabbitProducerAutoConfiguration.java 中添加包的扫描 @ComponentScan ,添加后代码如下:

/**
 *     $RabbitProducerAutoConfiguration 自动装配 
 *
 */
@Configuration
@ComponentScan({"com.didiok.rabbit.producer.*"})
public class RabbitProducerAutoConfiguration {
}

实现 ProducerClient.java 中的第一个方法:

@Component
public class ProducerClient implements MessageProducer {

    @Autowired
    private RabbitBroker rabbitBroker;
    
    @Override
    public void send(Message message) throws MessageRunTimeException {
        Preconditions.checkNotNull(message.getTopic());
        String messageType = message.getMessageType();
        switch (messageType) {
            case MessageType.RAPID:
                rabbitBroker.rapidSend(message);
                break;
            case MessageType.CONFIRM:
                rabbitBroker.confirmSend(message);
                break;
            case MessageType.RELIANT:
                rabbitBroker.reliantSend(message);
                break;
        default:
            break;
        }
    }

    @Override
    public void send(List<Message> messages) throws MessageRunTimeException {

    }
    
    @Override
    public void send(Message message, SendCallback sendCallback) throws MessageRunTimeException {
        // TODO Auto-generated method stub
        
    }
}

发送消息的具体逻辑抽象出来封装在 RabbitBroker.java 中:

/**
 *     $RabbitBroker 具体发送不同种类型消息的接口
 *
 */
public interface RabbitBroker {
    
    void rapidSend(Message message);
    
    void confirmSend(Message message);
    
    void reliantSend(Message message);
    
    void sendMessages();
    
}

四、三种类型消息发送的具体逻辑实现

1. 迅速消息类型的 的逻辑实现(放于 rabbit-core-producer 模块下)

在RabbitBroker.java的实现类 RabbitBrokerImpl.java中,实现 迅速消息类型 的逻辑:

/**
 *     $RabbitBrokerImpl 真正的发送不同类型的消息实现类
 *
 */
@Slf4j
@Component
public class RabbitBrokerImpl implements RabbitBroker {

    @Autowired
    private RabbitTemplateContainer rabbitTemplateContainer;

    /**
     *     $rapidSend迅速发消息
     */
    @Override
    public void rapidSend(Message message) {
        message.setMessageType(MessageType.RAPID);
        sendKernel(message);
    }
    
    /**
     *     $sendKernel 发送消息的核心方法 使用异步线程池进行发送消息
     * @param message
     */
    private void sendKernel(Message message) {
        AsyncBaseQueue.submit((Runnable) () -> {
            CorrelationData correlationData = 
                    new CorrelationData(String.format("%s#%s#%s",
                            message.getMessageId(),
                            System.currentTimeMillis(),
                            message.getMessageType()));
            String topic = message.getTopic();
            String routingKey = message.getRoutingKey();
            RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getTemplate(message);
            rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
            log.info("#RabbitBrokerImpl.sendKernel# send to rabbitmq, messageId: {}", message.getMessageId());            
        });
    }

    @Override
    public void confirmSend(Message message) {
        
    }

    @Override
    public void reliantSend(Message message) {
        
    }
    
    @Override
    public void sendMessages() {
        
    }
}

这里又使用到了两个类:异步线程队列 AsyncBaseQueue.java 和 RabbitTemplate的池化处理RabbitTemplateContainer.java 。

为了提升发送消息的吞吐量,使用异步线程池发送消息(虽然 rabbitTemplate.convertAndSend()本身就是异步方法,但是这里的异步化是为了让整个匿名表达式都放到异步处理中去,由于这里的匿名表达式代码简单,所以性能提升不大,这里的异步线程池只是作为一个规范建议)。

AsyncBaseQueue.java :

@Slf4j
public class AsyncBaseQueue {

    private static final int THREAD_SIZE = Runtime.getRuntime().availableProcessors();
    
    private static final int QUEUE_SIZE = 10000;
    
    private static ExecutorService senderAsync =
            new ThreadPoolExecutor(THREAD_SIZE,
                    THREAD_SIZE,
                    60L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(QUEUE_SIZE),
                    // 创建线程
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r);
                            // 线程名
                            t.setName("rabbitmq_client_async_sender");
                            return t;
                        }
                    },
                    // 被拒绝时的处理逻辑
                    new java.util.concurrent.RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            log.error("async sender is error rejected, runnable: {}, executor: {}", r, executor);
                        }
                    });
        // 提供对外接口
        public static void submit(Runnable runnable) {
            senderAsync.submit(runnable);
        }    
            
}

通过 @Autowired注入方式实例化的RabbitTemplate是单例,为了提高效率,可以池化处理RabbitTemplate:

  • 一个topic对应一个rabbitTemplate
  • 第一次是创建一个template,等后来topic对应的template存在时就只用从池子中获取;
  • 这也相当于多生产者,并行发送消息,一个topic对应一个生产者,多个topic就对应多个生产者,相比于单例中的单个生产者的效率有所提升。

RabbitTemplateContainer.java

/**
 *     $RabbitTemplateContainer池化封装
 *     每一个topic 对应一个RabbitTemplate
 *    1.    提高发送的效率
 *     2.     可以根据不同的需求制定化不同的RabbitTemplate, 比如每一个topic 都有自己的routingKey规则
 */
@Slf4j
@Component
public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback {

    private Map<String /* TOPIC */, RabbitTemplate> rabbitMap = Maps.newConcurrentMap();
    
    private Splitter splitter = Splitter.on("#");
    
    private SerializerFactory serializerFactory = JacksonSerializerFactory.INSTANCE;
    
    @Autowired
    private ConnectionFactory connectionFactory;
    
    @Autowired
    private MessageStoreService messageStoreService;
    
    public RabbitTemplate getTemplate(Message message) throws MessageRunTimeException {
        Preconditions.checkNotNull(message);
        String topic = message.getTopic();
        RabbitTemplate rabbitTemplate = rabbitMap.get(topic);
        // 池中存在就立即返回,否则就新建一个 newTemplate 
        if(rabbitTemplate != null) {
            return rabbitTemplate;
        }
        log.info("#RabbitTemplateContainer.getTemplate# topic: {} is not exists, create one", topic);
        
        RabbitTemplate newTemplate = new RabbitTemplate(connectionFactory);
        newTemplate.setExchange(topic);
        newTemplate.setRoutingKey(message.getRoutingKey());
        newTemplate.setRetryTemplate(new RetryTemplate());
        
        //    添加序列化反序列化和converter对象
        Serializer serializer = serializerFactory.create();
        GenericMessageConverter gmc = new GenericMessageConverter(serializer);
        RabbitMessageConverter rmc = new RabbitMessageConverter(gmc);
        newTemplate.setMessageConverter(rmc);
        
        String messageType = message.getMessageType();
        // 不是迅速发消息类型的时候,都需要添加消息确认方法:confirm()
        if(!MessageType.RAPID.equals(messageType)) {
            newTemplate.setConfirmCallback(this);
        }
        
        rabbitMap.putIfAbsent(topic, newTemplate);
        
        return rabbitMap.get(topic);
    }

    /**
     *     无论是 confirm 消息 还是 reliant 消息 ,发送消息以后 broker都会去回调confirm
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        //     具体的消息应答
        List<String> strings = splitter.splitToList(correlationData.getId());
        String messageId = strings.get(0);
        long sendTime = Long.parseLong(strings.get(1));
        String messageType = strings.get(2);
        if(ack) {
            //    当Broker 返回ACK成功时
            log.info("send message is OK, confirm messageId: {}, sendTime: {}", messageId, sendTime);
        } else {
            log.error("send message is Fail, confirm messageId: {}, sendTime: {}", messageId, sendTime);
            
        }
    }
}

对于上面代码中的

    //    添加序列化反序列化和converter对象
     Serializer serializer = serializerFactory.create();
     GenericMessageConverter gmc = new GenericMessageConverter(serializer);
     RabbitMessageConverter rmc = new RabbitMessageConverter(gmc);
     newTemplate.setMessageConverter(rmc);

这块代码是一个设置对象转换方式的操作,这里的转换是指我们自己写的Message实体类和org.springframework.amqp.core.Message之间的转换。并且转换过程可以通过自定义的序列化类进行实现(自定义序列化的意义在于实现指定对象之间的相互转化)。

具体实现可参考:自定义某个Object对象和Spring中的某个Object对象通过序列化和反序列化的方式进行转换(java代码实现)

序列化方法全都定义在 rabbit-common 模块下,其中有:SerializerFactory.java、Serializer.java、 JacksonSerializer.java、JacksonSerializerFactory.java、GenericMessageConverter.java、RabbitMessageConverter.java六个类。

此外,由于继承了RabbitTemplate.ConfirmCallback类,所以可以重写回调函数,即confirm(CorrelationData correlationData, boolean ack, String cause)方法,在里面添加自己的处理逻辑。

2. 确认类型消息的逻辑实现(放于 rabbit-core-producer 模块下)

基于上面已完成的代码,可以很容易实现** 确认类型消息** 的发送,直接可以引用前面实现的 sendKernel(message) 方法进行发送,并且 **回调函数 confirm()**已经在前面实现了,这里就不再说明了。

/**
 *     $RabbitBrokerImpl 真正的发送不同类型的消息实现类
 *
 */
@Slf4j
@Component
public class RabbitBrokerImpl implements RabbitBroker {

    @Autowired
    private RabbitTemplateContainer rabbitTemplateContainer;
    
    @Autowired
    private MessageStoreService messageStoreService;
    
    @Override
    public void rapidSend(Message message) {
        // 省略......
    }

    /**
     *     确认类型的消息发送
     */
    @Override
    public void confirmSend(Message message) {
        message.setMessageType(MessageType.CONFIRM);
        sendKernel(message);
    }

    /**
     *     $sendKernel 发送消息的核心方法 使用异步线程池进行发送消息
     * @param message
     */
    private void sendKernel(Message message) {
        AsyncBaseQueue.submit((Runnable) () -> {
            CorrelationData correlationData =
                    // 回调函数confirm中需要用到message.getMessageId(), message.getMessageType()。所以可以放在CorrelationData中
                    new CorrelationData(String.format("%s#%s#%s",
                            message.getMessageId(),
                            System.currentTimeMillis(),
                            message.getMessageType()));
            String topic = message.getTopic();
            String routingKey = message.getRoutingKey();
            RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getTemplate(message);
            rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
            log.info("#RabbitBrokerImpl.sendKernel# send to rabbitmq, messageId: {}", message.getMessageId());            
        });
    }

    @Override
    public void reliantSend(Message message) {
        
    }
    
    @Override
    public void sendMessages() {
        
    }

}

3. 对可靠性发送消息的实现(放于 rabbit-core-producer 模块下)

可靠性发送消息的实现可参考教程: RabbitMQ可靠性消息发送(java实现)

五、 测试

1. 新建一个工程项目 rabbit-test,并添加我们封装好的基础组件rabbit-core-producer:

          <dependency>
              <groupId>com.bfxy.base.rabbit</groupId>
              <artifactId>rabbit-core-producer</artifactId>
              <version>0.0.1-SNAPSHOT</version>
          </dependency>

2. 配置文件 application.properties

server.context-path=/test
server.port=8001

spring.application.name=test

# MQ的地址
spring.rabbitmq.addresses=192.168.11.71:5672, 192.168.11.72:5672, 192.168.11.73:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.listener.simple.auto-startup=false

## 封装了 ElasticJob 的基础组件 rabbit-task 需要下面这两个 ZooKeeper 配置
elastic.job.zk.serverLists=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181
elastic.job.zk.namespace=elastic-job

3. RabbitMQ和ZooKeeper启动

将RabbitMQ集群和ZooKeeper集群都启动,并在RabbitMQ服务中心(即 ​​​​​​​121.43.153.20:15672界面)手动创建交换机 exchange-1 和队列 queue-1。并通过routingKey:springboot.* 进行绑定。

4. 编写测试类:

ApplicationTests.java

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    @Autowired
    private ProducerClient producerClient;
    
    @Test
    public void testProducerClient() throws Exception {
        
        for(int i = 0 ; i < 1; i ++) {
            String uniqueId = UUID.randomUUID().toString();
            Map<String, Object> attributes = new HashMap<>();
            attributes.put("name", "张三");
            attributes.put("age", "18");
            Message message = new Message(
                    uniqueId, 
                    "exchange-1", 
                    "springboot.abc", 
                    attributes, 
                    0);
            message.setMessageType(MessageType.RELIANT);
//            message.setDelayMills(15000);
            producerClient.send(message);            
        }

        Thread.sleep(100000);
    }
    
    
}

然后也可以测试一下发送失败的场景:使用一个不存在的交换机进行发送,就会发送失败,比如将上面的交换机名称修改成 exchange-2,然后重新执行发送即可。并查看 数据库中表 broker_message 的状态 status 的值,是成功还是失败。

(补充)六、 批量消息发送和延迟消息发送的实现

1. 批量消息发送

(1)工具类编写

这里对于批量发送的消息类型默认为 迅速消息类型(MessageType.RAPID).

为了在使用send(List<Message> messages)批量发消息之前,可以在该链路请求过程中任何时候添加要发送的消息,然后打包成一个请求执行批量发送。因而使用线程变量 ThreadLocal 来暂存Message,等到执行send(List<Message> messages)时,再从ThreadLocal中取出Message,从而实现一次性批量发送。

ThreadLocal是一个类似于hashmap的二级map,与当前线程是一个弱引用关系,这里借助ThreadLocal实现消息的缓存,对其进行简单封装一下:

MessageHolder.java

public class MessageHolder {

    private List<Message> messages = Lists.newArrayList();
    
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static final ThreadLocal<MessageHolder> holder = new ThreadLocal() {
        @Override
        protected Object initialValue() {
            return new MessageHolder();
        }
    };

    /**
     *
     * @param message
     */
    public static void add(Message message) {
        holder.get().messages.add(message);
    }

    /**
     * 从 ThreadLocal 取出数据,并清空 ThreadLocal
     * @return
     */
    public static List<Message> clear() {
        List<Message> tmp = Lists.newArrayList(holder.get().messages);
        holder.remove();
        return tmp;
    }
    
}

这里对于发送消息也是使用异步线程去发送,为了和之前单个消息发送的异步线程队列区分开,这里再创建一个异步线程队列,但是其中的逻辑和之前的线程队列 AsyncBaseQueue 一样的,只是换了一个队列名称而已。新的线程队列:

MessageHolderAyncQueue.java

@Slf4j
public class MessageHolderAyncQueue {

    private static final int THREAD_SIZE = Runtime.getRuntime().availableProcessors();
    
    private static final int QUEUE_SIZE = 10000;
    
    private static ExecutorService senderAsync =
            new ThreadPoolExecutor(THREAD_SIZE,
                    THREAD_SIZE,
                    60L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(QUEUE_SIZE),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r);
                            t.setName("rabbitmq_client_async_sender");
                            return t;
                        }
                    },
                    new java.util.concurrent.RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            log.error("async sender is error rejected, runnable: {}, executor: {}", r, executor);
                        }
                    });
            
        public static void submit(Runnable runnable) {
            senderAsync.submit(runnable);
        }    
}

(2)批量发送实现

现在开始编写发送的逻辑:首先是改造最外层的发送方法,即在 ProducerClient.java 中加入批量发送代码:

@Component
public class ProducerClient implements MessageProducer {

    @Autowired
    private RabbitBroker rabbitBroker;
    
    @Override
    public void send(Message message) throws MessageRunTimeException {
        // 省略...
    }

    /**
     *     批量发送消息
     */
    @Override
    public void send(List<Message> messages) throws MessageRunTimeException {
        messages.forEach( message -> {
            message.setMessageType(MessageType.RAPID);
            // 往 ThreadLocal 中暂存消息
            MessageHolder.add(message);
        });
        rabbitBroker.sendMessages();
    }
    
    @Override
    public void send(Message message, SendCallback sendCallback) throws MessageRunTimeException {
        // TODO Auto-generated method stub
        
    }

}

然后是实现 rabbitBroker.sendMessages() 方法:

RabbitBrokerImpl.java

/**
 *     $RabbitBrokerImpl 真正的发送不同类型的消息实现类
 * @author Alienware
 *
 */
@Slf4j
@Component
public class RabbitBrokerImpl implements RabbitBroker {

    @Autowired
    private RabbitTemplateContainer rabbitTemplateContainer;
    
    @Autowired
    private MessageStoreService messageStoreService;
    
    @Override
    public void reliantSend(Message message) {
        // 省略...
    }
    
    /**
     *     $rapidSend迅速发消息
     */
    @Override
    public void rapidSend(Message message) {
        // 省略...
    }
    
    /**
     *     $sendKernel 发送消息的核心方法 使用异步线程池进行发送消息
     * @param message
     */
    private void sendKernel(Message message) {
        // 省略...
    }

    @Override
    public void confirmSend(Message message) {
        // 省略...
    }

    @Override
    public void sendMessages() {
        // 从 ThreadLocal 中取出全部消息
        List<Message> messages = MessageHolder.clear();
        // 异步线程循环发送
        messages.forEach(message -> {
            MessageHolderAyncQueue.submit((Runnable) () -> {
                CorrelationData correlationData = 
                        new CorrelationData(String.format("%s#%s#%s",
                                message.getMessageId(),
                                System.currentTimeMillis(),
                                message.getMessageType()));
                String topic = message.getTopic();
                String routingKey = message.getRoutingKey();
                RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getTemplate(message);
                rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
                log.info("#RabbitBrokerImpl.sendMessages# send to rabbitmq, messageId: {}", message.getMessageId());            
            });            
        });
    }

}

2. 延迟消息发送

首先需要安装一个延迟发送的插件,教程可参考:RabbitMQ集群环境搭建-镜像模式 中的第三步骤。

代码实现:

(1)发送消息时指定延迟时长,这里设置延迟15秒钟:

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    @Autowired
    private ProducerClient producerClient;
    
    @Test
    public void testProducerClient2() throws Exception {
        
        for(int i = 0 ; i < 1; i ++) {
            String uniqueId = UUID.randomUUID().toString();
            Map<String, Object> attributes = new HashMap<>();
            attributes.put("name", "张三");
            attributes.put("age", "18");
            Message message = new Message(
                    uniqueId, 
                    "delay-exchange", 
                    "delay.abc", 
                    attributes, 
                    15000); // 延迟15秒钟
            message.setMessageType(MessageType.RELIANT);
            producerClient.send(message);            
        }

        Thread.sleep(100000);
    }
}

(2)添加延迟属性

在我们自定义的Message对象和Spring中的org.springframework.amqp.core.Message对象转换的过程中,将这个延时时长添加到附加属性 Properties 中,如下代码:

自定义的Message对象和Spring中的org.springframework.amqp.core.Message对象转换的逻辑可参考教程:自定义某个Object对象和Spring中的某个Object对象通过序列化和反序列化的方式进行转换

RabbitMessageConverter.java

public class RabbitMessageConverter implements MessageConverter {

    private GenericMessageConverter delegate;
    
    public RabbitMessageConverter(GenericMessageConverter genericMessageConverter) {
        // 省略...
    }
    
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        com.bfxy.rabbit.api.Message message = (com.bfxy.rabbit.api.Message)object;
        // 延迟消息:把延迟时长作为 x-delay 的属性值,添加到 messageProperties 中
        messageProperties.setDelay(message.getDelayMills());
        return this.delegate.toMessage(object, messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        // 省略...
    }

}

把延迟时长作为 x-delay 的属性值,添加到 messageProperties 中,这样就实现了延迟消息的发送。


本文转载自: https://blog.csdn.net/Qynwang/article/details/130477137
版权归原作者 Java知者 所有, 如有侵权,请联系我们删除。

“RabbitMQ基础组件封装—整体结构(总篇)”的评论:

还没有评论