Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Typed Pipeline

typed pipeline 是 rs-netty 的核心约束机制。它把 stage 顺序和消息类型写进 builder 的泛型参数里,让错误尽量在编译期暴露。

Shape

TCP:

#![allow(unused)]
fn main() {
pipeline()
    .codec(...)
    .inbound(...)*
    .business(...)*
    .handler(...)
    .outbound(...)*
}

UDP:

#![allow(unused)]
fn main() {
datagram_pipeline()
    .codec(...)
    .inbound(...)*
    .business(...)*
    .handler(...)
    .outbound(...)*
}

这两个 builder 的 stage shape 相同,但 codec trait 和 final handler trait 不同。

Builder States

共享 state marker 位于 pipeline/core/state.rs

  • Start:刚开始,只能加 codec。
  • InboundPhase:codec 已有,可以加 inbound、business 或 final handler。
  • BusinessPhase:已进入 business,可以继续加 business 或 final handler。
  • Ready:final handler 已有,可以加 outbound,并可转成 runtime pipeline。

这些状态体现在 PipelineBuilder<State, C, InP, BizP, H, OutP, CurrentIn, Write, CurrentOut>DatagramPipelineBuilder<...> 的第一个泛型参数上。某个阶段不合法时,对应方法根本不在该类型的 impl 块里。

Message Type Chain

以 TCP 为例,builder 约束是:

C: Decoder<Item = A>
InboundPipe<A, Out = B>
BusinessPipe<B, Out = CIn>
H: Handler<CIn, Write = W>
OutboundPipe<W, Out = COut>
codec: Encoder<COut>

这意味着:

  • 第一个 inbound 的输入必须是 codec decode 出来的类型。
  • 下一个 inbound/business 的输入必须等于上一个 stage 的 Out
  • final handler 的输入必须等于 inbound/business 链的最终输出。
  • outbound 链的第一个输入必须等于 Handler::Write
  • outbound 链的最终输出必须能被 stream codec Encoder<...> 编码。

UDP 是同样的类型链,但 codec trait 换成 DatagramDecoder / DatagramEncoder,final handler 换成 DatagramHandler

Compile Failures Are Intentional API

trybuild 测试里的失败用例就是文档化的约束。例如下面这种 pipeline 不会通过编译,因为 ParseString 变成 Request,而 final handler 却要求 String

#![allow(unused)]
fn main() {
let _ = pipeline()
    .codec(LineCodec::new())
    .inbound(Parse)
    .handler(EchoString);
}

同样,下面这种也不会通过,因为 LineCodec 只能 encode String,而 handler 写出的 Response 没有被 outbound stage 转成 String

#![allow(unused)]
fn main() {
let _ = TcpServer::bind("127.0.0.1:0")
    .pipeline(|| pipeline().codec(LineCodec::new()).handler(Router));
}

实际修复方式是增加 outbound stage:

#![allow(unused)]
fn main() {
let _ = pipeline()
    .codec(LineCodec::new())
    .inbound(Parse)
    .handler(Router)
    .outbound(RenderResponse);
}

Flow

InboundBusinessOutbound 返回 Result<Flow<T>>

#![allow(unused)]
fn main() {
pub enum Flow<T> {
    Next(T),
    Stop,
}
}

Flow::Next 继续传递消息;Flow::Stop 消费当前消息并停止该方向处理,不当作错误。final Handler / DatagramHandler 不返回 Flow,它们返回 Result<()>,因为它们是 inbound side 的终点。