本文共 8593 字,大约阅读时间需要 28 分钟。
基于自定义协议的 RabbitMQ 模拟实现
前言
本文是笔者之前撰写的一个系列文章——“基于源码,模拟实现 RabbitMQ” 中的一部分。该系列文章旨在通过实际操作和实践,帮助开发者理解 RabbitMQ 的工作原理及其相关协议实现。如需了解更多内容,可参考笔者的完整系列文章。
一、自定义应用层协议
1.1 消息格式概述
自定义应用层协议主要定义了客户端与服务器之间消息的传输格式。该协议分为四个部分:type、length 和 payload。
- type:使用整数类型,表示消息的类型,用于指示消息的具体用途,例如调用服务器端的哪个服务。
- length:同样为整数类型,表示
payload的长度。由于 TCP 是面向字节流的协议,数据的分割和拼接可能导致粘包问题,因此协议中加入了length字段,用于明确数据的边界。 - payload:作为消息的数据载荷,承载调用VirtualHost中的具体服务所需的参数。由于 TCP 的特性,
payload采用二进制数据序列化的方式进行传输。
1.2 数据序列化方案
由于 TCP 是面向字节流的传输协议,传统的文本格式(如 JSON)在传输过程中可能导致效率低下且可读性差。因此,本文采用二进制数据序列化的方式作为 payload 的编码方案。具体实现如下:
import java.io.Serializable;class Request implements Serializable { val type: Int val length: Int val payload: ByteArray} class Response implements Serializable { val type: Int val length: Int val payload: ByteArray} 1.3 请求与响应参数封装
为了实现客户端与服务器之间的通信,本文对请求和响应参数进行了封装。以下是基础请求和响应参数的定义:
open class ReqBaseArguments { open val rid: String = "" open val channelId: String = ""}open class RespBaseArguments { open val rid: String open val channelId: String open val ok: Boolean = false} 二、BrokerServer 实现
2.1 BrokerServer 的功能概述
BrokerServer 是一个中间服务,其主要功能是接收客户端的 TCP 连接请求,并代理转发到 VirtualHost 中的具体服务。其启动时会通过 accept() 方法阻塞等待客户端连接,成功建立连接后,为每个客户端分配一个线程处理请求。
2.2 BrokerServer 的工作流程
socket.accept() 等待客户端连接。type、length 和 payload,提取出 VirtualHost 中的具体调用参数。2.3 BrokerServer 实现代码
class BrokerServer(private val port: Int) { private val socket = ServerSocket(port) private val channelSession = ConcurrentHashMap () fun start() { println("[BrokerServer] 启动!") while (true) { val client = socket.accept() clientPool.submit { clientProcess(client) } } } private fun clientProcess(client: Socket) { try { val inputStream = client.getInputStream() val outputStream = client.getOutputStream() DataInputStream(inputStream).use { DataOutputStream(outputStream).use { while (true) { val request = readRequest(it) val response = process(request, client) writeResponse(response, it) } } } } catch (e: EOFException) { println("[BrokerServer] 客户端正常下线!") } catch (e: Exception) { println("[BrokerServer] 客户端连接异常!") } finally { client.close() removeChannelSession(client) } } private fun process(request: Request, client: Socket): Response { val req = BinaryTool.bytesToAny(request.payload) val reqBase = req as ReqBaseArguments val ok = when (request.type) { 1 -> { channelSession[reqBase.channelId] = client println("[BrokerServer] channel 创建成功!") true } 2 -> { channelSession.remove(reqBase.channelId) println("[BrokerServer] channel 销毁成功!") true } 3 -> virtualHost.exchangeDeclare(req as ExchangeDeclareReq) 4 -> virtualHost.exchangeDelete(req as ExchangeDeleteReq) 5 -> virtualHost.queueDeclare(req as QueueDeclareReq) else -> throw RuntimeException("[BrokerServer] 客户端请求 type 非法!") } val respBase = RespBaseArguments(reqBase.rid, reqBase.channelId, ok) val payload = BinaryTool.anyToBytes(respBase) Response(request.type, payload.size, payload) } private fun removeChannelSession(client: Socket) { val channelIdList = mutableListOf () for (entry in channelSession) { if (entry.value == client) { channelIdList.add(entry.key) } } for (channelId in channelIdList) { channelSession.remove(channelId) } }} 三、Connection 和 Channel 实现
3.1 Connection 的功能概述
Connection 是一个 TCP 连接的抽象表示,其主要功能是为多个 Channel 提供通讯服务。为了提高效率,Connection 实现了连接的复用机制,每个 Connection 可以维护多个 Channel。
3.2 Channel 的功能概述
Channel 是 Connection 中的一个逻辑连接,用于实现客户端与 VirtualHost 之间的通信。Channel 提供了一系列方法(如 createChannel()、exchangeDeclare() 等),用于调用 VirtualHost 的服务。
3.3 Connection 的工作流程
newConnection() 方法创建一个新的 Connection 实例。createChannel() 方法,生成一个新的 Channel 实例,并通过 TCP 通道发送创建请求到 VirtualHost。waitResp() 方法提取响应数据,并将其交给客户端处理。putRespToChannel() 方法将响应数据传递给对应的 Channel,完成消息的处理。3.4 Channel 的实现代码
class Channel(private val channelId: String, private val connection: Connection) { private val ridRespMap = ConcurrentHashMap () private val locker = Object() fun generateRid(): String { return "R-${UUID.randomUUID()}" } fun waitResp(rid: String): RespBaseArguments { while (ridRespMap[rid] == null) { synchronized(locker) { locker.wait() } } return ridRespMap[rid]!! } fun notifyResp(respBase: RespBaseArguments) { ridRespMap[respBase.rid] = respBase synchronized(locker) { locker.notifyAll() } } fun createChannel(): Boolean { val reqBase = ReqBaseArguments(generateRid(), channelId) val payload = BinaryTool.anyToBytes(reqBase) val req = Request(1, payload.size, payload) connection.writeReq(req) val respBase = waitResp(reqBase.rid) return respBase.ok } fun removeChannel(): Boolean { val reqBase = ReqBaseArguments(generateRid(), channelId) val payload = BinaryTool.anyToBytes(reqBase) val req = Request(2, payload.size, payload) connection.writeReq(req) val respBase = waitResp(reqBase.rid) return respBase.ok } fun exchangeDeclare( name: String, type: ExchangeType, durable: Boolean, autoDelete: Boolean, arguments: MutableMap ): Boolean { val exchangeDeclareReq = ExchangeDeclareReq( name = name, type = type, durable = durable, autoDelete = autoDelete, arguments = arguments, rid = generateRid(), channelId = channelId ) val payload = BinaryTool.anyToBytes(exchangeDeclareReq) val req = Request(3, payload.size, payload) connection.writeReq(req) val respBase = waitResp(exchangeDeclareReq.rid) return respBase.ok } fun exchangeDelete(name: String): Boolean { val exchangeDeleteReq = ExchangeDeleteReq( name = name, rid = generateRid(), channelId = channelId ) val payload = BinaryTool.anyToBytes(exchangeDeleteReq) val req = Request(4, payload.size, payload) connection.writeReq(req) val respBase = waitResp(exchangeDeleteReq.rid) return respBase.ok } fun queueDeclare( name: String, durable: Boolean, exclusive: Boolean, autoDelete: Boolean, arguments: MutableMap ): Boolean { val queueDeclareReq = QueueDeclareReq( name = name, durable = durable, exclusive = exclusive, autoDelete = autoDelete, arguments = arguments, rid = generateRid(), channelId = channelId ) val payload = BinaryTool.anyToBytes(queueDeclareReq) val req = Request(5, payload.size, payload) connection.writeReq(req) val resp = waitResp(queueDeclareReq.rid) return resp.ok }} 四、Demo 与 实现说明
4.1 启动 BrokerServer
fun main() { val server = BrokerServer(9000) server.start()} 4.2 客户端连接
class Test2 { fun main() { val factory = ConnectionFactory("127.0.0.1", 9000) val connection = factory.newConnection() val channel = connection.createChannel() val ok1 = channel.createChannel() val ok2 = channel.exchangeDeclare("e1", ExchangeType.DIRECT, false, false, mutableMapOf()) val ok3 = channel.removeChannel() println("ok1: $ok1, ok2: $ok2, ok3: $ok3") }} 结论
通过上述实现,可以看到自定义协议在模拟实现 RabbitMQ 的过程中发挥了重要作用。BrokerServer 作为中间服务,负责接收客户端请求并代理转发;Connection 和 Channel 则实现了 TCP 连接的复用和逻辑连接的管理。整个系统通过自定义协议实现了高效的消息传输和服务调用,具有一定的扩展性和实用性。
发表评论
最新留言
关于作者