0


RabbitMQ - 以 MQ 为例,手写一个 RPC 框架 demo

前言


本篇文章来自于笔者之前写过的一个系列 —— “根据源码,模拟实现 RabbitMQ” 系列,不妨可以去看看~

一、再谈自定义应用层协议

a)这个自定义应用层协议实际上就是在描述将来 客户端 和 服务器 之间通讯的消息格式长啥样

b)首先是一个 Int 类型的 type,描述了这个消息到底是用来干什么的(要调用服务器这边的哪一个服务).

c)然后就是 payload 的数据载荷,承载着将来调用 VirtualHost 中的具体的服务所需要的参数(例如创建交换机所需要的参数就有:交换机名字、交换机类型、是否自动删除、是否持久化、扩展参数).

因为 TCP 是面向字节流的(IO 流中主要提供的就是二进制数据的读写),因此这里不太适合使用 JSON 格式数据进行网络传输(可读性不好,效率不高),因此这里 payload 是一个 字节数组,将具体的数据序列化成 byte 数组放进来.

d)这里要注意的一点是,TCP 是面向字节流的,因此会出现粘包问题,那么为了解决这个问题,由两种办法,第一种就是约定分割符(读到指定分隔符就截止),第二种就是描述好 payload 的长度.

这里我采用的就是第二种办法,只需要在协议里面在添加一个 length 字段,用来描述 payload 的长度.

  1. import java.io.Serializable
  2. //Socket 自定义应用层协议(请求)
  3. data class Request(
  4. val type: Int,
  5. val length: Int,
  6. val payload: ByteArray,
  7. ): Serializable
  8. //Socket 自定义应用层协议(响应)
  9. data class Response(
  10. val type: Int,
  11. val length: Int,
  12. val payload: ByteArray,
  13. ): Serializable
  14. //基本参数(每个请求都会携带的参数,这里进行了一个封住)
  15. open class ReqBaseArguments(
  16. open val rid: String = "",
  17. open val channelId: String = "",
  18. ): Serializable
  19. //基本响应参数(每个响应都会携带的参数),主要是为了应对 mq 回调响应处理
  20. open class RespBaseArguments(
  21. open val rid: String,
  22. open val channelId: String,
  23. open val ok: Boolean,
  24. ): Serializable
  25. //主要的请求: 创建交换机、删除交换机、创建队列
  26. data class ExchangeDeclareReq(
  27. val name: String,
  28. val type: ExchangeType,
  29. val durable: Boolean,
  30. val autoDelete: Boolean,
  31. val arguments: MutableMap<String, Any>,
  32. override val rid: String,
  33. override val channelId: String,
  34. ): ReqBaseArguments(), Serializable
  35. data class ExchangeDeleteReq(
  36. val name: String,
  37. override val rid: String,
  38. override val channelId: String,
  39. ): ReqBaseArguments(), Serializable
  40. data class QueueDeclareReq(
  41. val name: String,
  42. val durable: Boolean,
  43. val exclusive: Boolean,
  44. val autoDelete: Boolean,
  45. val arguments: MutableMap<String, Any>,
  46. override val rid: String,
  47. override val channelId: String,
  48. ): ReqBaseArguments(), Serializable

二、再谈 BrokerServer

a)BrokerServer 就是一个中间服务,也可以简单理解为 VirtualHost 的代理(BrokerServer 接收客户端请求,调用 VirtualHost 中具体的服务).

b)BrokerServer 启动的时候,就会通过 accept 阻塞等待客户端这边的 TCP 连接,连接成功之后只需要为该客户端其分配一个线程,处理之后的任务.

c)此时这个线程就会处于一个死循环循环,通过 IO 流读取到 客户端 请求中的 type、length、payload ,并按照约定的格式进行解析 payload,得到具体数据(这里不仅包含了 VirtualHost 服务中所需要的具体的参数,还携带了 channelId 和 rid)

d)此时,只需要根据 IO 流中读取出的 type,调用对应 VirtualHost 中的服务即可.

