Introduction
rs-netty 是一个 Tokio-native、Netty-inspired 的 Rust 网络框架。它保留了 Channel / Pipeline / Handler 这些熟悉的概念,但主路径建立在 Rust 类型系统、async/await、Tokio task、bounded channel queue 和普通 owned message 上。
当前 crate 根启用了 #![deny(unsafe_code)],公开入口主要是:
pipeline():构建 TCP stream pipeline。datagram_pipeline():构建 UDP datagram pipeline。TcpServer/TcpClient:运行 TCP server/client。UdpServer/UdpClient:运行 UDP server/client。Handler/DatagramHandler/Inbound/Business/Outbound:实现 pipeline stage。Channel/DatagramChannel:从 handler 外部写入、flush 或关闭连接/Socket。Life:可选 lifecycle hook。#[handler]:为简单 handler 生成 TCP 和 UDP final handler impl。
Minimal TCP Server
这个例子贴近 examples/tcp_echo_server.rs:
use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpServer};
#[tokio::main]
async fn main() -> Result<()> {
TcpServer::bind("127.0.0.1:9000")
.pipeline(|| {
pipeline()
.codec(LineCodec::new())
.handler(Echo)
})
.run()
.await
}
struct Echo;
#[handler(Echo)]
async fn echo(msg: String) -> Result<String> {
Ok(msg)
}
LineCodec 把 TCP byte stream 解码成 String,Echo 收到 String,宏生成的 handler 会把返回的 String 通过 outbound side 写回并 flush。
Minimal UDP Server
这个例子贴近 examples/udp_echo_server.rs:
use rs_netty::{codec::Utf8DatagramCodec, datagram_pipeline, handler, Result, UdpServer};
#[tokio::main]
async fn main() -> Result<()> {
UdpServer::bind("127.0.0.1:9002")
.pipeline(|| {
datagram_pipeline()
.codec(Utf8DatagramCodec)
.handler(UdpEcho)
})
.run()
.await
}
struct UdpEcho;
#[handler(UdpEcho)]
async fn udp_echo(msg: String) -> Result<String> {
Ok(format!("echo: {msg}"))
}
UDP pipeline 处理的是一个个完整 datagram。DatagramContext::write 默认回复当前 datagram 的 sender;如果要写给其他 peer,使用 write_to 或 write_to_and_flush。
How To Read This Guide
如果你想使用框架,先看 Typed Pipeline、TCP、UDP、Handlers、Codecs。如果你要扩展框架,重点看 Architecture、Lifecycle、Channel Write And Flush、Extension Guide 和 API Map。
Design Goals
rs-netty 的设计目标不是把 Java Netty 原样搬到 Rust,而是把 pipeline/handler/channel 这套建模方式换成 Rust-native 的形式。
Keep The Useful Shape
保留的概念:
- codec 位于 pipeline 边界,负责 bytes/datagrams 和 typed message 之间的转换。
- inbound stage 处理解码后的入站消息。
- final handler 处理应用语义,并写出应用层响应。
- outbound stage 把 handler 写出的应用类型转换成 codec 可编码类型。
- channel handle 可以从当前 handler 外部写入、flush 或关闭连接。
- lifecycle hook 可以观察 server、connection 和 UDP socket 的启动/关闭。
Use Rust Types Instead Of Dynamic Pipeline Mutation
Java Netty 的动态 pipeline 很灵活,但错误通常在运行时出现:handler 顺序不对、上游消息类型与下游不匹配、TCP pipeline 被误用于 UDP 等。rs-netty 把这些约束编码在 builder 类型里:
codec -> inbound* -> business* -> handler -> outbound*
只有当前状态允许的方法才存在。比如在 .codec(...) 之前没有 .handler(...),在 .handler(...) 之后没有 .inbound(...),并且最终 outbound 类型必须被 codec 的 encoder 支持。
仓库里的 trybuild 用例覆盖了这些失败场景,例如:
fail_stream_handler_before_codec.rsfail_stream_outbound_before_handler.rsfail_stream_type_mismatch_inbound_to_handler.rsfail_stream_final_encoder_mismatch.rsfail_udp_with_stream_pipeline.rsfail_tcp_with_datagram_pipeline.rs
Separate TCP And UDP Builders
TCP 使用 pipeline(),要求 codec 实现 Decoder 和 Encoder<T>。UDP 使用 datagram_pipeline(),要求 codec 实现 DatagramDecoder 和 DatagramEncoder<T>。
这不是命名偏好,而是类型边界。TcpServer::pipeline 只接受 IntoStreamPipeline,UdpServer::pipeline 只接受 IntoDatagramPipeline。因此 TCP/UDP pipeline 混用不会通过编译。
Prefer Owned Messages
框架没有暴露 Java Netty 风格的 reference-counted ByteBuf。codec 和 handler 在公开 API 中使用 String、bytes::Bytes、用户自定义 struct 等 owned 类型。这样更贴近 Rust 的所有权模型,也减少了 refCnt 误用带来的生命周期风险。
Bounded Queues And Explicit Flush
Channel 和 DatagramChannel 背后使用 bounded Tokio mpsc。outbound_queue_size 控制外部写入命令队列大小;队列满时写调用等待容量,而不是无限制堆积。
写入和 flush 明确分离:
write只排队或暂存。flush推送已暂存数据到 socket。write_and_flush写入并建立一个 flush 边界。
这个模型让高吞吐场景可以 batch,也让低延迟场景可以显式 flush。
Zero Unsafe
crate 根使用 #![deny(unsafe_code)],rs-netty-macros 也启用了同样的约束。当前主库和宏库都不依赖 unsafe 作为实现主路径。
Architecture
rs-netty 的主路径可以按四层理解:transport、pipeline runtime、stage traits、channel/context。
Crate Layout
src/lib.rs:公开模块和常用类型 re-export。src/traits.rs:Inbound、Business、Handler、DatagramHandler、Outbound、Flow。src/pipeline/stream:TCP typed builder 和 runtime pipeline。src/pipeline/datagram:UDP typed builder 和 runtime pipeline。src/pipeline/core:共享的Identity、Then、stage pipe traits 和 builder state marker。src/codec:stream/datagram codec traits 和内置 codec。src/context:stage context、handler context、stats 和 identity 信息。src/channel:外部写入 handle,以及内部 command enum。src/tls.rs:tlsfeature 下的 TLS context builder 和 server/client context。src/transport/tcp:TCP server、client、connection runtime 和配置。src/transport/udp:UDP server、client、socket runtime 和配置。src/life.rs:lifecycle hook trait 和 close reason。rs-netty-macros:#[handler]attribute macro。
TCP Runtime Flow
TCP server 每个 accepted connection 都会调用 pipeline factory,创建独立 pipeline 实例。client 可以使用可复用 factory,也可以用 pipeline_instance 消耗一个单次 pipeline。
运行时大致流程:
TcpListener / TcpStream
-> optional TLS accept/connect
-> read_buf
-> Decoder::decode
-> InboundPipe
-> BusinessPipe
-> Handler::read(Context<W>, msg)
-> Context outbox or Channel command queue
-> OutboundPipe
-> Encoder::encode
-> write_buf
-> flush/write_all
StreamConnectionRuntime 负责 select socket read、external channel command 和 shutdown signal。没有 idle timeout 时走 no-timeout loop;配置了 idle_timeout 时增加一个 read-idle timer。这个 timer 只由 socket read 重置,外部 outbound write 不重置。
UDP Runtime Flow
UDP server/client 都围绕一个 socket task 运行,pipeline 是 socket-level 的:
UdpSocket::recv_from
-> DatagramDecoder::decode_datagram
-> InboundPipe
-> BusinessPipe
-> DatagramHandler::read(DatagramContext<W>, msg)
-> DatagramContext outbox or DatagramChannel command queue
-> OutboundPipe
-> DatagramEncoder::encode_datagram
-> pending_datagrams
-> flush/send_to
UDP server 目前不为每个 peer 创建 child pipeline。需要 per-peer state 时,handler 应自己维护 HashMap<SocketAddr, State> 之类的结构。
Static Stage Composition
builder 在类型层面把 stage 串成 Then<A, B>。Identity 表示没有该方向的 stage。运行时的 InboundPipe、BusinessPipe 和 OutboundPipe 递归处理 Then:
- 前一个 stage 返回
Flow::Next(value)时,继续传给后一个 stage。 - 返回
Flow::Stop时,当前消息方向停止,不视为错误。 - 返回
Err时,连接或 socket runtime 将错误映射为 decode/encode/handler/runtime 错误。
主路径不需要动态 Box<dyn Handler> pipeline 分发;pipeline 类型由泛型静态组合出来。
Channel And Context
Context<W> 和 DatagramContext<W> 是 final handler 里的写入入口。它们持有 handler-local outbox,适合在同一次 read 内进行多次写入和控制 flush 边界。
Channel<W> 和 DatagramChannel<W> 是 cloneable external handle。它们通过 bounded Tokio mpsc 把命令发送给 connection/socket task。TCP channel 暴露 stats()、capacity()、max_capacity() 和 is_closed();UDP channel 暴露 socket id/local addr 和 queue 状态。
Typed Pipeline
typed pipeline 是 rs-netty 的核心约束机制。它把 stage 顺序和消息类型写进 builder 的泛型参数里,让错误尽量在编译期暴露。
Shape
TCP:
#![allow(unused)]
fn main() {
pipeline()
.codec(...)
.inbound(...)*
.business(...)*
.handler(...)
.outbound(...)*
}
UDP:
#![allow(unused)]
fn main() {
datagram_pipeline()
.codec(...)
.inbound(...)*
.business(...)*
.handler(...)
.outbound(...)*
}
这两个 builder 的 stage shape 相同,但 codec trait 和 final handler trait 不同。
Builder States
共享 state marker 位于 pipeline/core/state.rs:
Start:刚开始,只能加 codec。InboundPhase:codec 已有,可以加 inbound、business 或 final handler。BusinessPhase:已进入 business,可以继续加 business 或 final handler。Ready:final handler 已有,可以加 outbound,并可转成 runtime pipeline。
这些状态体现在 PipelineBuilder<State, C, InP, BizP, H, OutP, CurrentIn, Write, CurrentOut> 和 DatagramPipelineBuilder<...> 的第一个泛型参数上。某个阶段不合法时,对应方法根本不在该类型的 impl 块里。
Message Type Chain
以 TCP 为例,builder 约束是:
C: Decoder<Item = A>
InboundPipe<A, Out = B>
BusinessPipe<B, Out = CIn>
H: Handler<CIn, Write = W>
OutboundPipe<W, Out = COut>
codec: Encoder<COut>
这意味着:
- 第一个 inbound 的输入必须是 codec decode 出来的类型。
- 下一个 inbound/business 的输入必须等于上一个 stage 的
Out。 - final handler 的输入必须等于 inbound/business 链的最终输出。
- outbound 链的第一个输入必须等于
Handler::Write。 - outbound 链的最终输出必须能被 stream codec
Encoder<...>编码。
UDP 是同样的类型链,但 codec trait 换成 DatagramDecoder / DatagramEncoder,final handler 换成 DatagramHandler。
Compile Failures Are Intentional API
trybuild 测试里的失败用例就是文档化的约束。例如下面这种 pipeline 不会通过编译,因为 Parse 把 String 变成 Request,而 final handler 却要求 String:
#![allow(unused)]
fn main() {
let _ = pipeline()
.codec(LineCodec::new())
.inbound(Parse)
.handler(EchoString);
}
同样,下面这种也不会通过,因为 LineCodec 只能 encode String,而 handler 写出的 Response 没有被 outbound stage 转成 String:
#![allow(unused)]
fn main() {
let _ = TcpServer::bind("127.0.0.1:0")
.pipeline(|| pipeline().codec(LineCodec::new()).handler(Router));
}
实际修复方式是增加 outbound stage:
#![allow(unused)]
fn main() {
let _ = pipeline()
.codec(LineCodec::new())
.inbound(Parse)
.handler(Router)
.outbound(RenderResponse);
}
Flow
Inbound、Business 和 Outbound 返回 Result<Flow<T>>:
#![allow(unused)]
fn main() {
pub enum Flow<T> {
Next(T),
Stop,
}
}
Flow::Next 继续传递消息;Flow::Stop 消费当前消息并停止该方向处理,不当作错误。final Handler / DatagramHandler 不返回 Flow,它们返回 Result<()>,因为它们是 inbound side 的终点。
TCP
TCP 支持 server 和 client,使用 stream pipeline。
Server
TcpServer::bind(addr) 创建 builder。必须调用 .pipeline(...) 设置 per-connection pipeline factory,之后可以 .run().await 或 .start().await。
#![allow(unused)]
fn main() {
use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpServer};
struct Echo;
#[handler(Echo)]
async fn echo(msg: String) -> Result<String> {
Ok(msg)
}
let server = TcpServer::bind("127.0.0.1:0")
.pipeline(|| pipeline().codec(LineCodec::new()).handler(Echo))
.start()
.await?;
server.shutdown();
server.wait().await?;
Ok::<(), rs_netty::Error>(())
}
start 返回 TcpServerHandle,可以读取 local_addr()、调用 shutdown(),再 wait().await 等待 server task 和连接 task 收尾。
Client
TcpClient::connect(addr) 有两种 pipeline 设置方式:
.pipeline(|| ...):可复用 factory,适合无状态或可 clone 状态。.pipeline_instance(...):消费一个已构建 pipeline,适合 handler 拥有oneshot::Sender这类一次性状态。
贴近 examples/tcp_json_line_echo.rs 的 client 片段:
#![allow(unused)]
fn main() {
let (tx, rx) = tokio::sync::oneshot::channel();
let client = TcpClient::connect("127.0.0.1:9003")
.pipeline_instance(
pipeline()
.codec(LineCodec::new())
.inbound(JsonDecode::<Response>::new())
.handler(PrintResponse { response_tx: Some(tx) })
.outbound(JsonEncode::<Request>::new()),
)
.run()
.await?;
client.write_and_flush(Request { message: "hello json".to_string() }).await?;
let _ = rx.await;
client.close().await?;
client.wait().await?;
Ok::<(), rs_netty::Error>(())
}
TcpClientHandle<W> 提供 channel()、write、flush、write_and_flush、close 和 wait。
Configuration
TCP server 和 client 共享 TcpConnectionConfig,默认值在 src/transport/tcp/config.rs:
read_buffer_capacity: 8 KiB。write_buffer_capacity: 8 KiB。max_frame_size: 1 MiB。outbound_queue_size: 1024。tcp_nodelay:true。idle_timeout:None。track_connection_stats:false。
builder 方法包括:
read_buffer_capacity(value)write_buffer_capacity(value)max_frame_size(value)outbound_queue_size(value)tcp_nodelay(value)idle_timeout(duration)track_connection_stats()
client 额外有 bind(local_addr),用于指定本地地址。
Runtime Details
TCP runtime 用 BytesMut 做 read/write buffer。每次 socket read 后会循环调用 codec decode,直到返回 Ok(None)。如果 read buffer 超过 max_frame_size,连接以 FrameTooLarge 关闭。
外部 Channel 命令通过 bounded queue 进入 runtime。write 只 encode 到 write buffer;flush 才执行 write_all;write_and_flush encode 后立即 flush。
启用 track_connection_stats() 后,Context::stats() 和 Channel::stats() 可以看到连接时间、bytes read/written、frames read/written。frames_written 统计的是 encode 到 write buffer 的 frame,不保证已经 flush 到 socket。
UDP
UDP 使用 datagram pipeline。它的模型和 TCP 相似,但边界是完整 datagram,不是 byte stream。
Server
UdpServer::bind(addr) 创建 socket server builder。.pipeline(|| ...) 设置 socket-level pipeline factory:
#![allow(unused)]
fn main() {
use rs_netty::{codec::Utf8DatagramCodec, datagram_pipeline, handler, Result, UdpServer};
struct UdpEcho;
#[handler(UdpEcho)]
async fn udp_echo(msg: String) -> Result<String> {
Ok(format!("echo: {msg}"))
}
let server = UdpServer::bind("127.0.0.1:0")
.pipeline(|| {
datagram_pipeline()
.codec(Utf8DatagramCodec)
.handler(UdpEcho)
})
.start()
.await?;
server.shutdown();
server.wait().await?;
Ok::<(), rs_netty::Error>(())
}
UdpServerHandle 提供 local_addr()、shutdown() 和 wait()。
Client
UdpClient::connect(remote_addr) 默认绑定 "0.0.0.0:0"。可以用 .bind(local_addr) 指定本地地址。UdpClientHandle<W> 对默认 remote peer 提供:
write(msg)flush()write_and_flush(msg)
也提供显式 peer 版本:
write_to(peer_addr, msg)write_to_and_flush(peer_addr, msg)
Socket-Level Pipeline
UDP server 当前只创建一个 socket-level pipeline,不为每个 peer 创建 child pipeline。每个 datagram 处理时都会构造新的 DatagramInfo、InboundContext、BusinessContext、DatagramContext 和 OutboundContext,其中 peer_addr() 是当前 datagram 的来源地址。
如果应用需要 per-peer 状态,请在 handler 中自己维护状态。例如:
#![allow(unused)]
fn main() {
use std::{collections::HashMap, net::SocketAddr};
struct PeerState;
struct StatefulUdp {
peers: HashMap<SocketAddr, PeerState>,
}
}
Configuration
UDP 使用 UdpSocketConfig:
read_buffer_capacity: 默认 64 KiB,运行前会 normalize 到至少max_datagram_size。write_buffer_capacity: 默认 8 KiB。max_datagram_size: 默认 64 KiB。outbound_queue_size: 默认 1024。
builder 方法包括:
read_buffer_capacity(value)write_buffer_capacity(value)max_datagram_size(value)outbound_queue_size(value)
UDP 当前没有 TCP 的 tcp_nodelay、connection stats 或 idle timeout。
Datagram Write Semantics
DatagramContext::write(msg) 写给当前 datagram peer;write_to(peer, msg) 写给显式 peer。两者都只是暂存到 handler-local outbox。flush 或 *_and_flush 会把 pending datagrams 通过 send_to 发出。
UDP 的 flush acknowledgement 只表示本地 send_to 已完成,不表示对端收到,也不提供顺序、重传或可靠性保证。
Handlers
rs-netty 有五类 stage trait。它们都在 src/traits.rs,并通过 trait-variant 生成 Send 版本作为公开主接口。
Inbound
Inbound<I> 用于解码后、final handler 前的入站转换:
#![allow(unused)]
fn main() {
struct Trim;
impl Inbound<String> for Trim {
type Out = String;
async fn read(
&mut self,
_ctx: &mut rs_netty::InboundContext,
msg: String,
) -> rs_netty::Result<rs_netty::Flow<Self::Out>> {
Ok(rs_netty::Flow::Next(msg.trim().to_string()))
}
}
}
InboundContext 暴露 id()、peer_addr()、local_addr(),不提供写能力。
Business
Business<I> 是 inbound 和 final handler 之间的应用级转换阶段。它和 Inbound 的类型形状类似,但方法名是 handle,context 是 BusinessContext。builder 限制是:进入 business 后只能继续加 business 或 final handler,不能再回到 inbound。
Handler
Handler<I> 是 TCP inbound side 的终点:
#![allow(unused)]
fn main() {
impl Handler<Request> for Router {
type Write = Response;
async fn read(&mut self, ctx: &mut Context<Self::Write>, req: Request) -> Result<()> {
ctx.write_and_flush(Response { body: req.body }).await
}
}
}
type Write 是 handler 能写回 outbound side 的应用类型。后续 outbound stage 会从这个类型开始转换,最终转成 codec 可编码的类型。
Context<W> 提供:
- identity:
id、peer_addr、local_addr channel():cloneable external channelstats():启用 stats 时返回ConnectionStatswrite、flush、write_and_flushclose
DatagramHandler
DatagramHandler<I> 是 UDP inbound side 的终点。它使用 DatagramContext<W>,支持 write、write_to、flush、write_and_flush、write_to_and_flush 和 close。
#![allow(unused)]
fn main() {
impl DatagramHandler<String> for UdpEcho {
type Write = String;
async fn read(&mut self, ctx: &mut DatagramContext<Self::Write>, msg: String) -> Result<()> {
ctx.write_and_flush(format!("echo: {msg}")).await
}
}
}
Outbound
Outbound<I> 用于把 handler 写出的应用类型渲染成下游 outbound 类型。最终 outbound 类型必须由 codec encode。
#![allow(unused)]
fn main() {
struct RenderResponse;
impl Outbound<Response> for RenderResponse {
type Out = String;
async fn write(
&mut self,
_ctx: &mut rs_netty::OutboundContext,
msg: Response,
) -> rs_netty::Result<rs_netty::Flow<Self::Out>> {
Ok(rs_netty::Flow::Next(msg.body))
}
}
}
OutboundContext 同样只暴露 identity,不提供直接写能力。
Macro Or Manual Impl
#[handler] 适合一进一出的简单 final handler,或者 consume-only handler。需要直接访问 Context / DatagramContext、控制 flush timing、多次写入、关闭连接或拿 channel() 时,写手动 impl 更清晰。
Codecs
codec 位于 pipeline 的边界。stream codec 用于 TCP byte stream;datagram codec 用于 UDP datagram。
Stream Codec Traits
#![allow(unused)]
fn main() {
pub trait Decoder: Send + 'static {
type Item: Send + 'static;
fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>>;
}
pub trait Encoder<I>: Send + 'static {
fn encode(&mut self, item: I, dst: &mut bytes::BytesMut) -> Result<()>;
}
}
decode 只在完整 frame 可用时消费 bytes 并返回 Some(item)。需要更多 bytes 时返回 Ok(None)。
Datagram Codec Traits
#![allow(unused)]
fn main() {
pub trait DatagramDecoder: Send + 'static {
type Item: Send + 'static;
fn decode_datagram(&mut self, src: &[u8]) -> Result<Self::Item>;
}
pub trait DatagramEncoder<I>: Send + 'static {
fn encode_datagram(&mut self, item: I, dst: &mut bytes::BytesMut) -> Result<()>;
}
}
UDP decoder 每次收到的是一个完整 datagram payload。
Built-In Stream Codecs
LineCodec:UTF-8 line codec,decode 时去掉\n和可选\r,encode 时追加\n。ByteArrayDecoder:把当前 buffer 中全部 bytes 作为一个Bytes输出;同时可 encodeBytes。ByteArrayEncoder:只实现Encoder<Bytes>。FixedLengthFrameDecoder:固定长度 frame codec,encode 时要求长度精确匹配。DelimiterBasedFrameDecoder:按一个或多个 delimiter 切分二进制 frame,可控制是否保留 delimiter。LengthFieldBasedFrameDecoder:Netty-shaped length-field frame decoder,支持 1/2/3/4/8 字节长度字段、offset、adjustment、strip 和 byte order。作为 encoder 使用时只支持 offset 0 和 adjustment 0。LengthFieldPrepender:只负责给Bytes追加长度字段,常用于自定义组合 codec。HttpCodec:最小 HTTP/1.1 server codec,decode request,encode response。支持Content-Length,可选 chunked request body。MqttCodec:MQTT 5 packet codec,处理 fixed header、Remaining Length、properties 和支持的控制包;不维护 broker/client session state。WebSocketCodec:websocketfeature 下可用,server-side WebSocket codec,从 HTTP Upgrade handshake state 切到 frame state。HttpWsCodec:websocketfeature 下可用,在同一 TCP port 上先处理 HTTP request,再对 WebSocket upgrade 切到 frame state。
Built-In Datagram Codecs
Utf8DatagramCodec:每个 datagram 是一个 UTF-8String。BytesDatagramCodec:每个 datagram 是一个 rawbytes::Bytes。
JSON Is A Pipeline Stage
JsonDecode<T> 和 JsonEncode<T> 在 json feature 下可用。它们不是 framing codec,而是 typed pipeline stage:
#![allow(unused)]
fn main() {
let pipeline = pipeline()
.codec(LineCodec::new())
.inbound(JsonDecode::<Request>::new())
.handler(ApiHandler)
.outbound(JsonEncode::<Response>::new());
}
这样 framing 和 JSON parsing/serialization 分离。LineCodec 负责消息边界,JsonDecode<T> 负责 String/Bytes 到 T,JsonEncode<T> 负责 T 到 compact JSON String。
Codec Position
codec 总是 builder 的第一个必需 stage。它决定 inbound 链的初始类型,也决定 outbound 链最终必须产出的类型。
例如:
#![allow(unused)]
fn main() {
pipeline()
.codec(LineCodec::new()) // Decoder<Item = String>, Encoder<String>
.inbound(ParseRequest) // String -> Request
.handler(Router) // Request -> writes Response
.outbound(RenderResponse); // Response -> String
}
如果省略 RenderResponse,LineCodec 无法 encode Response,编译失败。
Lifecycle
Life 是可选 lifecycle hook trait。默认实现是 NoLife,所有方法 no-op。
#![allow(unused)]
fn main() {
#[derive(Clone, Copy)]
struct PrintLife;
impl rs_netty::Life for PrintLife {
async fn tcp_server_started(&self, local_addr: std::net::SocketAddr) -> rs_netty::Result<()> {
println!("server started: {local_addr}");
Ok(())
}
}
}
Hooks
当前 hooks:
tcp_server_started(local_addr)tcp_server_stopped(local_addr)tcp_connection_opened(info)tcp_connection_closed(info, reason)udp_socket_started(local_addr)udp_socket_stopped(local_addr)
TCP connection close reason 使用 CloseReason,包括 PeerClosed、LocalClosed、ChannelClosed、HandlerClosed、ServerShutdown、IdleTimeout、IoError、DecodeError、EncodeError、FrameTooLarge、HandlerError。
Startup And Shutdown Behavior
TCP server start() 绑定 listener 后调用 tcp_server_started。如果该 hook 返回错误,start() 返回错误。
每个 TCP connection accepted 或 client connected 后调用 tcp_connection_opened。如果该 hook 失败,连接任务返回错误。
关闭时的 hook 失败会记录日志,并尽量保留原始关闭结果。
UDP socket task 启动时调用 udp_socket_started,停止时调用 udp_socket_stopped。
Graceful Shutdown
TCP/UDP server 的 start() 返回 handle:
#![allow(unused)]
fn main() {
let server = TcpServer::bind("127.0.0.1:0")
.pipeline(|| pipeline().codec(LineCodec::new()).handler(Echo))
.start()
.await?;
server.shutdown();
server.wait().await?;
Ok::<(), rs_netty::Error>(())
}
TCP server shutdown 使用 watch channel 通知 accept loop 和 connection tasks。UDP server shutdown 通知 socket task 退出。
client 没有 server shutdown handle,使用 client.close().await? 请求本地 connection/socket task 关闭,然后 client.wait().await? 等待任务结束。
Idle Timeout
TCP 支持 idle_timeout(duration)。它关闭一段时间内没有 socket read 的连接,并以 CloseReason::IdleTimeout 通知 lifecycle hook。源码中的 timeout 只在 read 成功后 reset;纯 outbound 写不会 reset 这个 read-idle timer。
UDP 当前没有 idle timeout。
Connection Stats
TCP server/client 可以通过 track_connection_stats() 启用 stats。启用后:
Context::stats()返回当前连接的ConnectionStats。Channel::stats()返回 cloneable stats handle。- 统计包括
connected_at、bytes_read、bytes_written、frames_read、frames_written。
stats 默认关闭,避免不需要监控时产生额外 shared allocation 和 atomic update。
Channel Write And Flush
rs-netty 明确区分 write 和 flush。这一点对延迟、吞吐和测试都很重要。
Handler Context Writes
TCP Context<W>:
write(msg):把消息放入当前 handler 的 local outbox。返回的WriteHandle立即 ready,await 只是兼容 async 风格。flush():请求 flush 当前 handler 已暂存的消息。返回的 handle 可以丢弃,也可以 await。write_and_flush(msg):暂存消息并创建 flush 边界。
UDP DatagramContext<W> 类似,但写入目标可以是当前 peer 或显式 peer:
write(msg)write_to(peer_addr, msg)flush()write_and_flush(msg)write_to_and_flush(peer_addr, msg)
Await Vs Fire And Forget
flush() 和 write_and_flush() 返回的 handle 可以直接丢弃:
#![allow(unused)]
fn main() {
ctx.write_and_flush("first".to_string());
}
这会请求 runtime 在 handler future 仍 pending 时就 drain outbox。tests/server_lifecycle.rs 验证了 fire-and-forget flush 在 handler return 之前也能把第一条响应发出去。
如果 await:
#![allow(unused)]
fn main() {
ctx.write_and_flush("first".to_string()).await?;
}
await 的含义是等待本地 socket write 完成这个 flush 边界。它不是远端确认,也不表示远端 handler 已处理。
Plain Write Buffers
plain write 不会自动 flush。测试覆盖了:
tcp_context_write_buffers_until_explicit_flushudp_context_write_buffers_until_explicit_flushtcp_channel_write_buffers_until_flushudp_client_write_preserves_datagrams_until_flush
需要发送时使用:
#![allow(unused)]
fn main() {
ctx.write("sent".to_string()).await?;
ctx.flush().await?;
}
或:
#![allow(unused)]
fn main() {
ctx.write_and_flush("sent".to_string()).await?;
}
External Channel Writes
Channel<W> 是 TCP external handle。它通过 bounded mpsc 向 connection task 发送命令:
write(msg):等待 queue capacity,把消息排队并 encode 到 write buffer;不 flush。flush():等待之前排队的写 flush 到 socket。write_and_flush(msg):排队消息并等待本地 socket write 完成。close():请求本地连接关闭。
DatagramChannel<W> 是 UDP external handle:
write_to(peer, msg):排队 datagram;不发送。flush():send 所有 pending datagrams。write_to_and_flush(peer, msg):排队并 send。close():请求 socket task 退出。
UdpClientHandle<W> 对默认 remote peer 包装了 write、flush 和 write_and_flush。
Bounded Queue Boundary
outbound_queue_size 控制 channel command queue 容量,默认 1024。队列满时 external write/flush/write_and_flush 会等待容量,而不是无限增长。
handler-local outbox 和 external channel queue 是两个不同边界。handler 内部的 Context 写入先进入 local outbox,再由 runtime drain 到 codec/write buffer 或 pending datagrams。
Macros
rs-netty-macros 提供 #[handler]。主 crate 默认 feature 包含 macros,因此通常可以直接:
#![allow(unused)]
fn main() {
use rs_netty::handler;
}
What It Generates
#[handler(TypeName)] 把一个 async function 适配成 Handler<I> 和 DatagramHandler<I> 两个 impl。用户仍然显式声明 handler struct,宏只生成样板代码。
一进一出:
#![allow(unused)]
fn main() {
struct Echo;
#[handler(Echo)]
async fn echo(msg: String) -> rs_netty::Result<String> {
Ok(msg)
}
}
大致等价于:
#![allow(unused)]
fn main() {
impl rs_netty::Handler<String> for Echo {
type Write = String;
async fn read(
&mut self,
ctx: &mut rs_netty::Context<Self::Write>,
msg: String,
) -> rs_netty::Result<()> {
let msg = echo(msg).await?;
ctx.write_and_flush(msg).await
}
}
}
宏同时生成 DatagramHandler<String>,方法体使用 DatagramContext::write_and_flush。
Consume-Only Handler
如果函数返回 Result<()>,宏无法从返回值推导 type Write,必须写 write = Type:
#![allow(unused)]
fn main() {
struct PrintResponse;
#[handler(PrintResponse, write = String)]
async fn print_response(msg: String) -> rs_netty::Result<()> {
println!("server -> {msg}");
Ok(())
}
}
这里 write = String 的意思是这个 handler 所在连接仍然可以从外部 channel 写入 String。
Handler State
需要访问 handler 字段时,把 &mut HandlerType 放在第一个参数:
#![allow(unused)]
fn main() {
struct PrintResponse {
response_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
#[handler(PrintResponse, write = Request)]
async fn print_response(handler: &mut PrintResponse, res: Response) -> rs_netty::Result<()> {
if let Some(tx) = handler.response_tx.take() {
let _ = tx.send(());
}
println!("server -> {}", res.echoed);
Ok(())
}
}
这正是 examples/tcp_json_line_echo.rs 使用的模式。
Limits
宏要求:
- 被标注的函数必须是
async fn。 - 函数必须返回
Result<T>。 - 参数最多是
(&mut HandlerType, msg)或(msg)。 write = Type只允许用于返回Result<()>的函数。
需要直接操作 Context/DatagramContext、多次写入、手动 flush、关闭连接、拿 channel() 或分支复杂时,写手动 impl。
Examples
仓库的 examples 覆盖了 TCP、UDP、typed chain、JSON、TLS、lifecycle、HTTP 和 WebSocket。
TCP Echo
examples/tcp_echo_server.rsexamples/tcp_echo_client.rs
运行:
cargo run --example tcp_echo_server
cargo run --example tcp_echo_client
server 使用 LineCodec 和 #[handler(Echo)],client 使用 write_and_flush 发送两行。
Typed TCP Chain
examples/tcp_typed_chain.rsexamples/tcp_typed_chain_client.rs
运行:
cargo run --example tcp_typed_chain
cargo run --example tcp_typed_chain_client
server pipeline:
#![allow(unused)]
fn main() {
pipeline()
.codec(LineCodec::new())
.inbound(Trim)
.inbound(ParseRequest)
.handler(Router)
.outbound(RenderResponse)
}
这个例子展示 String -> Request -> Response -> String 的完整类型链。
JSON Over Line
examples/tcp_json_line_echo.rs
运行 server:
cargo run --example tcp_json_line_echo --features json
运行 client:
cargo run --example tcp_json_line_echo --features json -- client
这个例子把 framing 和 JSON 分开:LineCodec 负责行边界,JsonDecode<T> / JsonEncode<T> 负责 typed JSON。
TLS Echo
examples/tcp_tls_echo.rs
运行:
cargo run --example tcp_tls_echo --features tls
这个例子生成 localhost 测试证书,通过 .tls(...) 挂到 TCP transport 上,同时继续把 LineCodec 作为应用明文 codec。
同一组 TLS context API 也支持 required 或 optional mTLS、ALPN 发布、SNI-specific certificate identities,以及通过 TCP handler/stage context 读取 TlsInfo metadata。
Lifecycle
examples/tcp_lifecycle.rs
运行:
cargo run --example tcp_lifecycle
这个例子实现 Life,打印 TCP server started/stopped 和 connection opened/closed。
UDP Echo
examples/udp_echo_server.rsexamples/udp_echo_client.rs
运行:
cargo run --example udp_echo_server
cargo run --example udp_echo_client
使用 Utf8DatagramCodec 和 datagram_pipeline()。
Typed UDP Chain
examples/udp_typed_chain.rsexamples/udp_typed_chain_client.rs
运行:
cargo run --example udp_typed_chain
cargo run --example udp_typed_chain_client
展示 UDP datagram pipeline 的 typed inbound/outbound 转换。
HTTP And WebSocket
examples/http_server.rsexamples/websocket_server.rsexamples/http_websocket_server.rs
运行:
cargo run --example http_server
cargo run --example websocket_server --features websocket
cargo run --example http_websocket_server --features websocket
http_server 使用 HttpCodec。websocket_server 使用 WebSocketCodec。http_websocket_server 使用 HttpWsCodec 和 HttpWsRouter 在同一个 port 上处理 HTTP request 与 WebSocket upgrade。
Benchmarks
benchmarks/ 下包含三个可比较 harness:
benchmarks/rs-netty:rs-netty echo server/client。benchmarks/tokio:裸 Tokio echo server/client。benchmarks/netty:Java Netty echo server/client。
它们对齐了 wire protocols:
line:TCP line echo,payload + "\n"。len:TCP length-field echo,u32be length + payload。udp:UDP datagram echo。
Directional Snapshot
benchmark 结果是方向性快照,不是通用性能承诺。吞吐、延迟和 RSS 会受 host、NIC、OS、JVM warmup、TCP 设置、payload shape、并发数、in-flight 数、是否 loopback 等因素影响。
README 中的表格来自本仓库 benchmark harness 的一次本地非 loopback 运行。它适合用于理解当前实现的大致量级和相对趋势,不应作为在任意生产环境中的保证。
Runner
主入口是:
python3 benchmarks/run.py \
--impls rs-netty tokio netty \
--protocols line len udp \
--connections 100 \
--messages 1000000 \
--payload 128 \
--in-flight 16 \
--output-dir benchmarks/results
runner 会:
- 自动选择非 loopback 本地 IPv4,或使用
--host。 - 拒绝
localhost、127.0.0.1、::1。 - build selected implementations。
- 启动 server 并采样 server RSS。
- 运行 matching client。
- 解析
RESULT ...行。 - 写出 CSV、日志和图表。
输出包括:
results.csv*.server.out.log*.server.err.log*.client.out.log*.client.err.logthroughput.pngp99_latency.pngserver_memory.pnglatency_percentiles.png
开启 --profile cpu 时,macOS sample(1) 会生成 server sample 和 profile summary。
Smoke Run
快速冒烟:
python3 benchmarks/run.py \
--impls rs-netty tokio \
--protocols len \
--connections 2 \
--messages 100 \
--payload 32 \
--in-flight 4
如果要包含 Netty,需要本机有 Maven 和 JDK。Rust harness release build 由 runner 自动执行。
rs-netty Harness Notes
benchmarks/rs-netty/src/main.rs 包含:
server-rs-line/server-rs-line-string:LineCodec+Handler<String>。server-rs-line-bytes:自定义BytesLineCodec+Handler<Bytes>。server-rs-line-sync:awaitwrite_and_flush的 line echo 变体。server-rs-len:自定义组合 codec,内部使用LengthFieldBasedFrameDecoder和LengthFieldPrepender。server-rs-udp:Utf8DatagramCodec+DatagramHandler<String>。
client 端用裸 Tokio 连接、记录 latency percentiles,并打印统一 RESULT 行。
Non Goals
以下内容不是当前 rs-netty 主路径目标。它们来自 README、公开 API 和源码边界。
No EventLoop API
rs-netty 直接使用 Tokio runtime、listener/socket task 和 per-connection task。它没有暴露 Java Netty 风格的 EventLoop 或 EventLoopGroup API。
No ByteBuf RefCnt API
公开 API 使用 bytes::Bytes、BytesMut、String 和用户自定义 owned 类型。框架不暴露 reference-counted ByteBuf 或 retain/release/refCnt 模型。
No ChannelFuture / Promise API
写入 API 使用 Rust async 和 Result。flush / write_and_flush 的 acknowledgement 是通过 await 本地 socket write 完成来表达,不提供 Java Netty 风格 ChannelFuture 或 Promise 主路径。
No Dynamic Boxed Handler Main Path
默认 pipeline 由泛型静态 stage 组合而成,不把 Box<dyn Handler> 作为主路径。这样 stage 顺序和消息类型才能在编译期检查。
No Runtime Pipeline Mutation API
当前 builder 建好 pipeline 后,运行时不提供 Netty 式 pipeline.addLast/remove/replace 动态修改主路径。
No TLS Pipeline Stage
TLS 被建模为可选 TCP transport layer,而不是 codec 或普通 pipeline stage。
TLS stream 建立后,typed pipeline 仍然处理明文应用消息。
TLS metadata 会通过 TlsInfo 暴露,但 TLS negotiation 本身仍发生在 transport 边界,而不是作为动态 pipeline stage。
No Codec Registry
内置 codec 是普通 Rust 类型,pipeline 显式实例化它们。当前没有全局 codec registry、协议名查找或运行时 codec negotiation registry。
No Automatic UDP Reliability
UDP 支持 datagram send/recv,但不提供可靠性、排序、重传、拥塞控制或自动 session 管理。
No Per-Peer UDP Child Pipeline
UDP server 使用 socket-level pipeline,不为每个 remote peer 创建独立 child pipeline。per-peer state 由应用 handler 管理。
No MQTT Broker State
MqttCodec 负责 MQTT 5 packet 编解码和局部格式校验,不维护 broker/client session、subscription tree、QoS 状态机或 retained message store。
Minimal HTTP/WebSocket Scope
HttpCodec 和 WebSocket codec 适合简单 server-side pipeline 示例和轻量用途。它们不是完整 HTTP framework,不提供 routing DSL、middleware stack、HTTP/2、compression、WebSocket extension negotiation 或 fragmented data frame reassembly。
Extension Guide
这一章给出新增 codec、handler 和 example 的最小路径。
Add A TCP Codec
实现 Decoder 和 Encoder<T>。如果 decode 输出和 encode 输入不同,也可以实现 Decoder 在一个类型上、Encoder<T> 在同一个类型上,并通过 outbound stage 把应用响应转成 T。
#![allow(unused)]
fn main() {
use bytes::{Buf, BufMut, Bytes, BytesMut};
use rs_netty::{codec::{Decoder, Encoder}, Error, Result};
struct LengthCodec;
impl Decoder for LengthCodec {
type Item = Bytes;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
if src.len() < 4 {
return Ok(None);
}
let len = u32::from_be_bytes([src[0], src[1], src[2], src[3]]) as usize;
if src.len() < 4 + len {
return Ok(None);
}
src.advance(4);
Ok(Some(src.split_to(len).freeze()))
}
}
impl Encoder<Bytes> for LengthCodec {
fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<()> {
let len = u32::try_from(item.len()).map_err(|err| Error::Encode(err.to_string()))?;
dst.put_u32(len);
dst.extend_from_slice(&item);
Ok(())
}
}
}
仓库 benchmark 中的 LengthCodec 使用更复用的方式:内部组合 LengthFieldBasedFrameDecoder 和 LengthFieldPrepender。
Add A UDP Codec
实现 DatagramDecoder 和 DatagramEncoder<T>。每次 decode 输入就是一个 datagram payload:
#![allow(unused)]
fn main() {
use bytes::{Bytes, BytesMut};
use rs_netty::{codec::{DatagramDecoder, DatagramEncoder}, Result};
struct RawDatagram;
impl DatagramDecoder for RawDatagram {
type Item = Bytes;
fn decode_datagram(&mut self, src: &[u8]) -> Result<Self::Item> {
Ok(Bytes::copy_from_slice(src))
}
}
impl DatagramEncoder<Bytes> for RawDatagram {
fn encode_datagram(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<()> {
dst.extend_from_slice(&item);
Ok(())
}
}
}
这和内置 BytesDatagramCodec 非常接近。
Add An Inbound Handler
实现 Inbound<I>,返回 Flow<Out>:
#![allow(unused)]
fn main() {
struct ParseRequest;
impl rs_netty::Inbound<String> for ParseRequest {
type Out = Request;
async fn read(
&mut self,
_ctx: &mut rs_netty::InboundContext,
msg: String,
) -> rs_netty::Result<rs_netty::Flow<Self::Out>> {
Ok(rs_netty::Flow::Next(Request { body: msg }))
}
}
}
如果想过滤消息而不是继续传递,返回 Flow::Stop。
Add An Outbound Handler
实现 Outbound<I>,把应用响应类型转换为下游类型:
#![allow(unused)]
fn main() {
struct RenderResponse;
impl rs_netty::Outbound<Response> for RenderResponse {
type Out = String;
async fn write(
&mut self,
_ctx: &mut rs_netty::OutboundContext,
msg: Response,
) -> rs_netty::Result<rs_netty::Flow<Self::Out>> {
Ok(rs_netty::Flow::Next(msg.body))
}
}
}
在 pipeline 中放到 final handler 后面:
#![allow(unused)]
fn main() {
pipeline()
.codec(LineCodec::new())
.inbound(ParseRequest)
.handler(Router)
.outbound(RenderResponse);
}
Add A Complete Example
建议步骤:
- 在
examples/下新增一个小而完整的.rs文件。 - 在根
Cargo.toml增加[[example]]条目,按需设置required-features。 - example 中优先使用现有 codec 和公开 traits,避免依赖
pub(crate)runtime。 - 如果是 TCP,使用
pipeline();如果是 UDP,使用datagram_pipeline()。 - 如果 handler 简单,使用
#[handler];如果要手动 flush、多次写或关闭连接,手写 impl。 - 加一个 trybuild pass 用例,确保 public API 编译形状不回退。
- 运行
cargo test和相关 feature test。
完整 TCP typed chain 可以参考 examples/tcp_typed_chain.rs;完整 UDP typed chain 可以参考 examples/udp_typed_chain.rs。
API Map
本章按模块列出重要 public API。pub(crate) runtime 细节不列为用户 API。
crate root
pipeline():创建 TCP stream typed pipeline builder。datagram_pipeline():创建 UDP datagram typed pipeline builder。TcpServer/TcpClient:TCP server/client builder。UdpServer/UdpClient:UDP server/client builder。Channel/DatagramChannel:cloneable external write/flush/close handle。Context/DatagramContext:final handler 中的写入和连接/socket 操作 context。InboundContext/BusinessContext/OutboundContext:transform stage 的只读 identity context。启用tlsfeature 时,TCP stream context 可通过tls()暴露协商后的 TLS metadata。Life/NoLife/CloseReason:lifecycle hook API。Error/Result:框架错误类型和结果别名。Flow:stage 继续或停止当前消息的结果。handler:macrosfeature 下 re-export 的 attribute macro。TlsContextBuilder/ServerTlsContext/ClientTlsContext/TlsInfo:tlsfeature 下的 TLS context 和 metadata API。
traits
Flow<T>:Next(T)继续 pipeline,Stop消费消息。Inbound<I>:入站转换 stage,输出Flow<Out>。Business<I>:业务转换 stage,位于 inbound 和 final handler 之间。Handler<I>:TCP final inbound handler,定义type Write。DatagramHandler<I>:UDP final inbound handler,定义type Write。Outbound<I>:出站转换 stage,把 handler write 类型转成 codec 可编码类型。
codec
Decoder:TCP byte stream decoder,返回Option<Item>。Encoder<I>:TCP byte stream encoder。DatagramDecoder:UDP datagram decoder。DatagramEncoder<I>:UDP datagram encoder。LineCodec:UTF-8 newline-delimited stream codec。ByteArrayDecoder:把当前 stream buffer drain 成Bytes,并可 encodeBytes。ByteArrayEncoder:pass-throughBytesencoder。FixedLengthFrameDecoder:固定长度 binary frame codec。DelimiterBasedFrameDecoder:delimiter-terminated binary frame codec。LengthFieldBasedFrameDecoder:length-field frame decoder,也可 encode zero-offset/zero-adjustmentBytes。LengthFieldPrepender:只负责 prepend length field 的Bytesencoder。ByteOrder:length-field codec 使用的 endian 设置。Utf8DatagramCodec:UTF-8 datagram codec。BytesDatagramCodec:raw bytes datagram codec。JsonDecode<T>:jsonfeature 下的 inbound JSON stage。JsonEncode<T>:jsonfeature 下的 outbound JSON stage。HttpCodec:最小 HTTP/1.1 request/response server codec。HttpRequest:HTTP request view,提供 method/target/version/header/body/trailers 查询。HttpResponse:HTTP response builder,支持 status/reason/header/body。MqttCodec:MQTT 5 packet stream codec。MqttPacket:MQTT control packet enum。QoS:MQTT QoS enum。ConnectPacket/ConnAckPacket/PublishPacket/AckPacket:常用 MQTT packet struct。SubscribePacket/SubAckPacket/UnsubscribePacket/UnsubAckPacket:MQTT subscribe/unsubscribe packet struct。DisconnectPacket/AuthPacket/Will/MqttProperty:MQTT 辅助 packet/property 类型。WebSocketCodec:websocketfeature 下的 server-side WebSocket codec。HttpWsCodec:websocketfeature 下的 HTTP + WebSocket shared-port codec。WebSocketInbound/WebSocketOutbound/WebSocketMessage:WebSocket message enums。HttpWsInbound/HttpWsOutbound:shared HTTP/WebSocket codec 的 message enums。WebSocketHandshake/WebSocketHandshakeResponse/WebSocketClose:WebSocket handshake 和 close 类型。HttpService/WebSocketService:HttpWsRouter使用的 static service traits。HttpWsRouter<H, W>:把 HTTP service 和 WebSocket service 组合成一个Handler<HttpWsInbound>。
context
ConnInfo:TCP connection id、peer addr、local addr;启用tlsfeature 时也包含协商后的 TLS metadata。DatagramInfo:UDP socket id、当前 peer addr、local addr;启用tlsfeature 时,TCP stream transform context 可携带 TCP-derived TLS metadata。ConnectionStats:TCP connection counter snapshot handle。Context<W>:TCP handler context,支持write、flush、write_and_flush、close、channel、stats;启用tlsfeature 时支持tls。DatagramContext<W>:UDP handler context,支持 current-peer 和 explicit-peer write/flush/close。InboundContext:inbound stage identity context;TLS TCP connection 中包含tls。BusinessContext:business stage identity context;TLS TCP connection 中包含tls。OutboundContext:outbound stage identity context;TLS TCP connection 中包含tls。
channel
Channel<W>:TCP external handle,支持 identity、queue capacity、stats、write/flush/write_and_flush/close。DatagramChannel<W>:UDP external handle,支持 socket identity、queue capacity、write_to/flush/write_to_and_flush/close。
life
CloseReason:TCP connection 关闭原因。NoLife:默认 no-op lifecycle hooks。Life:server、connection 和 UDP socket lifecycle hook trait。
tls
TlsContextBuilder::for_server():创建 server TLS context builder。TlsContextBuilder::for_client():创建NoTrust状态的 typestate client TLS context builder。ServerTlsContextBuilder:接收 certificate chain、private key、required 或 optional client-auth roots、ALPN protocols、SNI-specific identities,构建ServerTlsContext。ClientTlsContextBuilder<NoTrust>:可设置 server name 和 trust strategy,但没有build。ClientTlsContextBuilder<HasTrust>:选择 roots 或 verifier 后构建ClientTlsContext,可通过client_identity_pem/client_identity_der配置 mTLS client identity,也可发布 ALPN protocols。TlsInfo:TCPContext::tls、stream stage context 和ConnInfo::tls可访问的 TLS metadata,包含 peer certificates、selected ALPN,以及连接使用的 client/server name。client_auth_required_pem/client_auth_required_der:要求 client certificate 由 trusted roots 签发。client_auth_optional_pem/client_auth_optional_der:允许 client 不提供 certificate;如果提供 certificate,则仍会验证。alpn_protocols:配置 client 或 server context 发布的 ALPN protocols。协议名不能为空,且最长 255 bytes。sni_certificate_pem/sni_certificate_der:添加 SNI-specific server certificate identities。也可以通过certificate_chain_*加private_key_*配置 default fallback certificate。native_roots、webpki_roots、danger_accept_invalid_certs分别由tls-native-roots、tls-webpki-roots、tls-dangerousfeature 控制。
pipeline::stream
builder::pipeline():TCP builder 起点。builder::PipelineBuilder<...>:携带阶段状态和消息类型的 TCP builder。builder::IntoStreamPipeline:把 ready builder 转成 runtime stream pipeline。builder::IntoPipeline:stream pipeline 兼容转换 trait。runtime::StreamPipeline<...>:TCP runtime pipeline 类型,通常不直接命名。runtime::Pipeline<...>:StreamPipeline的兼容 type alias。runtime::StreamRuntimePipeline:TCP transport 运行 typed pipeline 所需的 public bound。runtime::RuntimePipeline:StreamRuntimePipeline的兼容 alias trait。
pipeline::datagram
builder::datagram_pipeline():UDP builder 起点。builder::DatagramPipelineBuilder<...>:携带阶段状态和消息类型的 UDP builder。builder::IntoDatagramPipeline:把 ready builder 转成 runtime datagram pipeline。runtime::DatagramPipeline<...>:UDP runtime pipeline 类型,通常不直接命名。runtime::DatagramRuntimePipeline:UDP transport 运行 typed pipeline 所需的 public bound。
pipeline::core
Identity:空 stage pipe,实现原样通过。Then<A, B>:静态组合两个 stage pipe。PipeStep<T, F>:runtime 优化用的 ready/future stage step。InboundPipe<I>/BusinessPipe<I>/OutboundPipe<I>:内部 stage 链处理 trait,但因 builder/runtime bounds 公开。Start/InboundPhase/BusinessPhase/Ready:builder state marker。
transport::tcp
TcpConnectionConfig:TCP 连接配置。TcpServerConfig/ServerConfig:TcpConnectionConfigtype alias。TcpClientConfig:TcpConnectionConfigtype alias。TcpServer<F, L>:TCP server builder。TcpServer::tls:tlsfeature 下启用 server-side TLS。TcpServerHandle:server shutdown/wait handle。TcpClient<F, L>:TCP client builder。TcpClient::tls:tlsfeature 下启用 client-side TLS。TcpClientHandle<W>:active TCP client handle。PipelineFactory<F>:client reusable pipeline factory wrapper。PipelineInstance<B>:client single-use pipeline wrapper。
transport::udp
UdpSocketConfig:UDP socket 配置。UdpServerConfig/UdpClientConfig:UdpSocketConfigtype alias。UdpServer<F, L>:UDP server socket builder。UdpServerHandle:UDP server shutdown/wait handle。UdpClient<F, L>:UDP client socket builder。UdpClientHandle<W>:active UDP client handle。
client And server
client::TcpClient/client::TcpClientHandle/client::TcpClientConfig:client 模块下的 TCP re-export。client::UdpClient/client::UdpClientHandle/client::UdpClientConfig:client 模块下的 UDP re-export。server::TcpServer/server::TcpServerHandle/server::TcpServerConfig/server::ServerConfig:server 模块下的 TCP re-export。server::UdpServer/server::UdpServerHandle/server::UdpServerConfig:server 模块下的 UDP re-export。