Named pipes let two processes on the same machine talk to each other through a shared byte stream. In this post I build a complete RPC system on top of them: a server that listens for JSON requests, dispatches them, and sends back responses, plus a client library with typed methods.
Everything is pure Rust using windows-sys for the Windows API bindings and serde for serialization.
You can find the full source code on GitHub: kx0101/pipe-rpc
Table of Contents
- Project Structure
- The Shared Crate: Types and Protocol
- The Server
- The Client
- How It All Flows
- Key Takeaways
Project Structure
The project is a Cargo workspace with three crates:
pipe-rpc/
├── Cargo.toml # Workspace root
├── shared/ # Shared types, protocol, and helpers
│ ├── Cargo.toml
│ └── src/lib.rs
├── server/ # Named pipe server
│ ├── Cargo.toml
│ └── src/main.rs
└── client/ # Named pipe client with typed RPC methods
├── Cargo.toml
└── src/main.rs
The workspace root Cargo.toml just ties them together:
[workspace]
members = ["shared", "server", "client"]
resolver = "2"
The Shared Crate: Types and Protocol
The shared crate is the foundation. It defines the RPC message types, the wire protocol, and helpers for working with Windows APIs.
RPC Message Types
First, we define what requests and responses look like:
use serde::{Deserialize, Serialize};
pub const PIPE_NAME: &str = r"\\.\pipe\rpc_demo";
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "method", content = "params")]
pub enum RpcRequest {
Factorial { n: u64 },
Reverse { text: String },
Echo { payload: serde_json::Value },
}
The #[serde(tag = "method", content = "params")] attribute controls how the enum serializes. A Factorial request becomes:
{"method": "Factorial", "params": {"n": 5}}
Self-describing and easy to debug. The response types follow the same pattern:
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", content = "params")]
pub enum RpcResponse {
Ok(RpcResult),
Err(String),
}
#[derive(Debug, Serialize, Deserialize)]
pub enum RpcResult {
Number(u64),
Text(String),
Json(serde_json::Value),
}
Plus some convenience constructors to keep the server code clean:
impl RpcResponse {
pub fn ok_number(n: u64) -> Self {
RpcResponse::Ok(RpcResult::Number(n))
}
pub fn ok_text(s: impl Into<String>) -> Self {
RpcResponse::Ok(RpcResult::Text(s.into()))
}
pub fn ok_json(v: serde_json::Value) -> Self {
RpcResponse::Ok(RpcResult::Json(v))
}
pub fn err(msg: impl Into<String>) -> Self {
RpcResponse::Ok(RpcResult::Text(msg.into()))
}
}
The Framing Protocol
Named pipes (and TCP sockets, and any byte stream) have no concept of messages. If you write 100 bytes and then 50 bytes, the reader might get all 150 at once, or 30 then 120, or any other combination. The stream doesn't preserve boundaries.
So we need framing: a way to tell the reader where one message ends and the next begins. The simplest approach is length-prefixing. Before every JSON payload, we write a 4-byte header containing the payload's length.
┌────────────────┬──────────────────────────────────┐
│ 4 bytes (u32) │ N bytes │
│ Payload Length │ JSON Payload │
│ (little-endian)│ │
└────────────────┴──────────────────────────────────┘
The encoder:
pub fn encode(msg: &impl Serialize) -> std::io::Result<Vec<u8>> {
let json = serde_json::to_vec(msg)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
let len = (json.len() as u32).to_le_bytes();
let mut buf = Vec::with_capacity(4 + json.len());
buf.extend_from_slice(&len);
buf.extend_from_slice(&json);
Ok(buf)
}
And the decoder:
pub fn decode_from<R: std::io::Read, T: for<'a> Deserialize<'a>>(
reader: &mut R,
) -> std::io::Result<T> {
let mut len_buf = [0u8; 4];
reader.read_exact(&mut len_buf)?;
let len = u32::from_le_bytes(len_buf) as usize;
let mut body = vec![0u8; len];
reader.read_exact(&mut body)?;
serde_json::from_slice(&body)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
}
A few things worth noting:
- We use
read_exactinstead ofread. A regularreadmight return fewer bytes than requested (partial reads are normal for streams).read_exactkeeps reading until the buffer is completely filled or an error occurs. - The
?operator propagates errors upward. If the other side disconnects mid-read,read_exactreturns anUnexpectedEoferror that the caller can handle cleanly. - The
for<'a> Deserialize<'a>bound is a Higher-Ranked Trait Bound (HRTB). It means "this type can be deserialized from data with any lifetime." We need this because ourbodybuffer is a local variable that gets dropped at the end of the function.
The to_wide Helper
Windows APIs have *A (ANSI) and *W (Unicode) variants. The W variants expect null-terminated UTF-16 strings. Rust strings are UTF-8 and not null-terminated, so we need a conversion:
pub fn to_wide(s: &str) -> Vec<u16> {
s.encode_utf16().chain(std::iter::once(0)).collect()
}
Three steps:
encode_utf16()converts UTF-8 to UTF-16 code units.chain(std::iter::once(0))appends a null terminator.collect()gathers everything into aVec<u16>
The Server
The server creates a named pipe, waits for clients, reads requests, dispatches them, and sends responses.
Creating the Named Pipe
Named pipes on Windows live under the \\.\pipe\ namespace. We create one with CreateNamedPipeW:
use windows_sys::Win32::Storage::FileSystem::PIPE_ACCESS_DUPLEX;
use windows_sys::Win32::{
Foundation::{CloseHandle, HANDLE, INVALID_HANDLE_VALUE},
System::Pipes::{
ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe,
PIPE_READMODE_BYTE, PIPE_TYPE_BYTE, PIPE_UNLIMITED_INSTANCES, PIPE_WAIT,
},
};
let pipe_name = to_wide(PIPE_NAME);
let pipe = unsafe {
CreateNamedPipeW(
pipe_name.as_ptr(),
PIPE_ACCESS_DUPLEX,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
PIPE_UNLIMITED_INSTANCES,
4096,
4096,
0,
std::ptr::null(),
)
};
if pipe == INVALID_HANDLE_VALUE {
let err = std::io::Error::last_os_error();
eprintln!("[server] CreateNamedPipeW failed: {err}");
std::process::exit(1);
}
Breaking down the flags:
PIPE_ACCESS_DUPLEX: Both sides can read and writePIPE_TYPE_BYTE | PIPE_READMODE_BYTE: Data flows as a stream of bytes, not discrete messagesPIPE_WAIT: Blocking I/O. Reads wait until data arrivesPIPE_UNLIMITED_INSTANCES: Multiple pipe instances allowed4096, 4096: Input and output buffer sizes in bytes
The function returns a HANDLE, which is an opaque, pointer-sized value representing a kernel object. The OS gives you one when you create or open a resource (file, pipe, thread, mutex, etc.) and you pass it back to other API calls to operate on that resource. When you're done, you release it with CloseHandle.
Waiting for Clients
After creating the pipe, we block until a client connects:
println!("[server] waiting for client..");
let connected = unsafe { ConnectNamedPipe(pipe, std::ptr::null_mut()) };
let last_err = std::io::Error::last_os_error();
let ok = connected != 0
|| last_err.raw_os_error() == Some(535); // ERROR_PIPE_CONNECTED
if !ok {
eprintln!("[server] ConnectNamedPipe failed: {last_err}");
unsafe { CloseHandle(pipe) };
continue;
}
Error code 535 (ERROR_PIPE_CONNECTED) means a client connected between creating the pipe and calling ConnectNamedPipe. That's fine, we treat it as success.
Bridging Windows HANDLEs to Rust I/O
We have a raw Windows HANDLE, but we want to use Rust's Read and Write traits. The bridge is from_raw_handle, which wraps the handle in a std::fs::File:
fn serve(pipe: HANDLE) -> anyhow::Result<()> {
let file = unsafe { std::fs::File::from_raw_handle(pipe as *mut _) };
let mut reader = BufReader::new(file.try_clone()?);
let mut writer = file;
// ...
}
We call try_clone() to get two handles to the same underlying pipe, one for reading and one for writing. The BufReader wrapper adds buffering to reduce the number of system calls.
The Request Loop
The server sits in a loop reading and dispatching requests:
loop {
let req: RpcRequest = match decode_from(&mut reader) {
Ok(r) => r,
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
println!("[server] client disconnected");
break;
}
Err(e) => {
eprintln!("[server] read error: {e}");
break;
}
};
println!("[server] received: {req:?}");
let resp = handle_request(req);
let frame = encode(&resp)?;
let _ = writer.write_all(&frame);
writer.flush()?;
unsafe { FlushFileBuffers(pipe) };
}
The Err(e) if e.kind() == UnexpectedEof guard catches clean disconnections. When the client closes its end of the pipe, read_exact returns UnexpectedEof.
After writing the response, we flush at two levels: writer.flush() pushes Rust's internal buffer out, and FlushFileBuffers tells the OS kernel to flush its buffer to the pipe.
Request Dispatch
The handler matches on the request variant. Rust's exhaustive pattern matching ensures every method is handled:
fn handle_request(req: RpcRequest) -> RpcResponse {
match req {
RpcRequest::Factorial { n } => {
let result = factorial(n);
match result {
Some(v) => RpcResponse::ok_number(v),
None => RpcResponse::err(format!("overflow computing {}!", n)),
}
}
RpcRequest::Reverse { text } => {
let reversed: String = text.chars().rev().collect();
RpcResponse::ok_text(reversed)
}
RpcRequest::Echo { payload } => RpcResponse::ok_json(payload),
}
}
fn factorial(n: u64) -> Option<u64> {
if n == 0 {
return Some(1);
}
let mut acc: u64 = 1;
for i in 1..=n {
acc = acc.checked_mul(i)?;
}
Some(acc)
}
*Minor detail but: The factorial uses checked_mul to safely detect overflow instead of panicking. A u64 can hold up to 20! (2,432,902,008,176,640,000) but 21! overflows.
The Server Main Loop
The outer loop creates a new pipe instance for each client session:
fn main() {
println!("[server] starting on {PIPE_NAME}");
let pipe_name = to_wide(shared::PIPE_NAME);
loop {
let pipe = unsafe {
CreateNamedPipeW(...)
};
// ... error checking, ConnectNamedPipe, etc...
println!("[server] client connected");
if let Err(e) = serve(pipe) {
eprintln!("[server] serve error: {e}");
}
unsafe {
DisconnectNamedPipe(pipe);
CloseHandle(pipe);
}
println!("[server] client session ended, listening again…\n");
}
}
After each session, DisconnectNamedPipe tears down the client connection and CloseHandle frees the kernel resources. Then we loop back and create a fresh pipe for the next client.
The Client
The client connects to the server's pipe, wraps the connection in a struct, and exposes typed RPC methods.
Connecting with Retries
pub struct RpcClient {
reader: BufReader<std::fs::File>,
writer: std::fs::File,
_handle: HANDLE,
}
impl RpcClient {
pub fn connect(retries: u32) -> std::io::Result<Self> {
let pipe_name = to_wide(shared::PIPE_NAME);
let handle = 'connect: {
for attempt in 0..=retries {
let h = unsafe {
CreateFileW(
pipe_name.as_ptr(),
GENERIC_READ | GENERIC_WRITE,
FILE_SHARE_NONE,
std::ptr::null(),
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
std::ptr::null_mut(),
)
};
if h != INVALID_HANDLE_VALUE {
break 'connect h;
}
let e = std::io::Error::last_os_error();
if e.raw_os_error() != Some(231) || attempt == retries {
return Err(e);
}
println!("[client] pipe busy, waiting…");
unsafe { WaitNamedPipeW(pipe_name.as_ptr(), 2000) };
}
return Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"pipe busy after retries",
));
};
let file = unsafe { std::fs::File::from_raw_handle(handle as *mut _) };
let writer = file.try_clone()?;
let reader = BufReader::new(file);
Ok(Self {
reader,
writer,
_handle: handle,
})
}
}
The client uses CreateFileW with OPEN_EXISTING to connect to the pipe. On Windows, named pipes are opened the same way as regular files. If the pipe is busy (error 231 = ERROR_PIPE_BUSY), it calls WaitNamedPipeW to wait up to 2 seconds, then retries.
The 'connect: label with break 'connect h is a labeled block. It lets us break out of both the for loop and the surrounding block with a value
The Call Method
The core of the client is straightforward: encode, send, receive, decode.
fn call(&mut self, req: &RpcRequest) -> std::io::Result<RpcResponse> {
let frame = encode(req)?;
self.writer.write_all(&frame)?;
let _ = self.writer.flush();
decode_from(&mut self.reader)
}
Typed RPC Methods
Each RPC method wraps call and extracts the expected result type:
pub fn factorial(&mut self, n: u64) -> anyhow::Result<u64> {
match self.call(&RpcRequest::Factorial { n })? {
RpcResponse::Ok(RpcResult::Number(v)) => Ok(v),
RpcResponse::Err(e) => Err(anyhow::anyhow!("server error: {e}")),
other => Err(anyhow::anyhow!("unexpected response: {other:?}")),
}
}
pub fn reverse(&mut self, text: impl Into<String>) -> anyhow::Result<String> {
match self.call(&RpcRequest::Reverse { text: text.into() })? {
RpcResponse::Ok(RpcResult::Text(s)) => Ok(s),
RpcResponse::Err(e) => Err(anyhow::anyhow!("server error: {e}")),
other => Err(anyhow::anyhow!("unexpected response: {other:?}")),
}
}
pub fn echo(&mut self, payload: serde_json::Value) -> anyhow::Result<serde_json::Value> {
match self.call(&RpcRequest::Echo { payload })? {
RpcResponse::Ok(RpcResult::Json(v)) => Ok(v),
RpcResponse::Err(e) => Err(anyhow::anyhow!("server error: {e}")),
other => Err(anyhow::anyhow!("unexpected response: {other:?}")),
}
}
The caller gets a clean API: client.factorial(5) returns Result<u64>, no manual serialization needed.
Putting It Together
fn main() -> anyhow::Result<()> {
println!("[client] connecting to {PIPE_NAME}");
let mut client = RpcClient::connect(5)?;
println!("[client] connected!\n");
for n in [0, 1, 5, 10, 20] {
let result = client.factorial(n)?;
println!("factorial({n}) = {result}");
}
println!();
for s in ["RPA is cool", "i love johnP", "jesus"] {
let result = client.reverse(s)?;
println!("reverse({s:?}) = {result:?}");
}
println!();
let payload = serde_json::json!({
"service": "runtime",
"action": "test",
"values": [1, 2, 3]
});
let echoed = client.echo(payload.clone())?;
println!("echo({payload}) =\n {echoed}");
println!("\n[client] all calls completed successfully");
Ok(())
}
How It All Flows
Here's the full lifecycle of a single RPC call:

Key Takeaways
-
Named pipes are byte streams. They're local-only, identified by name instead of port, and they don't preserve message boundaries. You need a framing protocol (we used length-prefixing) to separate one message from the next.
-
Windows APIs expect UTF-16 with null terminators. Rust strings are UTF-8 without null terminators, so you need a small conversion helper (
to_wide) whenever you call a*Wfunction. -
HANDLEis Windows' universal resource token. It's an opaque pointer you get fromCreate*/Open*calls and pass back to other APIs. Converting it tostd::fs::Fileviafrom_raw_handlelets you use Rust's standardRead/Writetraits on top of it.