Learn Zig Series (#65) - Pipes and Inter-Process Communication
[IMAGE: https://images.hive.blog/DQmaHuB6qTWHaSpJHQ1S8FCCRmNQuUxTcPZdU4yKHsJ7vEP/zig-banner.png]
What will I learn
- How anonymous pipes work for connecting parent and child process I/O;
- How named pipes (FIFOs) let unrelated processes communicate;
- How pipe buffering and blocking behavior affect your programs;
- How to multiplex reads from multiple pipes using poll;
- How to build a process pipeline that chains commands like a shell;
- How to pass structured data through pipes with length-prefixed messages;
- How pipe capacity limits work and how to handle flow control.
Requirements
- A working modern computer running macOS, Windows or Ubuntu;
- An installed Zig 0.14+ distribution (download from ziglang.org);
- The ambition to learn Zig programming.
Difficulty
- Intermediate
Curriculum (of the Learn Zig Series):
- Zig Programming Tutorial - ep001 - Intro
- Learn Zig Series (#2) - Hello Zig, Variables and Types
- Learn Zig Series (#3) - Functions and Control Flow
- Learn Zig Series (#4) - Error Handling (Zig's Best Feature)
- Learn Zig Series (#5) - Arrays, Slices, and Strings
- Learn Zig Series (#6) - Structs, Enums, and Tagged Unions
- Learn Zig Series (#7) - Memory Management and Allocators
- Learn Zig Series (#8) - Pointers and Memory Layout
- Learn Zig Series (#9) - Comptime (Zig's Superpower)
- Learn Zig Series (#10) - Project Structure, Modules, and File I/O
- Learn Zig Series (#11) - Mini Project: Building a Step Sequencer
- Learn Zig Series (#12) - Testing and Test-Driven Development
- Learn Zig Series (#13) - Interfaces via Type Erasure
- Learn Zig Series (#14) - Generics with Comptime Parameters
- Learn Zig Series (#15) - The Build System (build.zig)
- Learn Zig Series (#16) - Sentinel-Terminated Types and C Strings
- Learn Zig Series (#17) - Packed Structs and Bit Manipulation
- Learn Zig Series (#18) - Async Concepts and Event Loops
- Learn Zig Series (#18b) - Addendum: Async Returns in Zig 0.16
- Learn Zig Series (#19) - SIMD with @Vector
- Learn Zig Series (#20) - Working with JSON
- Learn Zig Series (#21) - Networking and TCP Sockets
- Learn Zig Series (#22) - Hash Maps and Data Structures
- Learn Zig Series (#23) - Iterators and Lazy Evaluation
- Learn Zig Series (#24) - Logging, Formatting, and Debug Output
- Learn Zig Series (#25) - Mini Project: HTTP Status Checker
- Learn Zig Series (#26) - Writing a Custom Allocator
- Learn Zig Series (#27) - C Interop: Calling C from Zig
- Learn Zig Series (#28) - C Interop: Exposing Zig to C
- Learn Zig Series (#29) - Inline Assembly and Low-Level Control
- Learn Zig Series (#30) - Thread Safety and Atomics
- Learn Zig Series (#31) - Memory-Mapped I/O and Files
- Learn Zig Series (#32) - Compile-Time Reflection with @typeInfo
- Learn Zig Series (#33) - Building a State Machine with Tagged Unions
- Learn Zig Series (#34) - Performance Profiling and Optimization
- Learn Zig Series (#35) - Cross-Compilation and Target Triples
- Learn Zig Series (#36) - Mini Project: CLI Task Runner
- Learn Zig Series (#37) - Markdown to HTML: Tokenizer and Lexer
- Learn Zig Series (#38) - Markdown to HTML: Parser and AST
- Learn Zig Series (#39) - Markdown to HTML: Renderer and CLI
- Learn Zig Series (#40) - Key-Value Store: In-Memory Store
- Learn Zig Series (#41) - Key-Value Store: Write-Ahead Log
- Learn Zig Series (#42) - Key-Value Store: TCP Server
- Learn Zig Series (#43) - Key-Value Store: Client Library and Benchmarks
- Learn Zig Series (#44) - Image Tool: Reading and Writing PPM/BMP
- Learn Zig Series (#45) - Image Tool: Pixel Operations
- Learn Zig Series (#46) - Image Tool: CLI Pipeline
- Learn Zig Series (#47) - Build a Shell: Parsing Commands
- Learn Zig Series (#48) - Build a Shell: Process Spawning
- Learn Zig Series (#49) - Build a Shell: Built-in Commands
- Learn Zig Series (#50) - Build a Shell: Job Control and Signals
- Learn Zig Series (#51) - HTTP Server: Accept Loop and Parsing
- Learn Zig Series (#52) - HTTP Server: Router and Responses
- Learn Zig Series (#53) - HTTP Server: Static Files and MIME
- Learn Zig Series (#54) - HTTP Server: Middleware and Logging
- Learn Zig Series (#55) - ECS Game Engine: Architecture
- Learn Zig Series (#56) - ECS Game Engine: Component Storage
- Learn Zig Series (#57) - ECS Game Engine: Systems and Queries
- Learn Zig Series (#58) - ECS Game Engine: Terminal Rendering
- Learn Zig Series (#59) - Assembler: Instruction Encoding
- Learn Zig Series (#60) - Assembler: Two-Pass Assembly
- Learn Zig Series (#61) - Assembler: Disassembler and Binary Inspector
- Learn Zig Series (#62) - File Systems: Reading Directories and Metadata
- Learn Zig Series (#63) - File Watching: Detecting Changes
- Learn Zig Series (#64) - Process Management: Fork, Exec, Wait
- Learn Zig Series (#65) - Pipes and Inter-Process Communication (this post)
Learn Zig Series (#65) - Pipes and Inter-Process Communication
Solutions to Episode 64 Exercises
Exercise 1: Health check for the Supervisor
const std = @import("std");
const SupervisedProcess = struct {
name: []const u8,
command: []const []const u8,
health_check: ?[]const []const u8,
pid: ?std.posix.pid_t,
restart_count: u32,
max_restarts: u32,
last_exit_code: ?u8,
last_start_time: i64,
last_health_check: i64,
fn isRunning(self: *const SupervisedProcess) bool {
return self.pid != null;
}
};
const HealthCheckSupervisor = struct {
allocator: std.mem.Allocator,
processes: std.ArrayList(SupervisedProcess),
health_interval_ms: i64,
fn init(allocator: std.mem.Allocator) HealthCheckSupervisor {
return .{
.allocator = allocator,
.processes = std.ArrayList(SupervisedProcess).init(allocator),
.health_interval_ms = 5000,
};
}
fn deinit(self: *HealthCheckSupervisor) void {
self.processes.deinit();
}
fn addProcess(
self: *HealthCheckSupervisor,
name: []const u8,
command: []const []const u8,
health_check: ?[]const []const u8,
max_restarts: u32,
) !void {
try self.processes.append(.{
.name = name,
.command = command,
.health_check = health_check,
.pid = null,
.restart_count = 0,
.max_restarts = max_restarts,
.last_exit_code = null,
.last_start_time = 0,
.last_health_check = 0,
});
}
fn startProcess(self: *HealthCheckSupervisor, proc: *SupervisedProcess) !void {
_ = self;
var child = std.process.Child.init(proc.command, std.heap.page_allocator);
child.stdout_behavior = .Inherit;
child.stderr_behavior = .Inherit;
try child.spawn();
proc.pid = child.id;
proc.last_start_time = std.time.milliTimestamp();
proc.last_health_check = std.time.milliTimestamp();
}
fn runHealthChecks(self: *HealthCheckSupervisor) !void {
const stdout = std.io.getStdOut().writer();
const now = std.time.milliTimestamp();
for (self.processes.items) |*proc| {
if (!proc.isRunning()) continue;
const hc = proc.health_check orelse continue;
if (now - proc.last_health_check < self.health_interval_ms) continue;
proc.last_health_check = now;
var checker = std.process.Child.init(hc, self.allocator);
checker.stdout_behavior = .Ignore;
checker.stderr_behavior = .Ignore;
try checker.spawn();
const result = try checker.wait();
if (result.Exited != 0) {
try stdout.print("[health] '{s}' failed check, restarting\n", .{proc.name});
std.posix.kill(proc.pid.?, std.posix.SIG.KILL) catch {};
_ = std.posix.waitpid(proc.pid.?, 0);
proc.pid = null;
proc.restart_count += 1;
if (proc.restart_count <= proc.max_restarts) {
try self.startProcess(proc);
}
}
}
}
};
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
var sv = HealthCheckSupervisor.init(allocator);
defer sv.deinit();
// the health check verifies /tmp/healthy exists
try sv.addProcess(
"worker",
&.{ "sh", "-c", "touch /tmp/healthy && sleep 60" },
&.{ "sh", "-c", "test -f /tmp/healthy" },
3,
);
try sv.startProcess(&sv.processes.items[0]);
// run a few health check cycles
var ticks: usize = 0;
while (ticks < 3) : (ticks += 1) {
std.time.sleep(2 * std.time.ns_per_s);
try sv.runHealthChecks();
}
}
The health check runs as a separate child process. If it exits non-zero, the supervisor kills the main process and restarts it. The health_interval_ms field prevents running checks too often.
Exercise 2: Process pool maintaining N workers
const std = @import("std");
const WorkerPool = struct {
allocator: std.mem.Allocator,
target_count: usize,
command: []const []const u8,
workers: std.ArrayList(Worker),
const Worker = struct {
id: usize,
pid: std.posix.pid_t,
};
fn init(allocator: std.mem.Allocator, count: usize, cmd: []const []const u8) WorkerPool {
return .{
.allocator = allocator,
.target_count = count,
.command = cmd,
.workers = std.ArrayList(Worker).init(allocator),
};
}
fn deinit(self: *WorkerPool) void {
self.workers.deinit();
}
fn spawnWorker(self: *WorkerPool, id: usize) !void {
var env_map = std.process.EnvMap.init(self.allocator);
defer env_map.deinit();
var id_buf: [20]u8 = undefined;
const id_str = std.fmt.bufPrint(&id_buf, "{d}", .{id}) catch "0";
try env_map.put("WORKER_ID", id_str);
try env_map.put("PATH", std.posix.getenv("PATH") orelse "/usr/bin");
var child = std.process.Child.init(self.command, self.allocator);
child.stdout_behavior = .Inherit;
child.stderr_behavior = .Inherit;
child.env_map = &env_map;
try child.spawn();
try self.workers.append(.{ .id = id, .pid = child.id });
}
fn fillPool(self: *WorkerPool) !void {
var next_id: usize = 0;
for (self.workers.items) |w| {
if (w.id >= next_id) next_id = w.id + 1;
}
while (self.workers.items.len < self.target_count) {
try self.spawnWorker(next_id);
next_id += 1;
}
}
fn reapAndRefill(self: *WorkerPool) !usize {
var reaped: usize = 0;
while (true) {
const result = std.posix.waitpid(-1, std.posix.W.NOHANG);
if (result.pid <= 0) break;
// remove from workers list
var i: usize = 0;
while (i < self.workers.items.len) {
if (self.workers.items[i].pid == result.pid) {
_ = self.workers.orderedRemove(i);
reaped += 1;
break;
}
i += 1;
}
}
if (reaped > 0) try self.fillPool();
return reaped;
}
};
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
var pool = WorkerPool.init(allocator, 3, &.{
"sh", "-c", "echo worker $WORKER_ID started && sleep 10",
});
defer pool.deinit();
try pool.fillPool();
const stdout = std.io.getStdOut().writer();
try stdout.print("Pool started with {d} workers\n", .{pool.workers.items.len});
var ticks: usize = 0;
while (ticks < 20) : (ticks += 1) {
std.time.sleep(500 * std.time.ns_per_ms);
const n = try pool.reapAndRefill();
if (n > 0) try stdout.print(" reaped {d}, pool now {d}\n", .{ n, pool.workers.items.len });
}
}
Each worker gets a unique WORKER_ID in its environment. When reapAndRefill detects a dead worker via non-blocking waitpid, it removes it from the list and calls fillPool to spawn a replacement.
Exercise 3: Pipe executor connecting two processes
const std = @import("std");
fn pipeCommands(
allocator: std.mem.Allocator,
cmd1: []const []const u8,
cmd2: []const []const u8,
) ![]u8 {
// create the pipe connecting cmd1 stdout to cmd2 stdin
const pipe_fds = try std.posix.pipe();
// spawn first command with stdout -> pipe write end
var child1 = std.process.Child.init(cmd1, allocator);
child1.stdout_behavior = .{ .fd = pipe_fds[1] };
child1.stderr_behavior = .Inherit;
try child1.spawn();
// parent closes the write end -- only child1 has it
std.posix.close(pipe_fds[1]);
// spawn second command with stdin -> pipe read end
var child2 = std.process.Child.init(cmd2, allocator);
child2.stdin_behavior = .{ .fd = pipe_fds[0] };
child2.stdout_behavior = .Pipe;
child2.stderr_behavior = .Inherit;
try child2.spawn();
// parent closes the read end -- only child2 has it
std.posix.close(pipe_fds[0]);
// read child2's output
const reader = child2.stdout.?.reader();
const output = try reader.readAllAlloc(allocator, 64 * 1024);
_ = try child1.wait();
_ = try child2.wait();
return output;
}
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
const result = try pipeCommands(
allocator,
&.{ "echo", "hello world" },
&.{ "wc", "-w" },
);
defer allocator.free(result);
const stdout = std.io.getStdOut().writer();
try stdout.print("pipe result: '{s}'\n", .{std.mem.trim(u8, result, " \n")});
}
The key insight is that you create the pipe BEFORE forking either child, then assign the write end to child1's stdout and the read end to child2's stdin using the .fd behavior. The parent must close both pipe ends after spawning -- otherwise the read end stays open in the parent and child2 never sees EOF.
Last episode we went deep on the Unix process model -- fork, exec, wait, process groups, zombies, the double-fork daemon trick, and a process supervisor. All of that covered how to CREATE and MANAGE processes. But processes by themselves are islands. They exit with a code and that's about it. Today we connect the islands. Pipes are the oldest Unix IPC (inter-process communication) mechanism, and they're still one of the most useful. Every time you type ls | grep foo | wc -l in a terminal, you're using pipes. Every time a parent process reads its child's output, there's a pipe underneath.
We actually already saw pipes in action last episode when we used std.process.Child with .Pipe for stdout/stderr. Today we're going underneath that abstraction to understand how pipes really work, and then we'll build some practical things with them.
Here we go!
Anonymous pipes: connecting parent and child
A pipe is a kernel buffer with two file descriptors: one for writing, one for reading. Data written to the write end can be read from the read end. It's a unidirectional byte stream -- no seeking, no random access, just bytes flowing in one direction. The pipe syscall creates a pair of file descriptors, and typically the parent keeps one end and the child (after fork) gets the other.
Let's start from the ground up with std.posix.pipe:
const std = @import("std");
pub fn main() !void {
const stdout = std.io.getStdOut().writer();
// pipe() returns [2]fd: [0] = read end, [1] = write end
const pipe_fds = try std.posix.pipe();
const pid = try std.posix.fork();
if (pid == 0) {
// child: close the read end, write to the write end
std.posix.close(pipe_fds[0]);
const msg = "hello from the child process!\n";
const writer_fd: std.posix.fd_t = pipe_fds[1];
_ = std.posix.write(writer_fd, msg) catch {};
std.posix.close(pipe_fds[1]);
std.process.exit(0);
}
// parent: close the write end, read from the read end
std.posix.close(pipe_fds[1]);
var buf: [256]u8 = undefined;
const n = try std.posix.read(pipe_fds[0], &buf);
std.posix.close(pipe_fds[0]);
try stdout.print("Parent received {d} bytes: {s}", .{ n, buf[0..n] });
_ = std.posix.waitpid(pid, 0);
}
The critical part is which end each process closes. The child closes the read end (it only writes), and the parent closes the write end (it only reads). If you forget to close the write end in the parent, the read call will never return EOF -- the kernel thinks someone might still write to the pipe because a write descriptor is still open. This is one of the most common pipe bugs and it's a real pain to debug because the program just hangs.
After fork, both parent and child have copies of both file descriptors. That's four file descriptors total pointing at the same pipe. You need to close the ones you don't use, otherwise the reference count on the pipe never drops to zero on the write side, and the reader blocks forver.
Named pipes (FIFOs): communicating between unrelated processes
Anonymous pipes only work between related processes (parent-child), because you need fork to share the file descriptors. Named pipes (FIFOs) solve this -- they're filesystem entries that any process can open. One process opens the FIFO for writing, another opens it for reading, and data flows between them:
const std = @import("std");
const linux = std.os.linux;
fn createFifo(path: [*:0]const u8) !void {
// mkfifo creates a special FIFO file
const result = linux.mknodat(
linux.AT.FDCWD,
path,
linux.S.IFIFO | 0o666,
0,
);
const err = std.posix.errno(result);
if (err != .SUCCESS and err != .EXIST) {
return error.MkfifoFailed;
}
}
fn writerProcess(path: []const u8) !void {
const file = try std.fs.cwd().openFile(path, .{ .mode = .write_only });
defer file.close();
const messages = [_][]const u8{
"message one\n",
"message two\n",
"message three\n",
};
for (messages) |msg| {
try file.writeAll(msg);
std.time.sleep(200 * std.time.ns_per_ms);
}
}
fn readerProcess(path: []const u8) !void {
const stdout = std.io.getStdOut().writer();
const file = try std.fs.cwd().openFile(path, .{ .mode = .read_only });
defer file.close();
var buf: [256]u8 = undefined;
while (true) {
const n = file.read(&buf) catch break;
if (n == 0) break; // writer closed
try stdout.print("[reader] got: {s}", .{buf[0..n]});
}
try stdout.print("[reader] writer closed, done\n", .{});
}
pub fn main() !void {
const fifo_path = "/tmp/zig_fifo_demo";
const fifo_path_z: [*:0]const u8 = "/tmp/zig_fifo_demo";
try createFifo(fifo_path_z);
defer std.fs.cwd().deleteFile(fifo_path) catch {};
const pid = try std.posix.fork();
if (pid == 0) {
// child writes
writerProcess(fifo_path) catch {};
std.process.exit(0);
}
// parent reads
readerProcess(fifo_path) catch |err| {
const stdout = std.io.getStdOut().writer();
stdout.print("reader error: {}\n", .{err}) catch {};
};
_ = std.posix.waitpid(pid, 0);
}
The FIFO lives in the filesystem as a special file. ls -l shows it with a p prefix (for pipe). Any number of processes can open it -- one side writes, the other reads. The kernel handles the buffering. When all writers close the FIFO, readers get EOF. When all readers close, writers get SIGPIPE (or EPIPE if the signal is blocked).
Named pipes are useful for simple inter-process communication where you don't want the overhead of sockets or shared memory. Log aggregation, command-and-control channels between daemons, and communication between scripts written in different languages -- FIFOs handle all of these. The downside compared to Unix domain sockets is that FIFOs are unidirectional. For bidirectional communication you need two FIFOs (or switch to sockets, which we'll cover in a future episode).
Pipe buffering and blocking behavior
Pipes have a kernel buffer. On Linux this is typically 64 KiB (16 pages of 4 KiB each). When the buffer is full, writes block. When the buffer is empty, reads block. This backpressure mechanism is what makes pipes self-regulating -- a fast producer automatically slows down when the consumer can't keep up:
const std = @import("std");
const linux = std.os.linux;
pub fn main() !void {
const stdout_w = std.io.getStdOut().writer();
const pipe_fds = try std.posix.pipe();
// check the pipe buffer size using fcntl F_GETPIPE_SZ
const pipe_size = linux.fcntl(pipe_fds[0], linux.F.GETPIPE_SZ, @as(u64, 0));
try stdout_w.print("Pipe buffer size: {d} bytes ({d} KiB)\n", .{
pipe_size,
@divFloor(pipe_size, 1024),
});
const pid = try std.posix.fork();
if (pid == 0) {
// child: write more data than the pipe can hold
std.posix.close(pipe_fds[0]);
const chunk: [4096]u8 = [_]u8{'A'} ** 4096;
var total: usize = 0;
var block_count: usize = 0;
while (total < 256 * 1024) { // try to write 256 KiB
const n = std.posix.write(pipe_fds[1], &chunk) catch |err| {
const stderr = std.io.getStdErr().writer();
stderr.print("write error after {d} bytes: {}\n", .{ total, err }) catch {};
break;
};
total += n;
block_count += 1;
}
const stderr = std.io.getStdErr().writer();
stderr.print("[writer] wrote {d} bytes in {d} calls\n", .{ total, block_count }) catch {};
std.posix.close(pipe_fds[1]);
std.process.exit(0);
}
// parent: read slowly to demonstrate blocking on the writer side
std.posix.close(pipe_fds[1]);
var total_read: usize = 0;
var read_buf: [1024]u8 = undefined;
while (true) {
// slow reader -- sleep between reads
std.time.sleep(10 * std.time.ns_per_ms);
const n = std.posix.read(pipe_fds[0], &read_buf) catch break;
if (n == 0) break;
total_read += n;
}
std.posix.close(pipe_fds[0]);
_ = std.posix.waitpid(pid, 0);
try stdout_w.print("[reader] total read: {d} bytes\n", .{total_read});
}
When you run this, the writer fills the 64 KiB buffer quickly, then blocks on the next write until the reader consumes some data. The write call doesn't return until space is available. This is blocking I/O in action -- and it's the default behavior for pipes.
You can make pipe file descriptors non-blocking by passing O_NONBLOCK to pipe2 (or setting it with fcntl after creation). With non-blocking I/O, a write to a full pipe returns error.WouldBlock instead of blocking, and a read from an empty pipe does the same. This is usful when you need to multiplex I/O across multiple pipes, which is exactly what we'll do next.
One more thing about pipe sizes: on Linux you can increase the buffer with fcntl(fd, F_SETPIPE_SZ, new_size). The maximum is controlled by /proc/sys/fs/pipe-max-size (usually 1 MiB). Bigger buffers reduce context switches between producer and consumer but use more kernel memory. For most use cases the default 64 KiB is fine.
Multiplexing pipes with poll
When you have multiple child processes, each with its own pipe, you need to read from all of them without blocking on any single one. The poll syscall solves this -- you give it a list of file descriptors and it tells you which ones have data available:
const std = @import("std");
const linux = std.os.linux;
const PipeMultiplexer = struct {
allocator: std.mem.Allocator,
pipes: std.ArrayList(PipeEntry),
const PipeEntry = struct {
name: []const u8,
read_fd: std.posix.fd_t,
child_pid: std.posix.pid_t,
closed: bool,
};
fn init(allocator: std.mem.Allocator) PipeMultiplexer {
return .{
.allocator = allocator,
.pipes = std.ArrayList(PipeEntry).init(allocator),
};
}
fn deinit(self: *PipeMultiplexer) void {
for (self.pipes.items) |entry| {
if (!entry.closed) std.posix.close(entry.read_fd);
}
self.pipes.deinit();
}
fn addChild(self: *PipeMultiplexer, name: []const u8, cmd: []const []const u8) !void {
const pipe_fds = try std.posix.pipe();
const pid = try std.posix.fork();
if (pid == 0) {
// child: redirect stdout to pipe write end
std.posix.close(pipe_fds[0]);
std.posix.dup2(pipe_fds[1], 1) catch std.process.exit(1);
std.posix.close(pipe_fds[1]);
// exec the command
const argv = @as(
[*:null]const ?[*:0]const u8,
@ptrCast(cmd.ptr),
);
const err = std.posix.execvpeZ(
@ptrCast(cmd[0]),
argv,
@ptrCast(std.c.environ),
);
_ = err;
std.process.exit(127);
}
// parent: close write end, keep read end
std.posix.close(pipe_fds[1]);
try self.pipes.append(.{
.name = name,
.read_fd = pipe_fds[0],
.child_pid = pid,
.closed = false,
});
}
fn readAll(self: *PipeMultiplexer) !void {
const stdout = std.io.getStdOut().writer();
var buf: [512]u8 = undefined;
while (true) {
// count how many are still open
var open_count: usize = 0;
for (self.pipes.items) |e| {
if (!e.closed) open_count += 1;
}
if (open_count == 0) break;
// build poll fd array
var pollfds = try self.allocator.alloc(linux.pollfd, self.pipes.items.len);
defer self.allocator.free(pollfds);
for (self.pipes.items, 0..) |entry, i| {
pollfds[i] = .{
.fd = if (entry.closed) -1 else entry.read_fd,
.events = linux.POLL.IN,
.revents = 0,
};
}
// poll with 1 second timeout
const ready = linux.poll(pollfds.ptr, @intCast(pollfds.len), 1000);
if (@as(isize, @bitCast(@as(usize, ready))) <= 0) continue;
for (self.pipes.items, 0..) |*entry, i| {
if (entry.closed) continue;
if (pollfds[i].revents & linux.POLL.IN != 0) {
const n = std.posix.read(entry.read_fd, &buf) catch 0;
if (n == 0) {
// EOF
std.posix.close(entry.read_fd);
entry.closed = true;
_ = std.posix.waitpid(entry.child_pid, 0);
} else {
try stdout.print("[{s}] {s}", .{ entry.name, buf[0..n] });
}
}
if (pollfds[i].revents & linux.POLL.HUP != 0 and
pollfds[i].revents & linux.POLL.IN == 0)
{
std.posix.close(entry.read_fd);
entry.closed = true;
_ = std.posix.waitpid(entry.child_pid, 0);
}
}
}
}
};
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
var mux = PipeMultiplexer.init(allocator);
defer mux.deinit();
// Note: addChild uses fork+exec internaly
// For this demo, we use std.process.Child instead
// to avoid the raw execvpe complexity. Shown here
// is the poll logic -- the real value of this example.
// For a working version, let's use pipe() + Child:
const stdout = std.io.getStdOut().writer();
const pipe1 = try std.posix.pipe();
const pipe2 = try std.posix.pipe();
const pid1 = try std.posix.fork();
if (pid1 == 0) {
std.posix.close(pipe1[0]);
std.posix.close(pipe2[0]);
std.posix.close(pipe2[1]);
_ = std.posix.write(pipe1[1], "output from child A\n") catch {};
std.time.sleep(100 * std.time.ns_per_ms);
_ = std.posix.write(pipe1[1], "more from child A\n") catch {};
std.posix.close(pipe1[1]);
std.process.exit(0);
}
const pid2 = try std.posix.fork();
if (pid2 == 0) {
std.posix.close(pipe1[0]);
std.posix.close(pipe1[1]);
std.posix.close(pipe2[0]);
std.time.sleep(50 * std.time.ns_per_ms);
_ = std.posix.write(pipe2[1], "output from child B\n") catch {};
std.posix.close(pipe2[1]);
std.process.exit(0);
}
// parent closes write ends
std.posix.close(pipe1[1]);
std.posix.close(pipe2[1]);
var pollfds = [_]linux.pollfd{
.{ .fd = pipe1[0], .events = linux.POLL.IN, .revents = 0 },
.{ .fd = pipe2[0], .events = linux.POLL.IN, .revents = 0 },
};
const names = [_][]const u8{ "childA", "childB" };
var open_count: usize = 2;
var buf: [256]u8 = undefined;
while (open_count > 0) {
const ready = linux.poll(&pollfds, 2, 2000);
if (@as(isize, @bitCast(@as(usize, ready))) <= 0) continue;
for (&pollfds, 0..) |*pfd, i| {
if (pfd.fd < 0) continue;
if (pfd.revents & linux.POLL.IN != 0) {
const n = std.posix.read(pfd.fd, &buf) catch 0;
if (n == 0) {
std.posix.close(pfd.fd);
pfd.fd = -1;
open_count -= 1;
} else {
try stdout.print("[{s}] {s}", .{ names[i], buf[0..n] });
}
}
if (pfd.revents & linux.POLL.HUP != 0 and pfd.revents & linux.POLL.IN == 0) {
std.posix.close(pfd.fd);
pfd.fd = -1;
open_count -= 1;
}
}
}
_ = std.posix.waitpid(pid1, 0);
_ = std.posix.waitpid(pid2, 0);
try stdout.print("All children done\n", .{});
_ = mux;
}
poll takes an array of pollfd structs. Each one has an fd (file descriptor), events (what you're interested in -- POLLIN for "data available to read"), and revents (what actually happened -- filled in by the kernel). Setting fd to -1 tells poll to skip that entry.
The POLLHUP flag (hangup) fires when the write end of the pipe is closed. You might get POLLIN and POLLHUP at the same time if there's still data in the buffer when the writer closes. Always check for POLLIN first and read any remaining data before treating POLLHUP as end-of-stream.
This pattern -- poll across multiple file descriptors, handle whichever ones are ready -- is the foundation of event-driven programming. Web servers, databases, message brokers -- they all use some variant of poll/epoll/kqueue to handle thousands of connections concurrently on a single thread. We used epoll in the HTTP server episodes (51-54), and the concept is identical here with pipes.
Building a pipeline: chaining processes like a shell
A shell pipeline like cat /etc/passwd | grep root | wc -l connects three processes with two pipes. Each process's stdout becomes the next process's stdin. Let's build a general pipeline executor:
const std = @import("std");
const Pipeline = struct {
allocator: std.mem.Allocator,
stages: std.ArrayList([]const []const u8),
fn init(allocator: std.mem.Allocator) Pipeline {
return .{
.allocator = allocator,
.stages = std.ArrayList([]const []const u8).init(allocator),
};
}
fn deinit(self: *Pipeline) void {
self.stages.deinit();
}
fn addStage(self: *Pipeline, cmd: []const []const u8) !void {
try self.stages.append(cmd);
}
fn execute(self: *Pipeline) ![]u8 {
if (self.stages.items.len == 0) return error.EmptyPipeline;
var child_pids = std.ArrayList(std.posix.pid_t).init(self.allocator);
defer child_pids.deinit();
// we need N-1 pipes for N stages
var prev_read_fd: ?std.posix.fd_t = null;
for (self.stages.items, 0..) |cmd, i| {
const is_last = (i == self.stages.items.len - 1);
// create a pipe for this stage's output (unless it's the last)
var out_pipe: ?[2]std.posix.fd_t = null;
if (!is_last) {
out_pipe = try std.posix.pipe();
}
// also pipe the last stage's stdout so we can capture it
var capture_pipe: ?[2]std.posix.fd_t = null;
if (is_last) {
capture_pipe = try std.posix.pipe();
}
var child = std.process.Child.init(cmd, self.allocator);
if (prev_read_fd) |fd| {
child.stdin_behavior = .{ .fd = fd };
}
if (out_pipe) |p| {
child.stdout_behavior = .{ .fd = p[1] };
} else if (capture_pipe) |p| {
child.stdout_behavior = .{ .fd = p[1] };
}
child.stderr_behavior = .Inherit;
try child.spawn();
try child_pids.append(child.id);
// close pipe ends in parent
if (prev_read_fd) |fd| std.posix.close(fd);
if (out_pipe) |p| {
std.posix.close(p[1]); // close write end
prev_read_fd = p[0]; // keep read end for next stage
}
if (capture_pipe) |p| {
std.posix.close(p[1]); // close write end
prev_read_fd = p[0]; // capture read end
}
}
// read the final output
var output = std.ArrayList(u8).init(self.allocator);
if (prev_read_fd) |fd| {
var buf: [4096]u8 = undefined;
while (true) {
const n = std.posix.read(fd, &buf) catch break;
if (n == 0) break;
try output.appendSlice(buf[0..n]);
}
std.posix.close(fd);
}
// wait for all children
for (child_pids.items) |pid| {
_ = std.posix.waitpid(pid, 0);
}
return try output.toOwnedSlice();
}
};
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
const stdout = std.io.getStdOut().writer();
// pipeline: cat /etc/passwd | grep root | wc -l
var pipeline = Pipeline.init(allocator);
defer pipeline.deinit();
try pipeline.addStage(&.{ "cat", "/etc/passwd" });
try pipeline.addStage(&.{ "grep", "root" });
try pipeline.addStage(&.{ "wc", "-l" });
const result = try pipeline.execute();
defer allocator.free(result);
try stdout.print("Pipeline output: {s}", .{result});
// another pipeline: ls /usr/bin | sort | head -5
var pipeline2 = Pipeline.init(allocator);
defer pipeline2.deinit();
try pipeline2.addStage(&.{ "ls", "/usr/bin" });
try pipeline2.addStage(&.{ "sort" });
try pipeline2.addStage(&.{ "head", "-5" });
const result2 = try pipeline2.execute();
defer allocator.free(result2);
try stdout.print("Pipeline 2 output:\n{s}", .{result2});
}
The algorithm is: for N stages, create N-1 pipes. Each stage's stdin is the previous pipe's read end, and each stage's stdout is the next pipe's write end. The parent closes both ends of each pipe after spawning the children -- this is critical, because if the parent holds a write end open, the downstream reader will never see EOF.
Notice how we capture the final stage's output by creating one extra pipe for the last stage. The parent reads from it after all children are spawned. This gives us the same result as running the pipeline in a shell and reading stdout.
This is basically what our shell project did in episodes 47-50, but stripped down to the essentials. The shell version handled more edge cases (built-in commands, environment variables, job control), but the pipe plumbing is identical.
Passing structured data through pipes: length-prefixed messages
Raw bytes are fine for text streams, but when you need to send structured messages between processes, you need a framing protocol. The simplest approach is length-prefixed messages: send a 4-byte length header followed by that many bytes of payload:
const std = @import("std");
const Message = struct {
msg_type: u8,
payload: []const u8,
fn encode(self: Message, writer: anytype) !void {
// write length header (4 bytes big-endian)
const total_len: u32 = @intCast(1 + self.payload.len);
const len_bytes = std.mem.toBytes(std.mem.nativeToBig(u32, total_len));
try writer.writeAll(&len_bytes);
// write type byte
try writer.writeByte(self.msg_type);
// write payload
try writer.writeAll(self.payload);
}
fn decode(allocator: std.mem.Allocator, reader: anytype) !?Message {
// read length header
var len_bytes: [4]u8 = undefined;
const n = reader.readAll(&len_bytes) catch return null;
if (n < 4) return null;
const total_len = std.mem.bigToNative(u32, std.mem.bytesToValue(u32, &len_bytes));
if (total_len == 0) return null;
if (total_len > 1024 * 1024) return error.MessageTooLarge;
// read type byte
const msg_type = reader.readByte() catch return null;
// read payload
const payload_len = total_len - 1;
const payload = try allocator.alloc(u8, payload_len);
const read_count = reader.readAll(payload) catch |err| {
allocator.free(payload);
return err;
};
if (read_count < payload_len) {
allocator.free(payload);
return null;
}
return .{
.msg_type = msg_type,
.payload = payload,
};
}
};
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
const stdout = std.io.getStdOut().writer();
const pipe_fds = try std.posix.pipe();
const pid = try std.posix.fork();
if (pid == 0) {
// child: send structured messages
std.posix.close(pipe_fds[0]);
const file = std.fs.File{ .handle = pipe_fds[1] };
const writer = file.writer();
const messages = [_]Message{
.{ .msg_type = 1, .payload = "system initialized" },
.{ .msg_type = 2, .payload = "processing batch 42" },
.{ .msg_type = 3, .payload = "task complete: 128 items" },
.{ .msg_type = 1, .payload = "shutting down" },
};
for (messages) |msg| {
msg.encode(writer) catch break;
}
std.posix.close(pipe_fds[1]);
std.process.exit(0);
}
// parent: decode structured messages
std.posix.close(pipe_fds[1]);
const file = std.fs.File{ .handle = pipe_fds[0] };
const reader = file.reader();
const type_names = [_][]const u8{ "???", "INFO", "PROGRESS", "RESULT" };
while (true) {
const msg = Message.decode(allocator, reader) catch break;
if (msg == null) break;
const m = msg.?;
defer allocator.free(m.payload);
const type_name = if (m.msg_type < type_names.len)
type_names[m.msg_type]
else
"UNKNOWN";
try stdout.print("[{s}] {s}\n", .{ type_name, m.payload });
}
std.posix.close(pipe_fds[0]);
_ = std.posix.waitpid(pid, 0);
}
Length-prefixed framing is used everywhere: HTTP/2 frames, WebSocket frames, protobuf wire format, database protocols (PostgreSQL's wire protocol sends a 4-byte length before every message). The big advantage over delimiter-based framing (like newline-separated JSON) is that the payload can contain any bytes, including newlines and null bytes. You know exactly how many bytes to read, so you never accidentally split a message or merge two together.
The big-endian byte order for the length header is a convention from network protocols (it's sometimes called "network byte order"). It means the most significant byte comes first. Zig's std.mem.nativeToBig handles the conversion regardless of the host's endianness.
Pipe capacity and flow control
We touched on pipe buffer sizes earlier, but let's look at this more systematically. Understanding pipe capacity matters when you're designing systems with multiple stages that produce and consume at different rates:
const std = @import("std");
const linux = std.os.linux;
fn measurePipeCapacity() !void {
const stdout = std.io.getStdOut().writer();
const pipe_fds = try std.posix.pipe();
defer std.posix.close(pipe_fds[0]);
defer std.posix.close(pipe_fds[1]);
// get current pipe size
const current = linux.fcntl(pipe_fds[1], linux.F.GETPIPE_SZ, @as(u64, 0));
try stdout.print("Default pipe size: {d} bytes ({d} KiB)\n", .{
current, @divFloor(current, 1024),
});
// try to increase it
const requested: usize = 256 * 1024; // 256 KiB
const result = linux.fcntl(
pipe_fds[1],
linux.F.SETPIPE_SZ,
@as(u64, requested),
);
if (@as(isize, @bitCast(@as(usize, result))) > 0) {
try stdout.print("Resized pipe to: {d} bytes ({d} KiB)\n", .{
result, @divFloor(result, 1024),
});
}
// measure actual capacity by writing until we'd block
// set pipe to non-blocking first
_ = linux.fcntl(pipe_fds[1], linux.F.SETFL, @as(u64, linux.O.NONBLOCK));
const chunk: [4096]u8 = [_]u8{'X'} ** 4096;
var total: usize = 0;
while (true) {
const n = std.posix.write(pipe_fds[1], &chunk) catch break;
total += n;
}
try stdout.print("Actual capacity before EAGAIN: {d} bytes ({d} KiB)\n", .{
total, @divFloor(total, 1024),
});
}
fn demonstrateAtomicWrites() !void {
const stdout = std.io.getStdOut().writer();
// PIPE_BUF is the max size for atomic writes (4096 on Linux)
// writes <= PIPE_BUF are guaranteed atomic (no interleaving)
// writes > PIPE_BUF may be split across multiple reads
const pipe_fds = try std.posix.pipe();
const pid1 = try std.posix.fork();
if (pid1 == 0) {
std.posix.close(pipe_fds[0]);
// write small messages (under PIPE_BUF) -- these are atomic
var i: usize = 0;
while (i < 10) : (i += 1) {
var msg: [64]u8 = undefined;
const len = std.fmt.bufPrint(&msg, "writer-A message {d}\n", .{i}) catch break;
_ = std.posix.write(pipe_fds[1], msg[0..len.len]) catch break;
}
std.posix.close(pipe_fds[1]);
std.process.exit(0);
}
const pid2 = try std.posix.fork();
if (pid2 == 0) {
std.posix.close(pipe_fds[0]);
var i: usize = 0;
while (i < 10) : (i += 1) {
var msg: [64]u8 = undefined;
const len = std.fmt.bufPrint(&msg, "writer-B message {d}\n", .{i}) catch break;
_ = std.posix.write(pipe_fds[1], msg[0..len.len]) catch break;
}
std.posix.close(pipe_fds[1]);
std.process.exit(0);
}
std.posix.close(pipe_fds[1]);
// read and display -- messages may be interleaved but never torn
var buf: [4096]u8 = undefined;
while (true) {
const n = std.posix.read(pipe_fds[0], &buf) catch break;
if (n == 0) break;
try stdout.print("{s}", .{buf[0..n]});
}
std.posix.close(pipe_fds[0]);
_ = std.posix.waitpid(pid1, 0);
_ = std.posix.waitpid(pid2, 0);
}
pub fn main() !void {
try measurePipeCapacity();
try stdout_print_sep();
try demonstrateAtomicWrites();
}
fn stdout_print_sep() !void {
const stdout = std.io.getStdOut().writer();
try stdout.print("---\n", .{});
}
The PIPE_BUF constant (4096 bytes on Linux) is important for concurrent writers. POSIX guarantees that writes of PIPE_BUF bytes or fewer are atomic -- they won't be interleaved with writes from other processes. Writes larger than PIPE_BUF have no atomicity guarantee; the kernel may split them, causing data from different writers to get mixed together. If you have multiple processes writing to the same pipe (e.g., multiple workers logging to a shared pipe), keep individual write calls under 4096 bytes and you're safe.
Exercises
-
Build a "tee" command in Zig. Read stdin, write every byte to BOTH stdout AND a file specified as a command-line argument, simultaneously. Use
std.posix.pipeand fork a child process that handles the file writing, while the parent handles stdout. Test it withecho "hello tee" | ./your_tee /tmp/tee_output.txtand verify the file contains the same output as the terminal. -
Write a bi-directional IPC system using two anonymous pipes. The parent sends a request message (length-prefixed, like we built in this episode) to the child, and the child sends a response back. Implement a simple "echo server" protocol: the child reads a message, converts it to uppercase, and sends it back. Test with 5 different messages and verify each response is the uppercased version.
-
Build a parallel command executor that runs N commands simultaneously, captures all their stdout via pipes, and prints the output in order (command 1's full output first, then command 2's, etc.) even though the commands finish in arbitrary order. Use poll to read from all pipes concurrently (so no pipe buffer fills up and blocks a child), but buffer the output per-command and print in sequence after all commands finish. Test with commands that produce different amounts of output and finish at different times.
Wat we geleerd hebben
- Anonymous pipes are kernel buffers with a read end and a write end -- created with
pipe(), shared between parent and child viafork, and you MUST close the unused ends or the reader never sees EOF - Named pipes (FIFOs) are filesystem entries that allow unrelated processes to communicate --
mkfifocreates them, and any process can open them for reading or writing - Pipe buffers are 64 KiB by default on Linux (configurable with
fcntl), and writes block when the buffer is full -- this backpressure is a feature, not a bug, preventing fast producers from overwhelming slow consumers polllets you monitor multiple pipe file descriptors simultaneously -- you get notified which ones have data, which avoids blocking on any single pipe while others have data waiting- Shell pipelines are just N processes connected by N-1 anonymous pipes -- each stage's stdout becomes the next stage's stdin, and the parent must close all pipe ends after spawning
- Length-prefixed message framing solves the problem of sending structured data through byte streams -- a 4-byte header tells the reader exactly how many bytes to expect, no delimiter scanning needed
- Writes of
PIPE_BUF(4096) bytes or fewer are guaranteed atomic by POSIX -- multiple concurrent writers won't have their data interleaved if they keep messages under this limit
Pipes are the glue of Unix. They've been around since 1973 and they're not going anywhere. Understanding how they work at the syscall level -- how buffers fill and drain, how file descriptor management controls EOF, how poll multiplexes I/O -- gives you the building blocks for much more complex IPC. The shared memory and semaphore mechanisms we'll look at next build on this same foundation of kernel-managed buffers and file descriptors, but with different tradeoffs.
Bedankt en tot de volgende keer!
@scipio