Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

TCP

TCP supports both server and client modes through stream pipelines.

Server

TcpServer::bind(addr) creates a builder. You must call .pipeline(...) to set a per-connection pipeline factory, then use either .run().await or .start().await.

#![allow(unused)]
fn main() {
use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpServer};

struct Echo;

#[handler(Echo)]
async fn echo(msg: String) -> Result<String> {
    Ok(msg)
}

let server = TcpServer::bind("127.0.0.1:0")
    .pipeline(|| pipeline().codec(LineCodec::new()).handler(Echo))
    .start()
    .await?;

server.shutdown();
server.wait().await?;
Ok::<(), rs_netty::Error>(())
}

start returns TcpServerHandle, which exposes local_addr(), shutdown(), and wait().await.

Client

TcpClient::connect(addr) offers two pipeline setup methods:

  • .pipeline(|| ...): a reusable factory for stateless or cheaply cloneable state.
  • .pipeline_instance(...): consumes one already-built pipeline, useful when a handler owns one-shot state such as oneshot::Sender.

This client snippet is close to examples/tcp_json_line_echo.rs:

#![allow(unused)]
fn main() {
let (tx, rx) = tokio::sync::oneshot::channel();

let client = TcpClient::connect("127.0.0.1:9003")
    .pipeline_instance(
        pipeline()
            .codec(LineCodec::new())
            .inbound(JsonDecode::<Response>::new())
            .handler(PrintResponse { response_tx: Some(tx) })
            .outbound(JsonEncode::<Request>::new()),
    )
    .run()
    .await?;

client.write_and_flush(Request { message: "hello json".to_string() }).await?;
let _ = rx.await;
client.close().await?;
client.wait().await?;
Ok::<(), rs_netty::Error>(())
}

TcpClientHandle<W> exposes channel(), write, flush, write_and_flush, close, and wait.

Configuration

TCP servers and clients share TcpConnectionConfig, defined in src/transport/tcp/config.rs. Defaults:

  • read_buffer_capacity: 8 KiB.
  • write_buffer_capacity: 8 KiB.
  • max_frame_size: 1 MiB.
  • outbound_queue_size: 1024.
  • tcp_nodelay: true.
  • idle_timeout: None.
  • track_connection_stats: false.

Builder methods include:

  • read_buffer_capacity(value)
  • write_buffer_capacity(value)
  • max_frame_size(value)
  • outbound_queue_size(value)
  • tcp_nodelay(value)
  • idle_timeout(duration)
  • track_connection_stats()

Clients also provide bind(local_addr) to choose the local address.

Runtime Details

The TCP runtime uses BytesMut for read and write buffers. After each socket read, it repeatedly calls codec.decode until the codec returns Ok(None). If the read buffer exceeds max_frame_size, the connection is closed with FrameTooLarge.

External Channel commands enter the runtime through a bounded queue. write only encodes into the write buffer; flush performs write_all; write_and_flush encodes and immediately flushes.

After track_connection_stats() is enabled, Context::stats() and Channel::stats() expose connection time, bytes read/written, and frames read/written. frames_written counts frames encoded into the write buffer; it does not guarantee those frames have already been flushed to the socket.