Typed Pipeline
typed pipeline 是 rs-netty 的核心约束机制。它把 stage 顺序和消息类型写进 builder 的泛型参数里,让错误尽量在编译期暴露。
Shape
TCP:
#![allow(unused)]
fn main() {
pipeline()
.codec(...)
.inbound(...)*
.business(...)*
.handler(...)
.outbound(...)*
}
UDP:
#![allow(unused)]
fn main() {
datagram_pipeline()
.codec(...)
.inbound(...)*
.business(...)*
.handler(...)
.outbound(...)*
}
这两个 builder 的 stage shape 相同,但 codec trait 和 final handler trait 不同。
Builder States
共享 state marker 位于 pipeline/core/state.rs:
Start:刚开始,只能加 codec。InboundPhase:codec 已有,可以加 inbound、business 或 final handler。BusinessPhase:已进入 business,可以继续加 business 或 final handler。Ready:final handler 已有,可以加 outbound,并可转成 runtime pipeline。
这些状态体现在 PipelineBuilder<State, C, InP, BizP, H, OutP, CurrentIn, Write, CurrentOut> 和 DatagramPipelineBuilder<...> 的第一个泛型参数上。某个阶段不合法时,对应方法根本不在该类型的 impl 块里。
Message Type Chain
以 TCP 为例,builder 约束是:
C: Decoder<Item = A>
InboundPipe<A, Out = B>
BusinessPipe<B, Out = CIn>
H: Handler<CIn, Write = W>
OutboundPipe<W, Out = COut>
codec: Encoder<COut>
这意味着:
- 第一个 inbound 的输入必须是 codec decode 出来的类型。
- 下一个 inbound/business 的输入必须等于上一个 stage 的
Out。 - final handler 的输入必须等于 inbound/business 链的最终输出。
- outbound 链的第一个输入必须等于
Handler::Write。 - outbound 链的最终输出必须能被 stream codec
Encoder<...>编码。
UDP 是同样的类型链,但 codec trait 换成 DatagramDecoder / DatagramEncoder,final handler 换成 DatagramHandler。
Compile Failures Are Intentional API
trybuild 测试里的失败用例就是文档化的约束。例如下面这种 pipeline 不会通过编译,因为 Parse 把 String 变成 Request,而 final handler 却要求 String:
#![allow(unused)]
fn main() {
let _ = pipeline()
.codec(LineCodec::new())
.inbound(Parse)
.handler(EchoString);
}
同样,下面这种也不会通过,因为 LineCodec 只能 encode String,而 handler 写出的 Response 没有被 outbound stage 转成 String:
#![allow(unused)]
fn main() {
let _ = TcpServer::bind("127.0.0.1:0")
.pipeline(|| pipeline().codec(LineCodec::new()).handler(Router));
}
实际修复方式是增加 outbound stage:
#![allow(unused)]
fn main() {
let _ = pipeline()
.codec(LineCodec::new())
.inbound(Parse)
.handler(Router)
.outbound(RenderResponse);
}
Flow
Inbound、Business 和 Outbound 返回 Result<Flow<T>>:
#![allow(unused)]
fn main() {
pub enum Flow<T> {
Next(T),
Stop,
}
}
Flow::Next 继续传递消息;Flow::Stop 消费当前消息并停止该方向处理,不当作错误。final Handler / DatagramHandler 不返回 Flow,它们返回 Result<()>,因为它们是 inbound side 的终点。