Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

TCP

TCP 支持 server 和 client,使用 stream pipeline。

Server

TcpServer::bind(addr) 创建 builder。必须调用 .pipeline(...) 设置 per-connection pipeline factory,之后可以 .run().await.start().await

#![allow(unused)]
fn main() {
use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpServer};

struct Echo;

#[handler(Echo)]
async fn echo(msg: String) -> Result<String> {
    Ok(msg)
}

let server = TcpServer::bind("127.0.0.1:0")
    .pipeline(|| pipeline().codec(LineCodec::new()).handler(Echo))
    .start()
    .await?;

server.shutdown();
server.wait().await?;
Ok::<(), rs_netty::Error>(())
}

start 返回 TcpServerHandle,可以读取 local_addr()、调用 shutdown(),再 wait().await 等待 server task 和连接 task 收尾。

Client

TcpClient::connect(addr) 有两种 pipeline 设置方式:

  • .pipeline(|| ...):可复用 factory,适合无状态或可 clone 状态。
  • .pipeline_instance(...):消费一个已构建 pipeline,适合 handler 拥有 oneshot::Sender 这类一次性状态。

贴近 examples/tcp_json_line_echo.rs 的 client 片段:

#![allow(unused)]
fn main() {
let (tx, rx) = tokio::sync::oneshot::channel();

let client = TcpClient::connect("127.0.0.1:9003")
    .pipeline_instance(
        pipeline()
            .codec(LineCodec::new())
            .inbound(JsonDecode::<Response>::new())
            .handler(PrintResponse { response_tx: Some(tx) })
            .outbound(JsonEncode::<Request>::new()),
    )
    .run()
    .await?;

client.write_and_flush(Request { message: "hello json".to_string() }).await?;
let _ = rx.await;
client.close().await?;
client.wait().await?;
Ok::<(), rs_netty::Error>(())
}

TcpClientHandle<W> 提供 channel()writeflushwrite_and_flushclosewait

Configuration

TCP server 和 client 共享 TcpConnectionConfig,默认值在 src/transport/tcp/config.rs

  • read_buffer_capacity: 8 KiB。
  • write_buffer_capacity: 8 KiB。
  • max_frame_size: 1 MiB。
  • outbound_queue_size: 1024。
  • tcp_nodelay: true
  • idle_timeout: None
  • track_connection_stats: false

builder 方法包括:

  • read_buffer_capacity(value)
  • write_buffer_capacity(value)
  • max_frame_size(value)
  • outbound_queue_size(value)
  • tcp_nodelay(value)
  • idle_timeout(duration)
  • track_connection_stats()

client 额外有 bind(local_addr),用于指定本地地址。

Runtime Details

TCP runtime 用 BytesMut 做 read/write buffer。每次 socket read 后会循环调用 codec decode,直到返回 Ok(None)。如果 read buffer 超过 max_frame_size,连接以 FrameTooLarge 关闭。

外部 Channel 命令通过 bounded queue 进入 runtime。write 只 encode 到 write buffer;flush 才执行 write_allwrite_and_flush encode 后立即 flush。

启用 track_connection_stats() 后,Context::stats()Channel::stats() 可以看到连接时间、bytes read/written、frames read/written。frames_written 统计的是 encode 到 write buffer 的 frame,不保证已经 flush 到 socket。