随着响应式(Reactor)编程日益火热,响应式配套组件也在渐渐的完善。将响应式引入到了数据库,其响应式的背压机制与资源调度等特性可以带来巨大的性能。相比传统编程而言,当 Web 服务中入站流量大起来,如果都是一些数据库的操作的话,此时流量会一股脑的冲向数据库,就很容易将数据库压垮(数据库流量一大,可能会导致数据库宕机)。面对这种情况可以使用消息队列来进行削峰填谷,但是维护消息队列也是需要成本的,倘若项目要求的并发不高,可以使用响应式的背压机制,在客户端与服务端之间建立一层缓冲,之后对数据库的操作都会先经过背压这层缓冲,然后发送给数据库。 响应式对数据库操作是完全非阻塞的,Reactor 提供 Mono 和 Flux 两个核心 API,它们的 subscribeOn 与 publishOn 方法可以对资源进行细粒度的调度,前者用于改变订阅与下发元素处理线程,而后者用于改变下发元素处理线程,在进行响应式开发中,我们只需根据任务类型(计算型任务或IO型任务)来选择合适调度器即可。
背压并不是银弹,当流量大起来还是需要使用消息队列和Redis的,背压只是在一定程度上减轻数据库的压力。
什么是 R2DBC-SPI
r2dbc-spi github:https://github.com/r2dbc/r2dbc-spi
R2DBC 全称是响应式关系型数据库连接(Reactive Relational Database Connectivity),R2DBC 作为响应式基础组件,此时就需要一套规范用于在响应式中规范数据库操作,随之 R2DBC-SPI 就应景而生了。R2DBC-SPI 与 JDBC 本质上相同,都是为了给数据库操作提供一层通用抽象,只是两者的侧重点不同,前者是完全非阻塞的,而后者则是阻塞的,因为 JDBC 在底层 API 设计上就不支持非阻塞。
mariadb-connector-r2dbc 简介
mariadb-connector-r2dbc github:https://github.com/mariadb-corporation/mariadb-connector-r2dbc
r2dbc-mysql github:https://github.com/mirromutth/r2dbc-mysql
目前市面上有 mariadb-connector-r2dbc 与 r2dbc-mysql 两款 MySQL R2DBC 驱动,在进行项目开发时比较推荐使用 mariadb-connector-r2dbc 作为 MySQL R2DBC 驱动。前者的作者拥有二十余年的 Java 开发经验,同时我也看过这两者的源码,在设计上 mariadb-connector-r2dbc 要比 r2dbc-mysql 更优秀,并且 mariadb-connector-r2dbc 支持 MySQL 和 MariaDB 数据库, 个人觉得前者的潜力是要优于后者。
客户端与服务端消息协议 客户端向服务端发送数据称为请求(request),而服务端返回给客户端的数据称为响应(response),在 mariadb-connector-r2dbc 中通过ClientMessage和 ServerMessage 接口来区分客户端与服务端消息。
ClientMessage如果客户端需要向数据库发送一条查询 SQL 语句,就需要将 SQL 编码成指定的二进制数据,然后发送给数据库处理。对应在 ClientMessage 接口设计上,该接口中提供了一个 encode 方法,看到方法的签名,该方法接收Context 与 ByteBufAllocator 对象作为方法参数,方法返回类型是一个 ByteBuf 对象,因为 mariadb-connector-r2dbc 使用 reactor-netty 作为数据的传输层,而 reactor-netty 底层又是基于 netty,所以采用 ByteBuf 作为返回类型来对接 netty。
ClientMessage 实现不同对消息编码操作也是不同的,通过 encode 方法可以将具体的编码操作交于对应的实现类,这样起到了一个解耦的作用。我们还可以根据运行时上下文数据 Context 来对消息进行编码操作。Context 在 Reactor 用于持久化一些运行时数据,而在传统编程中使用 ThreadLocal 来进行存储,其实 ThreadLocal 并不合适在 Reactor 场景下使用,因为 Reactor 代码执行时会不断地切换线程,倘若使用了 ThreadLocal 来持久化运行时数据,不但增加了实现复杂度,还可能导致内存泄露。
1 2 3 4 5 6 7 8 9 public interface ClientMessage { default Sequencer getSequencer () { return new Sequencer((byte ) 0xff ); } ByteBuf encode (Context context, ByteBufAllocator byteBufAllocator) ; }
ServerMessageServerMessage 接口并未有 decode 操作,对于服务端消息来说,解码操作不应该放到该接口里面。只需根据消息的类型来选择对应的解码操作即可,那么可以将这些操作封装在一些特定的解码器中(参考 org.mariadb.r2dbc.client.DecoderState),解析之后将消息下发给对应的 Flux 处理即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public interface ServerMessage { default Sequencer getSequencer () { return null ; } default boolean ending () { return false ; } default boolean resultSetEnd () { return false ; } }
MariadbConnectionFactory 源码解读 MariadbConnectionFactory 实现了 r2dbc-spi 中的 ConnectionFactory 接口,该接口设计的也很简单,可以通过 create() 方法来获取数据库链接,另外还提供一个 getMetadata() 方法用于获取工厂的元信息。
1 2 3 4 5 6 7 public interface ConnectionFactory { Publisher<? extends Connection> create(); ConnectionFactoryMetadata getMetadata () ; }
之后看到 MariadbConnectionFactory 构造器,接收一个 MariadbConnectionConfiguration 对象来作为构造器参数,通过该对象来实例化 MariadbConnectionFactory ,那么这个参数必然有数据库配置的信息,因为 ConnectionFactory 接口定义了一个无参的 create() 方法来获取链接,那么数据库配置信息来源就能排除从 create() 方法传入,此时只能通过构造器参数或者通过特定的方法来设置。
1 2 3 4 5 public MariadbConnectionFactory (MariadbConnectionConfiguration configuration) { this .configuration = Assert.requireNonNull(configuration, "configuration must not be null" ); this .endpoint = createSocketAddress(configuration); }
有了工厂实例后,就可以调用该工厂的 create() 方法来创建数据库链接。进入 create() 方法,在该方法中调用了类中的 doCreateConnection 方法,之后得到一个 Mono<MariadbConnection> 再将结果强转成 MariadbConnection 对象。不妨有读者会产生疑问,既然都是同一个对象为什么还要进行转换呢?因为这两个 MariadbConnection 是在不同的包下,转换之后的org.mariadb.r2dbc.api.MariadbConnection 是一个接口,该接口继承了 r2dbc-spi 中的 Connection 接口。而Mono<MariadbConnection > 中的 MariadbConnection 是 org.mariadb.r2dbc.api.MariadbConnection 实现类,倘若不进行转换那么就会出现返回值与返回类型不兼容问题。它们两者的关系有点复杂,对于具体的细节,还请读者自行打开源码进行阅读。
1 2 3 4 public Mono<org.mariadb.r2dbc.api.MariadbConnection> create() { return doCreateConnection().cast(org.mariadb.r2dbc.api.MariadbConnection.class); }
进入 doCreateConnection() 方法,会根据 configuration.allowPipelining() 来判断是否采用异步的 Client,默认情况下会使用异步的 Client,对于 ClientPipelineImpl和ClientImpl 两种客户端实现的不同,会在下文进行讲解,此时我们要知道这里客户端采用 ClientPipelineImpl 就好了。看到 14 行,此时并未与服务端建立起 Socket 链接,Reactor 不同于传统编程,只有当下游产生订阅时数据库链接操作才会开始,这也是懒加载的实现。在 15 行,Socket 链接建立之后我们会通过 AuthenticationFlow#exchange 来对客户端身份进行校验,也就是从 configuration 中拿到用户名和密码封装成一个 packet 之后发送给服务端,服务端校验成功之后,此时我们的链接才算真正建立了。
客户端身份校验成功之后,会通过 getIsolationLevel(Client) 来获取数据库隔离等级。在 getIsolationLevel(Client) 方法中,来选择对应的数据库版本查询隔离等级的语句。之后将查询语句发送的服务端,得到服务端消息解析之后返回数据库的隔离等级。
最后通过一个 map 操作将数据的隔离等级作为参数封装成 MariadbConnection 对象返回,获取数据库链接的操作已经分析完了,此时并没有与数据建立起链接,我们只是定义了链接的行为而已,当调用 MariadbConnectionFactory#create 得到一个 Mono<Connection> 对象,只有调用该 Mono 的 subscribe 方法之后才会进行数据库链接操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 private Mono<MariadbConnection> doCreateConnection () { Mono<Client> clientMono; if (configuration.allowPipelining()) { clientMono = ClientPipelineImpl.connect( ConnectionProvider.newConnection(), this .endpoint, configuration); } else { clientMono = ClientImpl.connect(ConnectionProvider.newConnection(), this .endpoint, configuration); } return clientMono .delayUntil(client -> AuthenticationFlow.exchange(client, this .configuration)) .cast(Client.class) .flatMap( client -> { Mono<Void> waiting = Mono.empty(); if (configuration.getSessionVariables() != null && configuration.getSessionVariables().size() > 0 ) { waiting = setSessionVariables(client); } if (configuration.getIsolationLevel() == null ) { Mono<IsolationLevel> isolationLevelMono = waiting.then(getIsolationLevel(client)); return isolationLevelMono .map(it -> new MariadbConnection(client, it, configuration)) .onErrorResume(throwable -> this .closeWithError(client, throwable)); } else { return waiting .then( Mono.just( new MariadbConnection( client, configuration.getIsolationLevel(), configuration))) .onErrorResume(throwable -> this .closeWithError(client, throwable)); } }) .onErrorMap(this ::cannotConnect); } private Mono<IsolationLevel> getIsolationLevel (Client client) { String sql = "SELECT @@tx_isolation" ; if (!client.getVersion().isMariaDBServer() && (client.getVersion().versionGreaterOrEqual(8 , 0 , 3 ) || (client.getVersion().getMajorVersion() < 8 && client.getVersion().versionGreaterOrEqual(5 , 7 , 20 )))) { sql = "SELECT @@transaction_isolation" ; } return new MariadbSimpleQueryStatement(client, sql) .execute() .flatMap( it -> it.map( (row, rowMetadata) -> { String level = row.get(0 , String.class); switch (level) { case "REPEATABLE-READ" : return IsolationLevel.REPEATABLE_READ; case "READ-UNCOMMITTED" : return IsolationLevel.READ_UNCOMMITTED; case "READ-COMMITTED" : return IsolationLevel.READ_COMMITTED; case "SERIALIZABLE" : return IsolationLevel.SERIALIZABLE; default : return IsolationLevel.READ_COMMITTED; } })) .defaultIfEmpty(IsolationLevel.READ_COMMITTED) .last(); }
两种 Client 实现 R2DBC-SPI 使用 io.r2dbc.spi.Connection 接口来抽象出数据库链接,而在 mariadb-connector-r2dbc 中通过 org.mariadb.r2dbc.client.Client 接口定义了客户端与数据库交互的行为。Client 作为 mariadb-connector-r2dbc 中的核心接口,屏蔽了与数据库交互的细节。该项目存在有两种 Clinet 实现,一种是同步的 Client (ClientImpl),另外一种是异步的 Client(ClientPipelineImpl),对于这两种 Clinet 实现,它们一些通用方法可以抽到一个抽象类(ClientBase)中,但具体的链接数据库、发送命令以及其它细节需要在具体的 ClientBase 实现类中进行实现。
ClientBase 提供一个含参构造函数,该构造器接收一个 Connection 和 MariadbConnectionConfiguration 对象,这里的 Connection 并不是 R2DBC-SPI 中的 Connection,而是 reactor-netty 中的 Connection,可以理解为是一个 Socket 链接,只不过 reactor-netty 对该 Socket 进行增强了,之后与数据库进行交互也是通过该 Connection 来进行。在第 9 行创建了一个 netty 解码器,MariadbPacketDecoder 继承了 ByteToMessageDecoder,在服务端消息的入站时可以对消息进行解码操作。11与12行添加对应的编解码器到该 Connection 中,从而对客户端与服务端消息进行编解码操作。在 20 行,通过 connection.inbound().receive() 来接收服务端消息,之后并未对服务端消息进行任何处理,难道说并不需要对服务端消息进行处理吗?当然不是这样的,这也是 mariadb-connector-r2dbc 设计的精妙之处,它是通过 MariadbPacketDecoder 对服务端消息解码之后,在通过 FluxSink 来进行数据下发,具体的细节这里先不进行展开,此时只需了解 ClientBase 大致的功能即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 protected ClientBase (Connection connection, MariadbConnectionConfiguration configuration) { this .connection = connection; this .configuration = configuration; this .prepareCache = this .configuration.useServerPrepStmts() ? new PrepareCache(this .configuration.getPrepareCacheSize(), this ) : null ; this .mariadbPacketDecoder = new MariadbPacketDecoder(responseReceivers, this ); connection.addHandler(mariadbPacketDecoder); connection.addHandler(mariadbPacketEncoder); if (logger.isTraceEnabled()) { connection.addHandlerFirst( LoggingHandler.class.getSimpleName(), new LoggingHandler(ClientBase.class, LogLevel.TRACE)); } connection .inbound() .receive() .doOnError(this ::handleConnectionError) .doOnComplete(this ::closedServlet) .then() .subscribe(); }
Client 接口提供一个 sendCommand(ClientMessage) 用于发送消息给服务端,并且接收一个 Flux<ServerMessage>,既然有两种不同的 Client 实现,那么 ClientBase 就可以直接实现该方法,然后具体的发送细节交由对应的实现类进行实现。
1 2 Flux<ServerMessage> sendCommand (ClientMessage requests) ;
在 Client#sendCommand(ClientMessage) 方法中调用了 sendCommand 重载方法并传入一个解码状态(DecoderState),最后在调用了 sendCommand(ClientMessage message, DecoderState initialState, String sql) 抽象方法,该抽象方法由对应的继承类来进行实现。
1 2 3 4 5 6 7 8 9 10 11 public Flux<ServerMessage> sendCommand (ClientMessage message) { return sendCommand(message, DecoderState.QUERY_RESPONSE); } public Flux<ServerMessage> sendCommand (ClientMessage message, DecoderState initialState) { return sendCommand(message, initialState, null ); } public abstract Flux<ServerMessage> sendCommand ( ClientMessage message, DecoderState initialState, String sql) ;
ClientImpl 既然 ClientImpl 是一个同步的 Client,那么我们思考一下“同步”的有什么特征,客户端给服务端消息时,服务端接收到消息需要执行对应的 SQL 语句,那么这个操作时同步进行的,也就是说那个线程处理该链接,语句的执行也就由该线程进行的。针对该特征,那么在 ClientImpl 设计方面可以这样做,当前的语句执行完之后,服务端返回了处理结果,下一条语句才会发送给服务端。此时肯定需要一个容器来存储 SQL 语句,果然与我们猜想一样,在 ClientImpl 中提供了一个 sendingQueue 无界队列用于存储发送的 ClientMessage。
之后我们深入到具体的发送细节,在 36 行实现了 ClientBase 中那个 sendCommand 抽象方法,41-46 对链接(Connection)状态进行校验,要是该链接关闭了后续的代码就没有必要继续执行了,直接向下游发送一个 R2dbcNonTransientResourceException 元素即可。47 行通过 atomicBoolean.compareAndSet(false, true) 来将 atomicBoolean 设置为 true,目的是为了防止该 Flux 被二次订阅,如果没有该保护措施,该 Flux 不小心被下游二次订阅了那么会重复的将该 SQL 语句发送给服务端执行,从而导致客户端与数据库资源的浪费。设置成功之后加上一把重入锁,用于保证操作的安全性。在 59 行会判断 responseReceivers 队列中是否有元素,如果满足,那么将 sink, initialState, sql 封装成一个 CmdElement 对象添加到 responseReceivers 队列中,之后将 ClientMessage 发送到 服务端(向 Connection 中写入 ClientMessage)。我们前面也说到 ClientImpl 是一个同步的操作,同一时间段只能处理一条语句,而 responseReceivers 队列是用于存储服务端消息处理操作的,如果 responseReceivers 中没有元素那么说明当前数据库链接并未处理 SQL 语句,此时客户端可以将 ClientMessage 发送给数据库执行。要是 responseReceivers 队列中没有元素,看到 53-56 行,同样先将 sink, initialState, sql 封装成一个 CmdElement 添加到 responseReceivers 中,接下来的操作与之前不同,而是将 ClientMessage 添加到发送队列中(sendingQueue)等待当前语句处理完成之后在发送给服务端。
那么存放在 sendingQueue 队列中 的 ClientMessage ,会在何时发送呢?现在来看到 ClientImpl#sendNext() 方法,在该方法中同样是先加上一把锁,然后从 sendingQueue 队列中取出一条 ClientMessage,要是不为 null,那么写入到 Connection (发送给服务端)中。我们先不要管 sendNext() 会在何时调用,现在只需要对 ClientImpl 特性有个大概的认识即可。
sendNext() 是 Client 接口定义的一个方法,具体的细节会在 MariadbPacketDecoder 设计解读中提到
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 public final class ClientImpl extends ClientBase { public ClientImpl (Connection connection, MariadbConnectionConfiguration configuration) { super (connection, configuration); } protected final Queue<ClientMessage> sendingQueue = Queues.<ClientMessage>unbounded().get(); public static Mono<Client> connect ( ConnectionProvider connectionProvider, SocketAddress socketAddress, MariadbConnectionConfiguration configuration) { TcpClient tcpClient = TcpClient.create(connectionProvider).remoteAddress(() -> socketAddress); tcpClient = setSocketOption(configuration, tcpClient); return tcpClient.connect().flatMap(it -> Mono.just(new ClientImpl(it, configuration))); } public void sendCommandWithoutResult (ClientMessage message) { try { lock.lock(); if (this .responseReceivers.isEmpty()) { connection.channel().writeAndFlush(message); } else { sendingQueue.add(message); } } finally { lock.unlock(); } } public Flux<ServerMessage> sendCommand (PreparePacket preparePacket, ExecutePacket executePacket) { return Flux.error(new R2dbcNonTransientResourceException("Cannot pipeline" )); } public Flux<ServerMessage> sendCommand ( ClientMessage message, DecoderState initialState, String sql) { AtomicBoolean atomicBoolean = new AtomicBoolean(); return Flux.create( sink -> { if (!isConnected()) { sink.error( new R2dbcNonTransientResourceException( "Connection is close. Cannot send anything" )); return ; } if (atomicBoolean.compareAndSet(false , true )) { try { lock.lock(); if (this .responseReceivers.isEmpty()) { this .responseReceivers.add(new CmdElement(sink, initialState, sql)); connection.channel().writeAndFlush(message); } else { this .responseReceivers.add(new CmdElement(sink, initialState, sql)); sendingQueue.add(message); } } finally { lock.unlock(); } } }); } public void sendNext () { lock.lock(); try { ClientMessage next = sendingQueue.poll(); if (next != null ) connection.channel().writeAndFlush(next); } finally { lock.unlock(); } } }
ClientPipelineImpl ClientPipelineImpl 做为一个异步的 Client,此时我们在思考一下异步的特征。数据库接收到客户端消息时,会将该查询语句交由其它线程进行(线程池某个线程)处理,此时处理链接的线程就可以去接收该客户端发送的其它消息了,之后只需将处理结果按照客户端发送顺序返回给客户端就行了。
废话不多说,我们先来看到 ClientPipelineImpl#sendCommand(ClientMessage message, DecoderState initialState, String sql) 方法,该方法一些操作上与 ClientImpl#sendCommand 相同,在39-44行这里有点不同,此时先封装 CmdElement 添加到 responseReceivers 中,由于 Client 是一个异步处理的客户端,之后直接将 ClientMessage 发送给服务端。在 ClientPipelineImpl 并未向 ClientImpl 一样使用 sendingQueue 存储 ClientMessage(客户端消息),而且该 sendNext 方法也未有任何操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 public final class ClientPipelineImpl extends ClientBase { public ClientPipelineImpl (Connection connection, MariadbConnectionConfiguration configuration) { super (connection, configuration); } public static Mono<Client> connect ( ConnectionProvider connectionProvider, SocketAddress socketAddress, MariadbConnectionConfiguration configuration) { TcpClient tcpClient = TcpClient.create(connectionProvider).remoteAddress(() -> socketAddress); tcpClient = setSocketOption(configuration, tcpClient); return tcpClient.connect().flatMap(it -> Mono.just(new ClientPipelineImpl(it, configuration))); } public void sendCommandWithoutResult (ClientMessage message) { try { lock.lock(); connection.channel().writeAndFlush(message); } finally { lock.unlock(); } } public Flux<ServerMessage> sendCommand (PreparePacket preparePacket, ExecutePacket executePacket) { AtomicBoolean atomicBoolean = new AtomicBoolean(); return Flux.create( sink -> { if (!isConnected()) { sink.error( new R2dbcNonTransientResourceException( "Connection is close. Cannot send anything" )); return ; } if (atomicBoolean.compareAndSet(false , true )) { try { lock.lock(); this .responseReceivers.add( new CmdElement( sink, DecoderState.PREPARE_AND_EXECUTE_RESPONSE, preparePacket.getSql())); connection.channel().writeAndFlush(preparePacket); connection.channel().writeAndFlush(executePacket); } finally { lock.unlock(); } } }); } public Flux<ServerMessage> sendCommand ( ClientMessage message, DecoderState initialState, String sql) { AtomicBoolean atomicBoolean = new AtomicBoolean(); return Flux.create( sink -> { if (!isConnected()) { sink.error( new R2dbcNonTransientResourceException( "Connection is close. Cannot send anything" )); return ; } if (atomicBoolean.compareAndSet(false , true )) { try { lock.lock(); this .responseReceivers.add(new CmdElement(sink, initialState, sql)); connection.channel().writeAndFlush(message); } finally { lock.unlock(); } } }); } public void sendNext () {} }
编解码器解读 在 netty 中,需要对消息进行一个编解码操作,对于入站消息会进行解码操作,而出站消息会进行编码操作。这些操作都要通过特定的编解码 API 来进行实现,我们在上层都是通过一个对象来充当一个特定的消息,比如说通过 connection.channel().writeAndFlush(ClientMessage) 方法来将 ClientMessage 发送给服务端,因为 Netty 底层都是通过二进制来进行传输的,所以需要一个编码器(MariadbPacketEncoder)来将 ClientMessage 编码成一个 ByteBuf,之后 netty 在转换成 Java NIO 中的 ByteBuffer 发送给服务端。对于入站消息也是一样的,前面我们也提到使用 ServerMessage 来作为服务端消息,那么从一个二进制数据(ByteBuf)转换成 ServerMessage 需要通过一个解码器(MariadbPacketDecoder )来进行实现。
MariadbPacketEncoder MariadbPacketEncoder 实现也很简单,我们来看到它的 encode 方法,该方法接收三个参数,分别是 ChannelHandlerContext、ClientMessage、ByteBuf,我们通过对 ClientMessage 进行编码操作之后将编码结果写入到 out (ByteBuf out)中。在 11 行中通过一个 ClientMessage#encode 方法先对消息进行一个编码操作,不同的 ClientMessage 它的编码操作是不同的,具体的细节需要看 ClientMessage#encode 方法实现。编码完成之后,得到一个 ByteBuf 对象,然后对单个与多个 mysql packet 进行处理,对于多个 mysql packet 会进行分割。
大概流程就是这样,最后还有一个小细节,在 40 行会对 buf 进行一个释放的操作,因为我们的 buf 已经写入到了 out 中,那么就不需要该 buf 了。不少人在编写代码时都会忘记释放不需要的 ByteBuf,这并不是一个好的习惯,及时释放不需要资源的可以提高系统资源的利用率。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public class MariadbPacketEncoder extends MessageToByteEncoder <ClientMessage > { private Context context = null ; @Override protected void encode (ChannelHandlerContext ctx, ClientMessage msg, ByteBuf out) throws Exception { ByteBuf buf = null ; try { buf = msg.encode(this .context, ctx.alloc()); if (buf.writerIndex() - buf.readerIndex() < 0xffffff ) { out.writeMediumLE(buf.writerIndex() - buf.readerIndex()); out.writeByte(msg.getSequencer().next()); out.writeBytes(buf); return ; } int readerIndex = buf.readerIndex(); int packetLength = -1 ; while (readerIndex < buf.writerIndex()) { packetLength = Math.min(0xffffff , buf.writerIndex() - readerIndex); out.writeMediumLE(packetLength); out.writeByte(msg.getSequencer().next()); out.writeBytes(buf.slice(readerIndex, packetLength)); readerIndex += packetLength; } if (packetLength == 0xffffff ) { out.writeMediumLE(packetLength); out.writeByte(msg.getSequencer().next()); } } finally { if (buf != null ) buf.release(); } } public void setContext (Context context) { this .context = context; } }
MariadbPacketDecoder MariadbPacketDecoder 作为 netty 入站消息解码器和 mariadb-connector-r2dbc 服务端消息的处理器,同时这也是该项目中最核心的组件之一。先来看到 MariadbPacketDecoder 构造器,接收一个 Queue<CmdElement> 和 Client 作为构造器参数,前者是用于处理服务端消息(通过 CmdElement 中的 Flinksink 下发消息到对应的 Flux 中),后者只是用来将 MariadbPacketDecoder 和Client 进行一个绑定。
既然该对象是 netty 入站消息解码器,那么我们重点关注它的 decode 方法,在该方法中首先对服务端的二进制数据进行解析,解析的过程我们不必过多的关注,我们重点放在 handleBuffer(ByteBuf packet, Sequencer sequencer) 方法中,方法的参数 packet 是解析好后的服务端消息包。既然是服务端消息,那么肯定需要根据它的消息类型来选择对应的解码器进行解码操作,在 69 行获取该服务端消息类型的解码器,之后调用它的解码器得到一个 ServerMessage 对象,最后通过 CmdElement 中的 FluxSink 来将 ServerMessage 进行下发。如果当前的 ServerMessage 是一个结束的标志(76行),那就认为我们当前的 SQL 语句处理完成了,之后就需要加载下一个 CmdElement 对象。如80行所示,会调用 loadNextResponse() 来从 responseReceivers 队列中加载下一个 CmdElement,之后再把原来的 CmdElement 中的 FluxSink 设置成下发元素完毕(FluxSink#complete()),使得下游停止接收元素(意味着这个操作已经完成了)。最后重点来了,我们前面也在 ClientImpl 与 ClientPipelineImpl 章节提到 sendNext() 方法,此时在 83 行会调用该 sendNext() 方法,该方法作为一个扩展点目的是为了兼容不同Client实现的特性 ,倘若当前的 Client 是 ClientImpl,那么就会发送下一条消息给服务端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 public class MariadbPacketDecoder extends ByteToMessageDecoder { private final Queue<CmdElement> responseReceivers; private final Client client; private Context context = null ; private boolean isMultipart = false ; private DecoderState state = DecoderState.INIT_HANDSHAKE; private CmdElement cmdElement; private CompositeByteBuf multipart; private long serverCapabilities; private int stateCounter = 0 ; public MariadbPacketDecoder (Queue<CmdElement> responseReceivers, Client client) { this .responseReceivers = responseReceivers; this .client = client; } @Override protected void decode (ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { while (buf.readableBytes() > 4 ) { int length = buf.getUnsignedMediumLE(buf.readerIndex()); if (buf.readableBytes() < length + 4 ) return ; if (length == 0xffffff ) { if (!isMultipart) { isMultipart = true ; multipart = buf.alloc().compositeBuffer(); } buf.skipBytes(4 ); multipart.addComponent(true , buf.readRetainedSlice(length)); continue ; } if (isMultipart) { buf.skipBytes(3 ); Sequencer sequencer = new Sequencer(buf.readByte()); multipart.addComponent(true , buf.readRetainedSlice(length)); handleBuffer(multipart, sequencer); multipart.release(); isMultipart = false ; continue ; } ByteBuf packet = buf.readRetainedSlice(4 + length); packet.skipBytes(3 ); Sequencer sequencer = new Sequencer(packet.readByte()); handleBuffer(packet, sequencer); packet.release(); } } private void handleBuffer (ByteBuf packet, Sequencer sequencer) { if (cmdElement == null && !loadNextResponse()) { throw new R2dbcNonTransientResourceException( "unexpected message received when no command was send" ); } state = state.decoder( packet.getUnsignedByte(packet.readerIndex()), packet.readableBytes(), serverCapabilities); ServerMessage msg = state.decode(packet, sequencer, this , cmdElement); cmdElement.getSink().next(msg); if (msg.ending()) { if (cmdElement != null ) { CmdElement element = cmdElement; loadNextResponse(); element.getSink().complete(); } client.sendNext(); } else { state = state.next(this ); } } public void connectionError (Throwable err) { if (cmdElement != null ) { cmdElement.getSink().error(err); cmdElement = null ; state = null ; } } public Client getClient () { return client; } public Context getContext () { return context; } public int getStateCounter () { return stateCounter; } public void setStateCounter (int counter) { stateCounter = counter; } public void decrementStateCounter () { stateCounter--; } public long getServerCapabilities () { return serverCapabilities; } private boolean loadNextResponse () { this .cmdElement = responseReceivers.poll(); if (cmdElement != null ) { state = cmdElement.getInitialState(); return true ; } state = null ; return false ; } public void setContext (Context context) { this .context = context; this .serverCapabilities = this .context.getServerCapabilities(); } }
结束语 本文并未对消息的编解码操作进行深入探究,只是对 mariadb-connector-r2dbc 设计进行一个大概的描述,对于具体的实现细节还请读者自行探究。通过这个项目我们可以学习到很多东西,如何将 reactor-netty 落地到项目中,响应式基础库该如何进行设计以及作者一些优秀的设计思路。