e)最后再将 VirtualHost 处理后得到的响应封装成 我们约定的应用层协议格式,通过 IO 写入到流中,让客户端去读取.

  1. class BrokerServer(
  2. port: Int
  3. ) {
  4. private val socket = ServerSocket(port)
  5. private val clientPool = Executors.newFixedThreadPool(5)
  6. //key: channelId ,value: Socket
  7. //注意:这里的 Channel 只表示一个 "逻辑" 上的连接(创建,销毁 channel),这个 Map 是为了后台信息统计
  8. private val channelSession = ConcurrentHashMap<String, Socket>()
  9. private val virtualHost = VirtualHost()
  10. fun start() {
  11. println("[BrokerServer] 启动!")
  12. while (true) {
  13. val client = socket.accept()
  14. clientPool.submit {
  15. clientProcess(client)
  16. }
  17. }
  18. }
  19. private fun clientProcess(client: Socket) {
  20. println("[BrokerServer] 客户端上线!ip: ${client.inetAddress}, port: ${client.port}")
  21. try {
  22. client.getInputStream().use { inputStream ->
  23. client.getOutputStream().use { outputStream ->
  24. DataInputStream(inputStream).use { dataInputStream ->
  25. DataOutputStream(outputStream).use { dataOutputStream ->
  26. while (true) {
  27. val request = readRequest(dataInputStream)
  28. val response = process(request, client)
  29. writeResponse(response, dataOutputStream)
  30. }
  31. }
  32. }
  33. }
  34. }
  35. } catch (e: EOFException) {
  36. println("[BrokerServer] 客户端正常下线!ip: ${client.inetAddress}, port: ${client.port}")
  37. } catch (e: Exception) {
  38. println("[BrokerServer] 客户端连接异常!ip: ${client.inetAddress}, port: ${client.port}")
  39. } finally {
  40. client.close()
  41. removeChannelSession(client)
  42. }
  43. }
  44. private fun process(request: Request, client: Socket) = with(request) {
  45. //1.解析请求
  46. val req = BinaryTool.bytesToAny(payload)
  47. //2.获取请求中的 channelId,记录和 Socket 的关系(让每个 channel 都对应自己的 Socket,类似于 Session)
  48. val reqBase = req as ReqBaseArguments
  49. //3.根据 type 类型执行不同的服务(创建 Channel、销毁 Channel、创建交换机、删除交换机...)
  50. val ok = when(type) {
  51. 1 -> {
  52. channelSession[reqBase.channelId] = client
  53. println("[BrokerServer] channel 创建成功!channelId: ${reqBase.channelId}")
  54. true
  55. }
  56. 2 -> {
  57. channelSession.remove(reqBase.channelId)
  58. println("[BrokerServer] channel 销毁成功!channelId: ${reqBase.channelId}")
  59. true
  60. }
  61. 3 -> virtualHost.exchangeDeclare(req as ExchangeDeclareReq)
  62. 4 -> virtualHost.exchangeDelete(req as ExchangeDeleteReq)
  63. 5 -> virtualHost.queueDeclare(req as QueueDeclareReq)
  64. //...
  65. else -> throw RuntimeException("[BrokerServer] 客户端请求 type 非法!type: $type")
  66. }
  67. //4.返回响应
  68. val respBase = RespBaseArguments(reqBase.rid, reqBase.channelId, ok)
  69. val payload = BinaryTool.anyToBytes(respBase)
  70. Response(type, payload.size, payload)
  71. }
  72. /**
  73. * 读取客户端请求
  74. * 使用 DataInputStream 的主要原因就是有多种读取方式,例如 readInt()、readLong(),这些都是原生 InputStream 没有的
  75. */
  76. private fun readRequest(dataInputStream: DataInputStream) = with(dataInputStream) {
  77. val type = readInt()
  78. val length = readInt()
  79. val payload = ByteArray(length)
  80. val n = read(payload)
  81. if (n != length) throw RuntimeException("[BrokerServer] 读取客户端请求异常!")
  82. Request(type, length, payload)
  83. }
  84. /**
  85. * 将响应写回给客户端
  86. */
  87. private fun writeResponse(response: Response, outputStream: DataOutputStream) = with(outputStream) {
  88. writeInt(response.type)
  89. writeInt(response.length)
  90. write(response.payload)
  91. flush()
  92. }
  93. //删除所有和这个 clientSocket 有关的 Channel
  94. private fun removeChannelSession(client: Socket) {
  95. val channelIdList = mutableListOf<String>()
  96. //这里不能直接删除,会破坏迭代器结构
  97. for (entry in channelSession) {
  98. if (entry.value == client) channelIdList.add(entry.key)
  99. }
  100. for (channelId in channelIdList) {
  101. channelSession.remove(channelId)
  102. }
  103. }
  104. }
  1. class VirtualHost {
  2. fun exchangeDeclare(req: ExchangeDeclareReq): Boolean {
  3. //执行业务逻辑
  4. //...
  5. println("[VirtualHost] 创建交换机成功!")
  6. return true
  7. }
  8. fun exchangeDelete(req: ExchangeDeleteReq): Boolean {
  9. //执行业务逻辑
  10. //...
  11. println("[VirtualHost] 删除交换机成功!")
  12. return true
  13. }
  14. fun queueDeclare(req: QueueDeclareReq): Boolean {
  15. //执行业务逻辑
  16. //...
  17. println("[VirtualHost] 创建队列成功!")
  18. return true
  19. }
  20. }

