0


RocketMQ 事务消息示例分析

文章目录

1 示例模式

RocketMQ 事务消息示例包含一个生产者、消费者、NameServer 以及 Broker 服务,它们之间的关系如下:

RocketMQ架构上主要分为四部分[^1]:

  1. Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  2. Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
  3. NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
  4. BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证。

2 安装与配置 RocketMQ

(1)下载解压

到 RocketMQ 官网下载二进制版本包,解压到磁盘下。

(2)JVM 版本

注意:openjdk11 不能运行(会抛出Error: Could not create the Java Virtual Machine.),jdk8 可以。

如果cmd 命令一闪而过,可以在命令代码的末尾加入 pause,以观察出错日志。

(3)配置环境变量

配置 ROCKETMQ_HOME 与 NAMESEV_ADDR 环境变量。其中 ROCKETMQ_HOME 是 RocketMQ 的解压后的路径,NAMESEV_ADDR 是 NameServer 的访问地址。

(4)在应用工程的 pom 中配置

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></dependency>

3 运行服务

使用 powershell 分别启动 name server与 broker 服务。

3.1 启动 NameServer

启动 name server:

cd C:\programs\rocketmq-4.9.2\bin

.\mqnamesrv.cmd

输出:

Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

3.2 启动 broker

启动 broker:

cd C:\programs\rocketmq-4.9.2\bin

.\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

-n 指定 NameServer 地址,autoCreateTopicEnable 为 true 表示自动创建主题。

输出:

The broker[DENIRO-LEE, 192.168.17.1:10911] boot success. serializeType=JSON and name server is localhost:9876

4 生产者

4.1 事务监听器

事务监听器需要实现两个方法,其中的 executeLocalTransaction 方法用于执行本地事务,checkLocalTransaction 方法用于 broker 回查本地事务的状态。具体逻辑如下:

  1. 定义了一个原子整型计数器 transactionIndex,用于循环事务状态。
  2. 定义了一个线程安全 ConcurrentHashMap,名为 localTrans,用于存放事务 ID与状态码。
  3. 在 executeLocalTransaction 方法中,递增 transactionIndex,然后除以 3,求得余数。因为事务只有三种状态:UNKNOW、ROLLBACK_MESSAGE 和 COMMIT_MESSAGE,所以这里除数为 3。然后返回 UNKNOW 状态,让 broker 回查这批消息的事务状态。
  4. 在 checkLocalTransaction 方法中,会传入由 broker 给出的事务 ID。然后依据这个事务 ID 从 localTrans 中取出这个事务 ID 的状态码。接着依据不同的状态码,打印日志并返回对应的事务状态。
