Codecs
codec 位于 pipeline 的边界。stream codec 用于 TCP byte stream;datagram codec 用于 UDP datagram。
Stream Codec Traits
#![allow(unused)]
fn main() {
pub trait Decoder: Send + 'static {
type Item: Send + 'static;
fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>>;
}
pub trait Encoder<I>: Send + 'static {
fn encode(&mut self, item: I, dst: &mut bytes::BytesMut) -> Result<()>;
}
}
decode 只在完整 frame 可用时消费 bytes 并返回 Some(item)。需要更多 bytes 时返回 Ok(None)。
Datagram Codec Traits
#![allow(unused)]
fn main() {
pub trait DatagramDecoder: Send + 'static {
type Item: Send + 'static;
fn decode_datagram(&mut self, src: &[u8]) -> Result<Self::Item>;
}
pub trait DatagramEncoder<I>: Send + 'static {
fn encode_datagram(&mut self, item: I, dst: &mut bytes::BytesMut) -> Result<()>;
}
}
UDP decoder 每次收到的是一个完整 datagram payload。
Built-In Stream Codecs
LineCodec:UTF-8 line codec,decode 时去掉\n和可选\r,encode 时追加\n。ByteArrayDecoder:把当前 buffer 中全部 bytes 作为一个Bytes输出;同时可 encodeBytes。ByteArrayEncoder:只实现Encoder<Bytes>。FixedLengthFrameDecoder:固定长度 frame codec,encode 时要求长度精确匹配。DelimiterBasedFrameDecoder:按一个或多个 delimiter 切分二进制 frame,可控制是否保留 delimiter。LengthFieldBasedFrameDecoder:Netty-shaped length-field frame decoder,支持 1/2/3/4/8 字节长度字段、offset、adjustment、strip 和 byte order。作为 encoder 使用时只支持 offset 0 和 adjustment 0。LengthFieldPrepender:只负责给Bytes追加长度字段,常用于自定义组合 codec。HttpCodec:最小 HTTP/1.1 server codec,decode request,encode response。支持Content-Length,可选 chunked request body。MqttCodec:MQTT 5 packet codec,处理 fixed header、Remaining Length、properties 和支持的控制包;不维护 broker/client session state。WebSocketCodec:websocketfeature 下可用,server-side WebSocket codec,从 HTTP Upgrade handshake state 切到 frame state。HttpWsCodec:websocketfeature 下可用,在同一 TCP port 上先处理 HTTP request,再对 WebSocket upgrade 切到 frame state。
Built-In Datagram Codecs
Utf8DatagramCodec:每个 datagram 是一个 UTF-8String。BytesDatagramCodec:每个 datagram 是一个 rawbytes::Bytes。
JSON Is A Pipeline Stage
JsonDecode<T> 和 JsonEncode<T> 在 json feature 下可用。它们不是 framing codec,而是 typed pipeline stage:
#![allow(unused)]
fn main() {
let pipeline = pipeline()
.codec(LineCodec::new())
.inbound(JsonDecode::<Request>::new())
.handler(ApiHandler)
.outbound(JsonEncode::<Response>::new());
}
这样 framing 和 JSON parsing/serialization 分离。LineCodec 负责消息边界,JsonDecode<T> 负责 String/Bytes 到 T,JsonEncode<T> 负责 T 到 compact JSON String。
Codec Position
codec 总是 builder 的第一个必需 stage。它决定 inbound 链的初始类型,也决定 outbound 链最终必须产出的类型。
例如:
#![allow(unused)]
fn main() {
pipeline()
.codec(LineCodec::new()) // Decoder<Item = String>, Encoder<String>
.inbound(ParseRequest) // String -> Request
.handler(Router) // Request -> writes Response
.outbound(RenderResponse); // Response -> String
}
如果省略 RenderResponse,LineCodec 无法 encode Response,编译失败。