三、再谈 Connection、Channel

a)一个 Connection 就是一个 TCP 连接,因此频繁 建立/断开连接(三次握手、四次挥手...)的开销也是相当大的,因此就引入了 Channel.

b)一个 Connection 下可以有多个 Channel(此处使用 map 来维护). Channel 只是简单的表示一个逻辑上的连接,可以理解为一个大的项目下被拆分成的多个小的微服务. 实现了 TCP 连接的复用.

c)起初,我们需要先创建出 Connection 与服务端建立连接,初始化构造中只需要写一个死循环,不断的从服务端这边读取响应.

d)接着,通过 Connection 创建出 Channel 来完成具体的业务(Channel 中就提供了一系列方法,就像调用本地的方法一样,调用到远程服务器的接口).

e)例如 Channel 中提供的创建叫交换机方法(channel.exchangeDeclare(...)),这个方法中具体要做的就是将传入的参数,封装到一个对象中,序列化成 二进制 数据,这就是将来协议中要传输的 payload. 进一步的,协议 Request 就构造出来了,通过 IO 写到流中,供服务端读取.

d)为了能够让每次请求和响应都能对的上,Channel 这里我维护了一个 map(key 是 rid、value 是具体的响应),客户端和服务端之间的每个请求和响应都会携带上这个 rid 这个参数,这样将来 Connection 客户端接受到响应的时候,就可以直接把 响应中的 rid 提取出来,交给 Channel 的 map 中(响应来之前,Channel 一直阻塞等待,直到响应来了 -> 能通过 rid 从 map 中得到).

  1. class ConnectionFactory(
  2. private val host: String,
  3. private val port: Int,
  4. ) {
  5. fun newConnection() = Connection(host, port)
  6. }
  1. class Connection(
  2. ip: String,
  3. port: Int,
  4. ) {
  5. private val socket = Socket(ip, port)
  6. private val channelMap = ConcurrentHashMap<String, Channel>()
  7. //下述这样提前创建好,是为了将来 Channel 在读写请求的时候的方便(Channel 就不用获取输入输出流了)
  8. private val inputStream = socket.getInputStream()
  9. private val outputStream = socket.getOutputStream()
  10. private val dataInputStream = DataInputStream(inputStream)
  11. private val dataOutputStream = DataOutputStream(outputStream)
  12. init {
  13. //此线程负责不停的从服务器这边获取响应
  14. Thread {
  15. try {
  16. while (!socket.isClosed) {
  17. //读取服务器响应
  18. val resp = readResp()
  19. //将响应交给对应的 Channel
  20. putRespToChannel(resp)
  21. }
  22. } catch (e: SocketException) {
  23. println("[Connection] 客户端正常断开连接")
  24. } catch (e: Exception) {
  25. println("[Connection] 客户端异常断开连接")
  26. e.printStackTrace()
  27. }
  28. }.start()
  29. }
  30. /**
  31. * 将客户端 Connection 接收到的请求,交给对应的 Channel 处理(此时 Channel 还在阻塞等待服务端响应)
  32. */
  33. private fun putRespToChannel(resp: Response) {
  34. //这里由于不涉及回调,所以每个 type 类型的响应都长一样,就按照一样的方式解析了
  35. val baseResp = BinaryTool.bytesToAny(resp.payload) as RespBaseArguments
  36. val channel = channelMap[baseResp.channelId]
  37. ?: throw RuntimeException("[Connection] 该响应对应的 Channel 不存在!channelId: ${baseResp.channelId}")
  38. //将响应交给 Channel
  39. channel.notifyResp(baseResp)
  40. }
  41. /**
  42. * 创建 Channel
  43. */
  44. fun createChannel(): Channel { //1.创建 Channel,保存到 map 种
  45. val channelId = "C-${UUID.randomUUID()}"
  46. val channel = Channel(channelId, this)
  47. channelMap[channelId] = channel
  48. //2.告知服务端 Channel 创建
  49. val ok = channel.createChannel()
  50. //3.如果 Channel 创建不成功,客户端这边也应该要删除对应的 Channel 信息
  51. if (!ok) channelMap.remove(channelId)
  52. return channel
  53. }
  54. private fun readResp() = with(dataInputStream) {
  55. val type = readInt()
  56. val length = readInt()
  57. val payload = ByteArray(length)
  58. val n = read(payload)
  59. if (n != length) throw RuntimeException("[Connection] 客户端读取响应异常!")
  60. Response(type, length, payload)
  61. }
  62. fun writeReq(request: Request) = with(dataOutputStream) {
  63. writeInt(request.type)
  64. writeInt(request.length)
  65. write(request.payload)
  66. flush()
  67. }
  68. }
  1. class Channel(
  2. private val channelId: String,
  3. private val connection: Connection, //自己当前属于哪个 Channel
  4. ) {
  5. //key: rid(为了能让每个 Channel 对应上自己的响应)
  6. //value: RespBaseArguments(具体的响应)
  7. //当 Connection 的扫描线程接收到响应之后,就会将响应传给这个 map
  8. private val ridRespMap = ConcurrentHashMap<String, RespBaseArguments>()
  9. //这个锁是用来阻塞等待服务端响应的(避免轮询),当服务端传来响应时,Connection 就会唤醒锁
  10. private val locker = Object()
  11. private fun generateRid() = "R-${UUID.randomUUID()}"
  12. private fun waitResp(rid: String): RespBaseArguments {
  13. val respBase: RespBaseArguments
  14. while (ridRespMap[rid] == null) { // 如果为空,说明此时服务端还没有传来响应
  15. synchronized(locker) { //为了避免轮询,就让其阻塞等待
  16. locker.wait()
  17. }
  18. }
  19. //出了这个循环,那么 ridRespMap[rid] 一定不为空
  20. return ridRespMap[rid]!!
  21. }
  22. fun notifyResp(respBase: RespBaseArguments) {
  23. ridRespMap[respBase.rid] = respBase
  24. synchronized(locker) {
  25. //当前也不直到有多少线程在等待响应,就全部唤醒
  26. locker.notifyAll()
  27. }
  28. }
  29. /**
  30. * 创建 Channel
  31. */
  32. fun createChannel(): Boolean {
  33. //1.创建基本请求
  34. val reqBase = ReqBaseArguments(
  35. rid = generateRid(),
  36. channelId = channelId
  37. )
  38. //2.构造 TCP 通信请求
  39. val payload = BinaryTool.anyToBytes(reqBase)
  40. val req = Request(
  41. type = 1,
  42. length = payload.size,
  43. payload = payload
  44. )
  45. //3.发送请求
  46. connection.writeReq(req)
  47. //4.等待客户端响应
  48. val respBase = waitResp(reqBase.rid)
  49. return respBase.ok
  50. }
  51. fun removeChannel(): Boolean {
  52. //1.创建基本请求
  53. val reqBase = ReqBaseArguments(
  54. rid = generateRid(),
  55. channelId = channelId
  56. )
  57. //2.构造 TCP 通信请求
  58. val payload = BinaryTool.anyToBytes(reqBase)
  59. val req = Request(
  60. type = 2,
  61. length = payload.size,
  62. payload = payload
  63. )
  64. //3.发送请求
  65. connection.writeReq(req)
  66. //4.等待客户端响应
  67. val respBase = waitResp(reqBase.rid)
  68. return respBase.ok
  69. }
  70. fun exchangeDeclare(
  71. name: String,
  72. type: ExchangeType,
  73. durable: Boolean,
  74. autoDelete: Boolean,
  75. arguments: MutableMap<String, Any>,
  76. ): Boolean {
  77. val exchangeDeclareReq = ExchangeDeclareReq(
  78. name = name,
  79. type = type,
  80. durable = durable,
  81. autoDelete = autoDelete,
  82. arguments = arguments,
  83. rid = generateRid(),
  84. channelId = channelId,
  85. )
  86. val payload = BinaryTool.anyToBytes(exchangeDeclareReq)
  87. val req = Request(
  88. type = 3,
  89. length = payload.size,
  90. payload = payload,
  91. )
  92. connection.writeReq(req)
  93. val respBase = waitResp(exchangeDeclareReq.rid)
  94. return respBase.ok
  95. }
  96. fun exchangeDelete(name: String): Boolean {
  97. val exchangeDeleteReq = ExchangeDeleteReq(
  98. name = name,
  99. rid = generateRid(),
  100. channelId = channelId,
  101. )
  102. val payload = BinaryTool.anyToBytes(exchangeDeleteReq)
  103. val req = Request(
  104. type = 4,
  105. length = payload.size,
  106. payload = payload,
  107. )
  108. connection.writeReq(req)
  109. val respBase = waitResp(exchangeDeleteReq.rid)
  110. return respBase.ok
  111. }
  112. fun queueDeclare(
  113. name: String,
  114. durable: Boolean,
  115. exclusive: Boolean,
  116. autoDelete: Boolean,
  117. arguments: MutableMap<String, Any>,
  118. ): Boolean {
  119. val queueDeclareReq = QueueDeclareReq(
  120. name = name,
  121. durable = durable,
  122. exclusive = exclusive,
  123. autoDelete = autoDelete,
  124. arguments = arguments,
  125. rid = generateRid(),
  126. channelId = channelId,
  127. )
  128. val payload = BinaryTool.anyToBytes(queueDeclareReq)
  129. val req = Request(
  130. type = 5,
  131. length = payload.size,
  132. payload = payload,
  133. )
  134. connection.writeReq(req)
  135. val resp = waitResp(queueDeclareReq.rid)
  136. return resp.ok
  137. }
  138. }

四、Demo

a)启动服务器

  1. fun main() {
  2. val server = BrokerServer(9000)
  3. server.start()
  4. }

b)客户端连接

  1. class Test2 {
  2. }
  3. fun main() {
  4. val factory = ConnectionFactory("127.0.0.1", 9000)
  5. val connection = factory.newConnection()
  6. val channel = connection.createChannel()
  7. val ok1 = channel.createChannel()
  8. val ok2 = channel.exchangeDeclare("e1", ExchangeType.DIRECT, false, false, mutableMapOf())
  9. val ok3 = channel.removeChannel()
  10. println("ok1: $ok1, ok2: $ok2, ok3: $ok3")
  11. }

标签: rabbitmq rpc 网络

本文转载自: https://blog.csdn.net/CYK_byte/article/details/138559976
版权归原作者 陈亦康 所有, 如有侵权,请联系我们删除。

“RabbitMQ - 以 MQ 为例,手写一个 RPC 框架 demo”的评论:

还没有评论