TCP
TCP 支持 server 和 client,使用 stream pipeline。
Server
TcpServer::bind(addr) 创建 builder。必须调用 .pipeline(...) 设置 per-connection pipeline factory,之后可以 .run().await 或 .start().await。
#![allow(unused)]
fn main() {
use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpServer};
struct Echo;
#[handler(Echo)]
async fn echo(msg: String) -> Result<String> {
Ok(msg)
}
let server = TcpServer::bind("127.0.0.1:0")
.pipeline(|| pipeline().codec(LineCodec::new()).handler(Echo))
.start()
.await?;
server.shutdown();
server.wait().await?;
Ok::<(), rs_netty::Error>(())
}
start 返回 TcpServerHandle,可以读取 local_addr()、调用 shutdown(),再 wait().await 等待 server task 和连接 task 收尾。
Client
TcpClient::connect(addr) 有两种 pipeline 设置方式:
.pipeline(|| ...):可复用 factory,适合无状态或可 clone 状态。.pipeline_instance(...):消费一个已构建 pipeline,适合 handler 拥有oneshot::Sender这类一次性状态。
贴近 examples/tcp_json_line_echo.rs 的 client 片段:
#![allow(unused)]
fn main() {
let (tx, rx) = tokio::sync::oneshot::channel();
let client = TcpClient::connect("127.0.0.1:9003")
.pipeline_instance(
pipeline()
.codec(LineCodec::new())
.inbound(JsonDecode::<Response>::new())
.handler(PrintResponse { response_tx: Some(tx) })
.outbound(JsonEncode::<Request>::new()),
)
.run()
.await?;
client.write_and_flush(Request { message: "hello json".to_string() }).await?;
let _ = rx.await;
client.close().await?;
client.wait().await?;
Ok::<(), rs_netty::Error>(())
}
TcpClientHandle<W> 提供 channel()、write、flush、write_and_flush、close 和 wait。
Configuration
TCP server 和 client 共享 TcpConnectionConfig,默认值在 src/transport/tcp/config.rs:
read_buffer_capacity: 8 KiB。write_buffer_capacity: 8 KiB。max_frame_size: 1 MiB。outbound_queue_size: 1024。tcp_nodelay:true。idle_timeout:None。track_connection_stats:false。
builder 方法包括:
read_buffer_capacity(value)write_buffer_capacity(value)max_frame_size(value)outbound_queue_size(value)tcp_nodelay(value)idle_timeout(duration)track_connection_stats()
client 额外有 bind(local_addr),用于指定本地地址。
Runtime Details
TCP runtime 用 BytesMut 做 read/write buffer。每次 socket read 后会循环调用 codec decode,直到返回 Ok(None)。如果 read buffer 超过 max_frame_size,连接以 FrameTooLarge 关闭。
外部 Channel 命令通过 bounded queue 进入 runtime。write 只 encode 到 write buffer;flush 才执行 write_all;write_and_flush encode 后立即 flush。
启用 track_connection_stats() 后,Context::stats() 和 Channel::stats() 可以看到连接时间、bytes read/written、frames read/written。frames_written 统计的是 encode 到 write buffer 的 frame,不保证已经 flush 到 socket。