TCP
TCP supports both server and client modes through stream pipelines.
Server
TcpServer::bind(addr) creates a builder. You must call .pipeline(...) to set a per-connection pipeline factory, then use either .run().await or .start().await.
use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpServer};

struct Echo;

// Return each decoded line unchanged.
#[handler(Echo)]
async fn echo(msg: String) -> Result<String> {
    Ok(msg)
}

// The statements below run inside an async context.
// Port 0 lets the operating system pick a free port.
let server = TcpServer::bind("127.0.0.1:0")
    .pipeline(|| pipeline().codec(LineCodec::new()).handler(Echo))
    .start()
    .await?;

server.shutdown();
server.wait().await?;
Ok::<(), rs_netty::Error>(())
start returns TcpServerHandle, which exposes local_addr(), shutdown(), and wait().await.
Client
TcpClient::connect(addr) offers two pipeline setup methods:
- .pipeline(|| ...): a reusable factory for stateless or cheaply cloneable state.
- .pipeline_instance(...): consumes one already-built pipeline, useful when a handler owns one-shot state such as oneshot::Sender.
This client snippet is close to examples/tcp_json_line_echo.rs:
// Imports and the Request, Response, and PrintResponse definitions are omitted.
// The statements below run inside an async context.
let (tx, rx) = tokio::sync::oneshot::channel();

let client = TcpClient::connect("127.0.0.1:9003")
    .pipeline_instance(
        pipeline()
            .codec(LineCodec::new())
            .inbound(JsonDecode::<Response>::new())
            .handler(PrintResponse { response_tx: Some(tx) })
            .outbound(JsonEncode::<Request>::new()),
    )
    .run()
    .await?;

client.write_and_flush(Request { message: "hello json".to_string() }).await?;
// Wait until the handler sends on (or drops) the one-shot sender.
let _ = rx.await;
client.close().await?;
client.wait().await?;
Ok::<(), rs_netty::Error>(())
TcpClientHandle<W> exposes channel(), write, flush, write_and_flush, close, and wait.
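The distinction between .pipeline(|| ...) and .pipeline_instance(...) can be illustrated without the crate itself. The sketch below uses only the standard library; OneShot, Notify, build_many, and build_one are hypothetical names invented for illustration, not rs_netty APIs. It shows why a handler that owns a non-cloneable, single-use sender fits the "consume one instance" style rather than a reusable factory.

```rust
use std::sync::mpsc::{channel, Sender};

/// Hypothetical single-use sender: deliberately not Clone, and `send` consumes it.
struct OneShot(Sender<String>);

impl OneShot {
    fn send(self, value: String) {
        let _ = self.0.send(value);
    }
}

/// Hypothetical handler that owns one-shot state.
struct Notify {
    tx: Option<OneShot>,
}

impl Notify {
    /// Returns true only for the first message; the sender is gone afterwards.
    fn on_message(&mut self, msg: String) -> bool {
        match self.tx.take() {
            Some(tx) => {
                tx.send(msg);
                true
            }
            None => false,
        }
    }
}

/// Factory style: the closure may be called many times, so it must be `Fn`
/// and produce a fresh value on every call.
fn build_many<P>(factory: impl Fn() -> P, n: usize) -> Vec<P> {
    (0..n).map(|_| factory()).collect()
}

/// Instance style: takes ownership of exactly one already-built value.
fn build_one<P>(instance: P) -> P {
    instance
}

// A closure such as `move || Notify { tx: Some(one_shot) }` moves `one_shot`
// out of its captured state, so it is only `FnOnce` and would be rejected by
// `build_many`. Passing the built value to `build_one` works instead.
```

With this sketch, build_one(Notify { tx: Some(OneShot(tx)) }) compiles and the handler fires once, while build_many is limited to closures that can rebuild their state on every call.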
Configuration
TCP servers and clients share TcpConnectionConfig, defined in src/transport/tcp/config.rs. Defaults:
- read_buffer_capacity: 8 KiB.
- write_buffer_capacity: 8 KiB.
- max_frame_size: 1 MiB.
- outbound_queue_size: 1024.
- tcp_nodelay: true.
- idle_timeout: None.
- track_connection_stats: false.
Builder methods include:
- read_buffer_capacity(value)
- write_buffer_capacity(value)
- max_frame_size(value)
- outbound_queue_size(value)
- tcp_nodelay(value)
- idle_timeout(duration)
- track_connection_stats()
Clients also provide bind(local_addr) to choose the local address.
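To make the defaults and the builder style concrete, here is a standalone sketch that mirrors the documented values. ConfigSketch is a hypothetical stand-in, not the real TcpConnectionConfig; the field types (usize, bool, Option<Duration>) and the by-value builder signatures are assumptions, and only three of the listed methods are shown.

```rust
use std::time::Duration;

/// Hypothetical stand-in for TcpConnectionConfig; field types are assumptions.
#[derive(Debug, Clone, PartialEq)]
struct ConfigSketch {
    read_buffer_capacity: usize,
    write_buffer_capacity: usize,
    max_frame_size: usize,
    outbound_queue_size: usize,
    tcp_nodelay: bool,
    idle_timeout: Option<Duration>,
    track_connection_stats: bool,
}

impl Default for ConfigSketch {
    /// Mirrors the defaults listed in this section.
    fn default() -> Self {
        Self {
            read_buffer_capacity: 8 * 1024,  // 8 KiB
            write_buffer_capacity: 8 * 1024, // 8 KiB
            max_frame_size: 1024 * 1024,     // 1 MiB
            outbound_queue_size: 1024,
            tcp_nodelay: true,
            idle_timeout: None,
            track_connection_stats: false,
        }
    }
}

impl ConfigSketch {
    /// Override the largest frame the read buffer may hold.
    fn max_frame_size(mut self, value: usize) -> Self {
        self.max_frame_size = value;
        self
    }

    /// Set an idle timeout (the default is None).
    fn idle_timeout(mut self, duration: Duration) -> Self {
        self.idle_timeout = Some(duration);
        self
    }

    /// Takes no argument: calling it switches stats tracking on.
    fn track_connection_stats(mut self) -> Self {
        self.track_connection_stats = true;
        self
    }
}
```

A chained call such as ConfigSketch::default().max_frame_size(64 * 1024).track_connection_stats() changes only the named settings and leaves the other defaults in place.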
Runtime Details
The TCP runtime uses BytesMut for read and write buffers. After each socket read, it repeatedly calls codec.decode until the codec returns Ok(None). If the read buffer exceeds max_frame_size, the connection is closed with FrameTooLarge.
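The decode loop described above can be sketched with the standard library alone. In this illustration Vec<u8> stands in for BytesMut, and LineDecoder, drain_frames, and DecodeError are hypothetical names rather than the crate's types. The exact point at which the runtime compares the buffer against max_frame_size is not stated here, so checking after the loop is an assumption.

```rust
#[derive(Debug, PartialEq)]
enum DecodeError {
    FrameTooLarge,
}

/// Hypothetical newline-delimited decoder. It returns Ok(None) when the
/// buffer does not yet contain a complete frame.
struct LineDecoder;

impl LineDecoder {
    fn decode(&mut self, buf: &mut Vec<u8>) -> Result<Option<String>, DecodeError> {
        match buf.iter().position(|&b| b == b'\n') {
            Some(end) => {
                // Remove the frame and its delimiter from the front of the buffer.
                let frame: Vec<u8> = buf.drain(..=end).collect();
                let text = String::from_utf8_lossy(&frame[..end]).into_owned();
                Ok(Some(text))
            }
            None => Ok(None),
        }
    }
}

/// Intended to be called after each socket read has appended bytes to `buf`.
fn drain_frames(
    codec: &mut LineDecoder,
    buf: &mut Vec<u8>,
    max_frame_size: usize,
) -> Result<Vec<String>, DecodeError> {
    let mut frames = Vec::new();
    // Keep decoding until the codec reports that it needs more bytes.
    while let Some(frame) = codec.decode(buf)? {
        frames.push(frame);
    }
    // Assumption: what remains is a partial frame, rejected once it is too large.
    if buf.len() > max_frame_size {
        return Err(DecodeError::FrameTooLarge);
    }
    Ok(frames)
}
```

One read can therefore yield several frames, and any trailing partial frame stays in the buffer until a later read completes it.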
External Channel commands enter the runtime through a bounded queue. write only encodes into the write buffer; flush performs write_all; write_and_flush encodes and immediately flushes.
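The difference between write, flush, and write_and_flush can be modeled in a few lines. The sketch below is a standard-library approximation: Command, Connection, and command_queue are hypothetical names, a synchronous std::sync::mpsc::sync_channel stands in for the runtime's bounded queue, and any std::io::Write value stands in for the socket.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical command set; the crate's real command type may differ.
enum Command {
    Write(String),
    Flush,
    WriteAndFlush(String),
}

/// Hypothetical connection state: a write buffer plus a socket-like sink.
struct Connection<W: Write> {
    write_buf: Vec<u8>,
    sink: W,
}

impl<W: Write> Connection<W> {
    /// Encode one newline-delimited frame into the write buffer only.
    fn encode(&mut self, msg: &str) {
        self.write_buf.extend_from_slice(msg.as_bytes());
        self.write_buf.push(b'\n');
    }

    /// Push everything buffered so far to the sink with write_all.
    fn flush(&mut self) -> io::Result<()> {
        self.sink.write_all(&self.write_buf)?;
        self.write_buf.clear();
        self.sink.flush()
    }

    /// Apply one queued command to the connection.
    fn apply(&mut self, cmd: Command) -> io::Result<()> {
        match cmd {
            Command::Write(msg) => {
                self.encode(&msg);
                Ok(())
            }
            Command::Flush => self.flush(),
            Command::WriteAndFlush(msg) => {
                self.encode(&msg);
                self.flush()
            }
        }
    }
}

/// Bounded queue: once `capacity` commands are pending, `send` blocks and
/// `try_send` returns an error.
fn command_queue(capacity: usize) -> (SyncSender<Command>, Receiver<Command>) {
    sync_channel(capacity)
}
```

In this model, two Write commands leave the sink untouched until a Flush arrives, which is the behavior that makes batching several writes before one flush worthwhile.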
After track_connection_stats() is enabled, Context::stats() and Channel::stats() expose connection time, bytes read/written, and frames read/written. frames_written counts frames encoded into the write buffer; it does not guarantee those frames have already been flushed to the socket.
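The caveat about frames_written is easy to see in a toy model. In the sketch below, StatsSketch and WriterSketch are hypothetical types, a Vec<u8> stands in for the socket, and only the frames_written counter named above is modeled; how the real runtime accounts for bytes is not assumed here.

```rust
/// Hypothetical counter; only frames_written is modeled.
#[derive(Debug, Default)]
struct StatsSketch {
    frames_written: u64,
}

/// Hypothetical writer with a buffer and a socket stand-in.
struct WriterSketch {
    write_buf: Vec<u8>,
    socket: Vec<u8>,
    stats: StatsSketch,
}

impl WriterSketch {
    fn new() -> Self {
        Self {
            write_buf: Vec::new(),
            socket: Vec::new(),
            stats: StatsSketch::default(),
        }
    }

    /// Encoding a frame bumps the counter even though nothing is sent yet.
    fn write(&mut self, frame: &[u8]) {
        self.write_buf.extend_from_slice(frame);
        self.stats.frames_written += 1;
    }

    /// Flushing moves the buffered bytes to the socket; the counter is unchanged.
    fn flush(&mut self) {
        self.socket.extend_from_slice(&self.write_buf);
        self.write_buf.clear();
    }

    /// Bytes that have been encoded but not yet flushed.
    fn unflushed_bytes(&self) -> usize {
        self.write_buf.len()
    }
}
```

After two write calls and no flush, frames_written is 2 while the socket stand-in is still empty, so the counter alone should not be read as a delivery confirmation.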