0


云原生中间件RocketMQ-生产者核心解析、主从同步机制解析,生产者同步异步消息发送

RocketMQ-生产者核心解析、主从同步机制解析,生产者同步异步消息发送

生产者核心参数

在这里插入图片描述
producerGroup: 组名
createTopicKey:创建topic,实际生产实践不允许生产者创建top。
**defaultTopicQueueNums(默认为4)**:默认的topic关联的队列数量
**sendMsgTimeout(单位:ms)**:发送消息连接broker超时时间。
**compressMsgBodyOverHowmuch(默认压缩字节4096)**:消息体达到多少压缩。
**retryTimesWhenSendFailed (可配置)**:发送失败重试次数
**retryAnotherBrokerWhenNotStoreOK(默认false)**:发送broker存储失败换个broker发送。
**maxMessageSize(默认128K)**:消息最大可以设置多大。
heartbeatBrokerInterval:与broker的心跳间隔(以微秒为单位,默认为30毫秒)

Master - Slave主从同步机制解析

同步的信息主要是数据内容和元数据信息

数据内容同步

实时进行同步,同步的是

commitlog中的数据

,对实时性要求高,并且丢失数据就无法恢复。使用

原生socket

进行同步。
源码分析
数据内容同步主要是在rocketmq-store中。主要涉及

HAConnection

HAService

WaitNotifyObject

三个类。并没有使用netty而是原生nio,是为了更加高效。
在这里插入图片描述

  • 对于Master节点在这里插入图片描述 HAService AcceptSocketService(内部类):接受slave节点连接。在这里插入图片描述 HAConnection ReadSocketService(内部类):读来自Slave节点的数据。 WriteSocketService(内部类):写往到Slave节点的数据。
  • 对于Slave节点

HAService
HAClient(内部类):对Master节点连接、读写数据。

元数据信息同步

broker判断如果是slave节点,那么会启动定时任务不断同步,如果丢失也可以从其他地方重试获取。包含topic信息和offset等。使用netty同步。
源码分析
元数据同步主要发生在broker,所以这部分代码主要在rocketmq-broker的模块中。是在

handleSlaveSynchronize

方法中通过定义了一个固定时间的定时任务,时间是10秒钟执行一次,当然前提条件是broker节点的角色是slave,而broker节点是master时,如果有定时任务会取消,因为master是不用同步元数据信息。
在这里插入图片描述
这个方法会有三个地方调用:

  • broker刚刚启动
  • master切换成slave
  • slave切换成master。在这里插入图片描述

定时任务的逻辑是写在

syncAll

方法中。主要是需要同步4部分内容:

  • 同步topic配置信息
  • 同步消费者偏移量
  • 同步延时偏移量
  • 同步订阅组配置信息。

在这里插入图片描述
4个方法的内容实际上就是封装了netty做rpc的调用,对不同的操作都会对应到一个code。
image.png
image.png
在这里插入图片描述

通信协议

Slave => Master:上报CommitLog已经同步到的物理位置。使用

maxPhyOffset

字段,代表CommitLog接受到的最大物理位置。
Master => Slave:传输新的CommitLog数据。使用

fromPhyOffset

字段,代表CommitLog开始传输的物理位置。

生产者消息发送

生产者同步消息发送

消息的同步发送:producer.send(msg)
同步发送消息核心实现:DefaultMQProducerlmpl
同步发送消息可以直接获取返回值:

// 同步发送消息,直接获取发送结果SendResult sr = producer.send(message,newMessageQueueSelector(){@OverridepublicMessageQueueselect(List<MessageQueue> mqs,Message msg,Object arg){Integer queueNumber =(Integer)arg;return mqs.get(queueNumber);}},2);System.err.println(sr);

生产者异步消息发送

producer.send(Message msg, SendCallback sendCallback)
异步发送消息核心实现:DefaultMQProducerlmpl
异步发送消息需要通过回调函数获取返回值:

// 异步发送消息,回调函数获取结果
producer.send(message,newSendCallback(){// 可靠性消息投递@OverridepublicvoidonSuccess(SendResult sendResult){System.err.println("msgId: "+ sendResult.getMsgId()+", status: "+ sendResult.getSendStatus());}@OverridepublicvoidonException(Throwable e){// 发送失败做异常处理
        e.printStackTrace();System.err.println("------发送失败");}});

本文内容到此结束了,
如有收获欢迎点赞👍收藏💖关注✔️,您的鼓励是我最大的动力。
如有错误❌疑问💬欢迎各位大佬指出。
主页:共饮一杯无的博客汇总👨‍💻

保持热爱,奔赴下一场山海。🏃🏃🏃

在这里插入图片描述


本文转载自: https://blog.csdn.net/qq_35427589/article/details/126085594
版权归原作者 共饮一杯无 所有, 如有侵权,请联系我们删除。

“云原生中间件RocketMQ-生产者核心解析、主从同步机制解析,生产者同步异步消息发送”的评论:

还没有评论