文章目录
1 示例模式
RocketMQ 事务消息示例包含一个生产者、消费者、NameServer 以及 Broker 服务,它们之间的关系如下:
RocketMQ架构上主要分为四部分[^1]:
- Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
- Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
- NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
- BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证。
2 安装与配置 RocketMQ
(1)下载解压
到 RocketMQ 官网下载二进制版本包,解压到磁盘下。
(2)JVM 版本
注意:openjdk11 不能运行(会抛出Error: Could not create the Java Virtual Machine.),jdk8 可以。
如果cmd 命令一闪而过,可以在命令代码的末尾加入 pause,以观察出错日志。
(3)配置环境变量
配置 ROCKETMQ_HOME 与 NAMESEV_ADDR 环境变量。其中 ROCKETMQ_HOME 是 RocketMQ 的解压后的路径,NAMESEV_ADDR 是 NameServer 的访问地址。
(4)在应用工程的 pom 中配置
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></dependency>
3 运行服务
使用 powershell 分别启动 name server与 broker 服务。
3.1 启动 NameServer
启动 name server:
cd C:\programs\rocketmq-4.9.2\bin
.\mqnamesrv.cmd
输出:
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
3.2 启动 broker
启动 broker:
cd C:\programs\rocketmq-4.9.2\bin
.\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
-n 指定 NameServer 地址,autoCreateTopicEnable 为 true 表示自动创建主题。
输出:
The broker[DENIRO-LEE, 192.168.17.1:10911] boot success. serializeType=JSON and name server is localhost:9876
4 生产者
4.1 事务监听器
事务监听器需要实现两个方法,其中的 executeLocalTransaction 方法用于执行本地事务,checkLocalTransaction 方法用于 broker 回查本地事务的状态。具体逻辑如下:
- 定义了一个原子整型计数器 transactionIndex,用于循环事务状态。
- 定义了一个线程安全 ConcurrentHashMap,名为 localTrans,用于存放事务 ID与状态码。
- 在 executeLocalTransaction 方法中,递增 transactionIndex,然后除以 3,求得余数。因为事务只有三种状态:UNKNOW、ROLLBACK_MESSAGE 和 COMMIT_MESSAGE,所以这里除数为 3。然后返回 UNKNOW 状态,让 broker 回查这批消息的事务状态。
- 在 checkLocalTransaction 方法中,会传入由 broker 给出的事务 ID。然后依据这个事务 ID 从 localTrans 中取出这个事务 ID 的状态码。接着依据不同的状态码,打印日志并返回对应的事务状态。
import org.apache.rocketmq.client.producer.LocalTransactionState;import org.apache.rocketmq.client.producer.TransactionListener;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageExt;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.atomic.AtomicInteger;publicclassTransactionListenerImplimplementsTransactionListener{private AtomicInteger transactionIndex =newAtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans =newConcurrentHashMap<>();@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg){int value = transactionIndex.getAndIncrement();int status = value %3;
localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg){
String transactionId = msg.getTransactionId();
Integer status = localTrans.get(transactionId);if(null != status){switch(status){case0:
System.out.printf("%s%s%n","事务ID -> "+transactionId," 返回 UNKNOW");return LocalTransactionState.UNKNOW;case1:
System.out.printf("%s%s%n","事务ID -> "+transactionId,"返回 COMMIT_MESSAGE");return LocalTransactionState.COMMIT_MESSAGE;case2:
System.out.printf("%s%s%n","事务ID -> "+transactionId,"返回 ROLLBACK_MESSAGE");return LocalTransactionState.ROLLBACK_MESSAGE;default:
System.out.printf("%s%s%n","事务ID -> "+transactionId,"返回 UNKNOW");return LocalTransactionState.COMMIT_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}}
4.2 事务消息生产者
该生产者会产生 10 条事务消息。具体逻辑如下:
- 创建事务监听器 transactionListener。
- 创建事务消息生产者 producer。
- 生产者设置 NameServer的地址。
- 创建线程池执行器 ThreadPoolExecutor。
- 生产者设置线程池执行器。
- 生产者设置事务监听器。
- 启动生产者。
- 创建多个标签。
- 循环生成 10 条事务消息。每个消息创建后,使用生产者以事务的方式发送该消息,打印并休眠 10 ms。
- 线程休眠一段时间。用于响应 broker 的回查请求。
- 关闭生产者。
package com.deniro.rocketmq.transaction;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.client.producer.TransactionListener;import org.apache.rocketmq.client.producer.TransactionMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.example.transaction.TransactionListenerImpl;import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException;import java.util.concurrent.*;/**
* 生产事务消息的生产者
*
* @author Deniro Lee
*/publicclassTransactionProducer{publicstaticvoidmain(String[] args)throws MQClientException, InterruptedException {
TransactionListener transactionListener =newTransactionListenerImpl();
TransactionMQProducer producer =newTransactionMQProducer("deniro_transaction_message_group");// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
ExecutorService executorService =newThreadPoolExecutor(2,5,100, TimeUnit.SECONDS,newArrayBlockingQueue<Runnable>(2000),newThreadFactory(){@Overridepublic Thread newThread(Runnable r){
Thread thread =newThread(r);
thread.setName("client-transaction-msg-check-thread");return thread;}});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags =newString[]{"TagA","TagB","TagC","TagD","TagE"};for(int i =0; i <10; i++){try{
Message msg =newMessage("transactionMsg", tags[i % tags.length],"KEY"+ i,("Hello RocketMQ "+ i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n","第"+(i+1)+"条 ->"+ sendResult);
Thread.sleep(10);}catch(MQClientException| UnsupportedEncodingException e){
e.printStackTrace();}}for(int i =0; i <100000; i++){
Thread.sleep(1000);}
producer.shutdown();}}
5 消费者
消费者只能获取事务状态为 COMMIT_MESSAGE 的消息。具体逻辑如下:
- 创建消费者。
- 消费者设置NameServer的地址。
- 消费者订阅Topic。
- 消费者注册回调实现类来处理从broker拉取回来的消息。回调实现类的方法返回消息已经被成功消费的状态。
- 启动消费者。
package com.deniro.rocketmq.base;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/**
* 消费者
*
* @author Deniro Lee
*/publicclassConsumer{publicstaticvoidmain(String[] args)throws InterruptedException, MQClientException {// 实例化消费者
DefaultMQPushConsumer consumer =newDefaultMQPushConsumer("deniro_transaction_message_group");// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("transactionMsg","*");// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(newMessageListenerConcurrently(){@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");}}
6 测试
生产者端发送 10 条事务消息:
第1条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF410000, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=1], queueOffset=432]
第2条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF550001, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=2], queueOffset=433]
第3条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF640002, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=3], queueOffset=434]
第4条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF740003, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=0], queueOffset=435]
第5条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF840004, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=1], queueOffset=436]
第6条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF930005, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=2], queueOffset=437]
第7条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CFA20006, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=3], queueOffset=438]
第8条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CFB20007, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=0], queueOffset=439]
第9条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CFC20008, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=1], queueOffset=440]
第10条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CFD10009, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=2], queueOffset=441]
事务监听器对 3 取余数,依据余数的值来返回事务状态。
余数值说明0返回 UNKNOW。表示未知,这样 broker 会不断回查生产者的事务状态(限制为 15 次,可在 broker 端配置,超过限制则丢弃该消息并记录日志)。1返回 ROLLBACK_MESSAGE。表示事务回滚,这样消费者端就不会收到这条消息。2返回 COMMIT_MESSAGE。表示事务提交,这样消费者端就会收到这条消息,执行本地事务操作。
生产者端事务监听器输出:
事务ID -> 7F000001771418B4AAC24166CF410000 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CF640002返回 ROLLBACK_MESSAGE
事务ID -> 7F000001771418B4AAC24166CF550001返回 COMMIT_MESSAGE
事务ID -> 7F000001771418B4AAC24166CF740003 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CF840004返回 COMMIT_MESSAGE
事务ID -> 7F000001771418B4AAC24166CF930005返回 ROLLBACK_MESSAGE
事务ID -> 7F000001771418B4AAC24166CFA20006 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFB20007返回 COMMIT_MESSAGE
事务ID -> 7F000001771418B4AAC24166CFD10009 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFC20008返回 ROLLBACK_MESSAGE
事务ID -> 7F000001771418B4AAC24166CF410000 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CF740003 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFA20006 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFD10009 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CF410000 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CF740003 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFA20006 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFD10009 返回 UNKNOW
消费者端只会收到状态为 COMMIT_MESSAGE 的消息:
通过 RocketMQ 可以实现可靠消息最终一致性,适用于执行周期长且实时性要求不高的场景。优点是避免了分布式事务中的同步阻塞的影响,并实现了两个服务的解耦。不足是如果消费者端事务异常回滚,生产者端不会回滚。而且消息可能会被重复消费,因此需要在消费者端进行冥等处理。
版权归原作者 deniro_li 所有, 如有侵权,请联系我们删除。