import org.apache.rocketmq.client.producer.LocalTransactionState;import org.apache.rocketmq.client.producer.TransactionListener;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageExt;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.atomic.AtomicInteger;publicclassTransactionListenerImplimplementsTransactionListener{private AtomicInteger transactionIndex =newAtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans =newConcurrentHashMap<>();@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg){int value = transactionIndex.getAndIncrement();int status = value %3;
        localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg){
        String transactionId = msg.getTransactionId();
        Integer status = localTrans.get(transactionId);if(null != status){switch(status){case0:

                    System.out.printf("%s%s%n","事务ID -> "+transactionId," 返回 UNKNOW");return LocalTransactionState.UNKNOW;case1:
                    System.out.printf("%s%s%n","事务ID -> "+transactionId,"返回 COMMIT_MESSAGE");return LocalTransactionState.COMMIT_MESSAGE;case2:
                    System.out.printf("%s%s%n","事务ID -> "+transactionId,"返回 ROLLBACK_MESSAGE");return LocalTransactionState.ROLLBACK_MESSAGE;default:
                    System.out.printf("%s%s%n","事务ID -> "+transactionId,"返回 UNKNOW");return LocalTransactionState.COMMIT_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}}

4.2 事务消息生产者

该生产者会产生 10 条事务消息。具体逻辑如下:

  1. 创建事务监听器 transactionListener。
  2. 创建事务消息生产者 producer。
  3. 生产者设置 NameServer的地址。
  4. 创建线程池执行器 ThreadPoolExecutor。
  5. 生产者设置线程池执行器。
  6. 生产者设置事务监听器。
  7. 启动生产者。
  8. 创建多个标签。
  9. 循环生成 10 条事务消息。每个消息创建后,使用生产者以事务的方式发送该消息,打印并休眠 10 ms。
  10. 线程休眠一段时间。用于响应 broker 的回查请求。
  11. 关闭生产者。
package com.deniro.rocketmq.transaction;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.client.producer.TransactionListener;import org.apache.rocketmq.client.producer.TransactionMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.example.transaction.TransactionListenerImpl;import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException;import java.util.concurrent.*;/**
 * 生产事务消息的生产者
 *
 * @author Deniro Lee
 */publicclassTransactionProducer{publicstaticvoidmain(String[] args)throws MQClientException, InterruptedException {
        TransactionListener transactionListener =newTransactionListenerImpl();
        TransactionMQProducer producer =newTransactionMQProducer("deniro_transaction_message_group");// 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");

        ExecutorService executorService =newThreadPoolExecutor(2,5,100, TimeUnit.SECONDS,newArrayBlockingQueue<Runnable>(2000),newThreadFactory(){@Overridepublic Thread newThread(Runnable r){
                Thread thread =newThread(r);
                thread.setName("client-transaction-msg-check-thread");return thread;}});
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        String[] tags =newString[]{"TagA","TagB","TagC","TagD","TagE"};for(int i =0; i <10; i++){try{
                Message msg =newMessage("transactionMsg", tags[i % tags.length],"KEY"+ i,("Hello RocketMQ "+ i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n","第"+(i+1)+"条 ->"+ sendResult);
                Thread.sleep(10);}catch(MQClientException| UnsupportedEncodingException e){
                e.printStackTrace();}}for(int i =0; i <100000; i++){
            Thread.sleep(1000);}
        producer.shutdown();}}

5 消费者

消费者只能获取事务状态为 COMMIT_MESSAGE 的消息。具体逻辑如下:

  1. 创建消费者。
  2. 消费者设置NameServer的地址。
  3. 消费者订阅Topic。
  4. 消费者注册回调实现类来处理从broker拉取回来的消息。回调实现类的方法返回消息已经被成功消费的状态。
  5. 启动消费者。
package com.deniro.rocketmq.base;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/**
 * 消费者
 *
 * @author Deniro Lee
 */publicclassConsumer{publicstaticvoidmain(String[] args)throws InterruptedException, MQClientException {// 实例化消费者
        DefaultMQPushConsumer consumer =newDefaultMQPushConsumer("deniro_transaction_message_group");// 设置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe("transactionMsg","*");// 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(newMessageListenerConcurrently(){@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");}}

6 测试

生产者端发送 10 条事务消息:

第1条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF410000, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=1], queueOffset=432]
第2条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF550001, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=2], queueOffset=433]
第3条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF640002, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=3], queueOffset=434]
第4条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF740003, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=0], queueOffset=435]
第5条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF840004, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=1], queueOffset=436]
第6条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF930005, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=2], queueOffset=437]
第7条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CFA20006, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=3], queueOffset=438]
第8条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CFB20007, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=0], queueOffset=439]
第9条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CFC20008, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=1], queueOffset=440]
第10条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CFD10009, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=2], queueOffset=441]

事务监听器对 3 取余数,依据余数的值来返回事务状态。
余数值说明0返回 UNKNOW。表示未知,这样 broker 会不断回查生产者的事务状态(限制为 15 次,可在 broker 端配置,超过限制则丢弃该消息并记录日志)。1返回 ROLLBACK_MESSAGE。表示事务回滚,这样消费者端就不会收到这条消息。2返回 COMMIT_MESSAGE。表示事务提交,这样消费者端就会收到这条消息,执行本地事务操作。
生产者端事务监听器输出:

事务ID -> 7F000001771418B4AAC24166CF410000 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CF640002返回 ROLLBACK_MESSAGE
事务ID -> 7F000001771418B4AAC24166CF550001返回 COMMIT_MESSAGE
事务ID -> 7F000001771418B4AAC24166CF740003 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CF840004返回 COMMIT_MESSAGE
事务ID -> 7F000001771418B4AAC24166CF930005返回 ROLLBACK_MESSAGE
事务ID -> 7F000001771418B4AAC24166CFA20006 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFB20007返回 COMMIT_MESSAGE
事务ID -> 7F000001771418B4AAC24166CFD10009 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFC20008返回 ROLLBACK_MESSAGE
事务ID -> 7F000001771418B4AAC24166CF410000 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CF740003 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFA20006 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFD10009 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CF410000 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CF740003 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFA20006 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFD10009 返回 UNKNOW

消费者端只会收到状态为 COMMIT_MESSAGE 的消息:


通过 RocketMQ 可以实现可靠消息最终一致性,适用于执行周期长且实时性要求不高的场景。优点是避免了分布式事务中的同步阻塞的影响,并实现了两个服务的解耦。不足是如果消费者端事务异常回滚,生产者端不会回滚。而且消息可能会被重复消费,因此需要在消费者端进行冥等处理。


本文转载自: https://blog.csdn.net/deniro_li/article/details/121932600
版权归原作者 deniro_li 所有, 如有侵权,请联系我们删除。

“RocketMQ 事务消息示例分析”的评论:

还没有评论