RocketMQ的介绍
RocketMQ版本发展
Metaq1.x是RocketMQ前身的第一个版本,本质上把Kafka做了一次java版本的重写(Kafka是scala语言开发)。
Meta2.x,主要是对存储部分进行了优化,因为kafka的数据存储,它的partition是一个全量的复制,在阿里、在淘宝的这种海量交易。Kafka这种机制的横向拓展是非常不好的。
2012年阿里同时把Meta2.0从阿里内部开源出来,取名RocketMQ,同时为了命名上的规范(版本上延续),所以这个就是RocketMQ3.0。
现在RocketMQ主要维护的是4.x的版本,也是大家使用得最多的版本,2017年从Apache顶级项目毕业。
阿里内部项目的使用
那么在阿里公司内部,原则上遵守开源共建原则。RocketMQ项目只维护核心功能,且去除了所有其他运行时依赖,核心功能最简化。每个BU(Business Unit业务单元)的个性化需求都在RocketMQ项目之上进行深度定制。RocketMQ向其他BU提供的仅仅是Jar包,例如要定制一个Broker,那么只需要依赖rocketmq-broker这jar包即可,可通过API进行交互, 如果定制client,则依赖rocketmq-client这个jar包,对其提供的api进行再封装。
在RocketMQ项目基础上几个常用的项目如下:
- com.taobao.metaq v3.0 = RocketMQ + 淘宝个性化需求:为淘宝应用提供消息服务
- com.alipay.zpullmsg v1.0 = RocketMQ + 支付宝个性化需求:为支付宝应用提供消息服务
- com.alibaba.commonmq v1.0 = Notify + RocketMQ + B2B个性化需求:为B2B应用提供消息服务
展望未来
从阿里负责RocketMQ的架构核心人员的信息来看,阿里内部一直全力拓展RocketMQ。
2017年10月份,OpenMessaging项目由阿里巴巴发起,与雅虎、滴滴出行、Streamlio公司共同参与创立, 项目意在创立厂商无关、平台无关的分布式消息及流处理领域的应用开发标准。同时OpenMessaging入驻Linux基金会。
OpenMessaging项目已经开始在Apache RocketMQ中率先落地,并推广至整个阿里云平台.
另外RocketMQ5的版本也在内部推进,主要的方向是Cloud Native(云原生)。
另外还要讲一下Apache RocketMQ的商业版本,Aliware MQ在微服务、流计算、IoT、异步解耦、数据同步等场景有非常广泛的运用。
RocketMQ的物理架构
消息队列RocketMQ是阿里巴巴集团基于高可用分布式集群技术,自主研发的云正式商用的专业消息中间件,既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性,是阿里巴巴双11使用的核心产品。
RocketMQ的设计基于主题的发布与订阅模式,其核心功能包括消息发送、消息存储(Broker)、消息消费,整体设计追求简单与性能第一。
NameServer
NameServer是整个RocketMQ的“大脑”,它是RocketMQ的服务注册中心,所以RocketMQ需要先启动NameServer再启动Broker。
Broker在启动时向所有NameServer注册(主要是服务器地址等),生产者在发送消息之前先从NameServer获取Broker服务器地址列表(消费者一样),然后根据负载均衡算法从列表中选择一台服务器进行消息发送。
NameServer与每台Broker服务保持长连接,并间隔30S检查Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除。这样就可以实现RocketMQ的高可用。
生产者(Producer)
生产者:也称为消息发布者,负责生产并发送消息至RocketMQ。
消费者(Consumer)
消费者:也称为消息订阅者,负责从RocketMQ接收并消费消息。
消息(Message)
消息:生产或消费的数据,对于RocketMQ来说,消息就是字节数组。
主机(Broker)
RocketMQ的核心,用于暂存和传输消息。
物理架构中的整体运转
- NameServer先启动
- Broker启动时向NameServer注册
- 生产者在发送某个主题的消息之前先从NamerServer获取Broker服务器地址列表(有可能是集群),然后根据负载均衡算法从列表中选择一台Broker进行消息发送。
- NameServer与每台Broker服务器保持长连接,并间隔30S检测Broker是否存活,如果检测到Broker宕机(使用心跳机制,如果检测超过120S),则从路由注册表中将其移除。
- 消费者在订阅某个主题的消息之前从NamerServer获取Broker服务器地址列表(有可能是集群),但是消费者选择从Broker中订阅消息,订阅规则由Broker配置决定。
RocketMQ的概念模型
分组(Group)
生产者:标识发送同一类消息的Producer,通常发送逻辑一致。发送普通消息的时候,仅标识使用,并无特别用处。主要作用用于事务消息,事务消息中如果某条发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查同一个group的其它producer,确认这条消息应该commit还是rollback。
消费者:标识一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。同一个Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。
消费进度以Consumer Group为粒度管理,不同Consumer Group之间消费进度彼此不受影响,即消息A被Consumer Group1消费过,也会再给Consumer Group2消费。
主题(Topic)
标识一类消息的逻辑名字,消息的逻辑管理单位。无论消息生产还是消费,都需要指定Topic。
区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息。
标签(Tag)
RocketMQ支持给在发送的时候给topic打tag,同一个topic的消息虽然逻辑管理是一样的。但是消费topic的时候,如果你消费订阅的时候指定的是tagA,那么tagB的消息将不会投递。
消息队列(Message Queue)
简称Queue或Q。消息物理管理单位。一个Topic将有若干个Q。若一个Topic创建在不同的Broker,则不同的broker上都有若干Q,消息将物理地存储落在不同Broker结点上,具有水平扩展的能力。
无论生产者还是消费者,实际的生产和消费都是针对Q级别。例如Producer发送消息的时候,会预先选择(默认轮询)好该Topic下面的某一条Q发送;Consumer消费的时候也会负载均衡地分配若干个Q,只拉取对应Q的消息。
每一条message queue均对应一个文件,这个文件存储了实际消息的索引信息。并且即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来。
偏移量(Offset)
RocketMQ中,有很多offset的概念。一般我们只关心暴露到客户端的offset。不指定的话,就是指Message Queue下面的offset。
Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是offset,Message queue中的max offset表示消息的最大offset。
Consumer offset可以理解为标记Consumer Group在一条逻辑Message Queue上,消息消费到哪里即消费进度。但从源码上看,这个数值是消费过的最新消费的消息offset+1,即实际上表示的是下次拉取的offset位置。
普通消息的发送与消费
消息发送
发送同步消息
同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
packagecom.morris.rocketmq.simple;importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.client.producer.SendResult;importorg.apache.rocketmq.common.message.Message;importorg.apache.rocketmq.remoting.common.RemotingHelper;importstaticcom.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;importstaticcom.morris.rocketmq.util.Contant.PRODUCER_GROUP;/**
* 同步发送消息
*/publicclassSynProducer{publicstaticvoidmain(String[] args)throwsException{//Instantiate with a producer group name.DefaultMQProducer producer =newDefaultMQProducer(PRODUCER_GROUP);// Specify name server addresses.
producer.setNamesrvAddr(NAME_SERVER_ADDRESS);
producer.setSendMsgTimeout(6000);//Launch the instance.
producer.start();for(int i =0; i <100; i++){//Create a message instance, specifying topic, tag and message body.Message msg =newMessage("TopicTest"/* Topic */,"TagA"/* Tag */,("Hello RocketMQ "+
i).getBytes(RemotingHelper.DEFAULT_CHARSET)/* Message body */);//Call send message to deliver message to one of brokers.SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}//Shut down once the producer instance is not longer in use.
producer.shutdown();}}
运行结果如下:
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2DFEB0000, offsetMsgId=AC12FA2E00002A9F0000000000012E3A, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=1], queueOffset=100]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0030001, offsetMsgId=AC12FA2E00002A9F0000000000012EF8, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=2], queueOffset=101]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0060002, offsetMsgId=AC12FA2E00002A9F0000000000012FB6, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=3], queueOffset=100]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0090003, offsetMsgId=AC12FA2E00002A9F0000000000013074, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=0], queueOffset=99]... ...
Message ID:消息的全局唯一标识(内部机制的ID生成是使用机器IP和消息偏移量的组成,所以有可能重复,如果是幂等性还是最好考虑Key),由消息队列MQ系统自动生成,唯一标识某条消息。
SendStatus:发送的标识。成功,失败等
Queue:相当于是Topic的分区;用于并行发送和接收消息
发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。
packagecom.morris.rocketmq.simple;importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.client.producer.SendCallback;importorg.apache.rocketmq.client.producer.SendResult;importorg.apache.rocketmq.common.message.Message;importorg.apache.rocketmq.remoting.common.RemotingHelper;importjava.util.concurrent.CountDownLatch;importjava.util.concurrent.TimeUnit;importstaticcom.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;importstaticcom.morris.rocketmq.util.Contant.PRODUCER_GROUP;/**
* 异步发送消息
*/publicclassAsyncProducer{publicstaticvoidmain(String[] args)throwsException{//Instantiate with a producer group name.DefaultMQProducer producer =newDefaultMQProducer(PRODUCER_GROUP);// Specify name server addresses.
producer.setNamesrvAddr(NAME_SERVER_ADDRESS);//Launch the instance.
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount =100;finalCountDownLatch countDownLatch =newCountDownLatch(messageCount);for(int i =0; i < messageCount; i++){try{finalint index = i;Message msg =newMessage("TopicTest","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg,newSendCallback(){@OverridepublicvoidonSuccess(SendResult sendResult){
countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@OverridepublicvoidonException(Throwable e){
countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();}});}catch(Exception e){
e.printStackTrace();}}
countDownLatch.await(5,TimeUnit.SECONDS);
producer.shutdown();}}
单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
packagecom.morris.rocketmq.simple;importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.common.message.Message;importorg.apache.rocketmq.remoting.common.RemotingHelper;importstaticcom.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;importstaticcom.morris.rocketmq.util.Contant.PRODUCER_GROUP;/**
* 单向发送消息
*/publicclassOnewayProducer{publicstaticvoidmain(String[] args)throwsException{//Instantiate with a producer group name.DefaultMQProducer producer =newDefaultMQProducer(PRODUCER_GROUP);// Specify name server addresses.
producer.setNamesrvAddr(NAME_SERVER_ADDRESS);//Launch the instance.
producer.start();for(int i =0; i <100; i++){//Create a message instance, specifying topic, tag and message body.Message msg =newMessage("TopicTest"/* Topic */,"TagA"/* Tag */,("Hello RocketMQ "+
i).getBytes(RemotingHelper.DEFAULT_CHARSET)/* Message body */);//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);}//Wait for sending to completeThread.sleep(5000);
producer.shutdown();}}
消息发送时的权衡
发送方式发送TPS发送结果反馈可靠性使用场景同步发送快有可靠邮件、短信、推送异步发送快有可靠视频转码单向发送最快无可能丢失日志收集
消息消费
集群消费
消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。
实际上,每个Consumer是平均分摊Message Queue的去做拉取消费。例如某个Topic有3条Q,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的1条Q。
而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上,可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。
这种模式下,消费进度(Consumer Offset)的存储会持久化到Broker。
packagecom.morris.rocketmq.simple;importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;importorg.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;importorg.apache.rocketmq.client.exception.MQClientException;importorg.apache.rocketmq.common.protocol.heartbeat.MessageModel;importstaticcom.morris.rocketmq.util.Contant.CONSUMER_GROUP;importstaticcom.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;/**
* 集群消费消息(默认)
*/publicclassClusterConsumer{publicstaticvoidmain(String[] args)throwsInterruptedException,MQClientException{// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer =newDefaultMQPushConsumer(CONSUMER_GROUP);// Specify name server addresses.
consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
consumer.setMessageModel(MessageModel.CLUSTERING);// default// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest","*");// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)->{System.out.printf("%s Receive New Messages: %s %n",Thread.currentThread().getName(), msgs);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});//Launch the consumer instance.
consumer.start();System.out.printf("Consumer Started.%n");}}
广播消费
消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。
实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。
这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地。
packagecom.morris.rocketmq.simple;importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;importorg.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;importorg.apache.rocketmq.client.exception.MQClientException;importorg.apache.rocketmq.common.protocol.heartbeat.MessageModel;importstaticcom.morris.rocketmq.util.Contant.CONSUMER_GROUP;importstaticcom.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;/**
* 广播消费消息(默认)
*/publicclassBroadcastingConsumer{publicstaticvoidmain(String[] args)throwsInterruptedException,MQClientException{// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer =newDefaultMQPushConsumer(CONSUMER_GROUP);// Specify name server addresses.
consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
consumer.setMessageModel(MessageModel.BROADCASTING);// default// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest","*");// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)->{System.out.printf("%s Receive New Messages: %s %n",Thread.currentThread().getName(), msgs);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});//Launch the consumer instance.
consumer.start();System.out.printf("Consumer Started.%n");}}
这种模式下消费者只能收到启动后发送MQ中的消息。
消息消费时的权衡
集群模式:
- 消费端集群化部署,每条消息只需要被处理一次。
- 由于消费进度在服务端维护,可靠性更高。
- 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
- 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
广播模式:
- 广播消费模式下不支持顺序消息。
- 广播消费模式下不支持重置消费位点。
- 每条消息都需要被相同逻辑的多台机器处理。
- 消费进度在客户端维护,出现重复的概率稍大于集群模式。
广播模式下,消息队列RocketMQ保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
目前仅Java客户端支持广播模式。广播模式下服务端不维护消费进度,所以消息队列RocketMQ控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
版权归原作者 morris131 所有, 如有侵权,请联系我们删除。