___|  _ \   |  |    |   |_ _|\ \     / ____|
 |     |   |  |  |    |   |  |  \ \   /  __|
 |   | |   | ___ __|  ___ |  |   \ \ /   |
\____|\___/     _|   _|  _|___|   \_/   _____| 

 --- A GOPHER-LIKE INTERFACE FOR HIVE BLOCKCHAIN ---

Learn Zig Series (#65) - Pipes and Inter-Process Communication

BY: @scipio | CREATED: May 31, 2026, 4:49 p.m. | VOTES: 14 | PAYOUT: $0.51 | [ VOTE ]

Learn Zig Series (#65) - Pipes and Inter-Process Communication

[IMAGE: https://images.hive.blog/DQmaHuB6qTWHaSpJHQ1S8FCCRmNQuUxTcPZdU4yKHsJ7vEP/zig-banner.png]

What will I learn

Requirements

Difficulty

Curriculum (of the Learn Zig Series):

Learn Zig Series (#65) - Pipes and Inter-Process Communication

Solutions to Episode 64 Exercises

Exercise 1: Health check for the Supervisor

const std = @import("std");

const SupervisedProcess = struct {
    name: []const u8,
    command: []const []const u8,
    health_check: ?[]const []const u8,
    pid: ?std.posix.pid_t,
    restart_count: u32,
    max_restarts: u32,
    last_exit_code: ?u8,
    last_start_time: i64,
    last_health_check: i64,

    fn isRunning(self: *const SupervisedProcess) bool {
        return self.pid != null;
    }
};

const HealthCheckSupervisor = struct {
    allocator: std.mem.Allocator,
    processes: std.ArrayList(SupervisedProcess),
    health_interval_ms: i64,

    fn init(allocator: std.mem.Allocator) HealthCheckSupervisor {
        return .{
            .allocator = allocator,
            .processes = std.ArrayList(SupervisedProcess).init(allocator),
            .health_interval_ms = 5000,
        };
    }

    fn deinit(self: *HealthCheckSupervisor) void {
        self.processes.deinit();
    }

    fn addProcess(
        self: *HealthCheckSupervisor,
        name: []const u8,
        command: []const []const u8,
        health_check: ?[]const []const u8,
        max_restarts: u32,
    ) !void {
        try self.processes.append(.{
            .name = name,
            .command = command,
            .health_check = health_check,
            .pid = null,
            .restart_count = 0,
            .max_restarts = max_restarts,
            .last_exit_code = null,
            .last_start_time = 0,
            .last_health_check = 0,
        });
    }

    fn startProcess(self: *HealthCheckSupervisor, proc: *SupervisedProcess) !void {
        _ = self;
        var child = std.process.Child.init(proc.command, std.heap.page_allocator);
        child.stdout_behavior = .Inherit;
        child.stderr_behavior = .Inherit;
        try child.spawn();
        proc.pid = child.id;
        proc.last_start_time = std.time.milliTimestamp();
        proc.last_health_check = std.time.milliTimestamp();
    }

    fn runHealthChecks(self: *HealthCheckSupervisor) !void {
        const stdout = std.io.getStdOut().writer();
        const now = std.time.milliTimestamp();

        for (self.processes.items) |*proc| {
            if (!proc.isRunning()) continue;
            const hc = proc.health_check orelse continue;
            if (now - proc.last_health_check < self.health_interval_ms) continue;

            proc.last_health_check = now;

            var checker = std.process.Child.init(hc, self.allocator);
            checker.stdout_behavior = .Ignore;
            checker.stderr_behavior = .Ignore;
            try checker.spawn();
            const result = try checker.wait();

            if (result.Exited != 0) {
                try stdout.print("[health] '{s}' failed check, restarting\n", .{proc.name});
                std.posix.kill(proc.pid.?, std.posix.SIG.KILL) catch {};
                _ = std.posix.waitpid(proc.pid.?, 0);
                proc.pid = null;
                proc.restart_count += 1;
                if (proc.restart_count <= proc.max_restarts) {
                    try self.startProcess(proc);
                }
            }
        }
    }
};

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    var sv = HealthCheckSupervisor.init(allocator);
    defer sv.deinit();

    // the health check verifies /tmp/healthy exists
    try sv.addProcess(
        "worker",
        &.{ "sh", "-c", "touch /tmp/healthy && sleep 60" },
        &.{ "sh", "-c", "test -f /tmp/healthy" },
        3,
    );

    try sv.startProcess(&sv.processes.items[0]);

    // run a few health check cycles
    var ticks: usize = 0;
    while (ticks < 3) : (ticks += 1) {
        std.time.sleep(2 * std.time.ns_per_s);
        try sv.runHealthChecks();
    }
}

The health check runs as a separate child process. If it exits non-zero, the supervisor kills the main process and restarts it. The health_interval_ms field prevents running checks too often.

Exercise 2: Process pool maintaining N workers

const std = @import("std");

const WorkerPool = struct {
    allocator: std.mem.Allocator,
    target_count: usize,
    command: []const []const u8,
    workers: std.ArrayList(Worker),

    const Worker = struct {
        id: usize,
        pid: std.posix.pid_t,
    };

    fn init(allocator: std.mem.Allocator, count: usize, cmd: []const []const u8) WorkerPool {
        return .{
            .allocator = allocator,
            .target_count = count,
            .command = cmd,
            .workers = std.ArrayList(Worker).init(allocator),
        };
    }

    fn deinit(self: *WorkerPool) void {
        self.workers.deinit();
    }

    fn spawnWorker(self: *WorkerPool, id: usize) !void {
        var env_map = std.process.EnvMap.init(self.allocator);
        defer env_map.deinit();

        var id_buf: [20]u8 = undefined;
        const id_str = std.fmt.bufPrint(&id_buf, "{d}", .{id}) catch "0";
        try env_map.put("WORKER_ID", id_str);
        try env_map.put("PATH", std.posix.getenv("PATH") orelse "/usr/bin");

        var child = std.process.Child.init(self.command, self.allocator);
        child.stdout_behavior = .Inherit;
        child.stderr_behavior = .Inherit;
        child.env_map = &env_map;

        try child.spawn();
        try self.workers.append(.{ .id = id, .pid = child.id });
    }

    fn fillPool(self: *WorkerPool) !void {
        var next_id: usize = 0;
        for (self.workers.items) |w| {
            if (w.id >= next_id) next_id = w.id + 1;
        }
        while (self.workers.items.len < self.target_count) {
            try self.spawnWorker(next_id);
            next_id += 1;
        }
    }

    fn reapAndRefill(self: *WorkerPool) !usize {
        var reaped: usize = 0;
        while (true) {
            const result = std.posix.waitpid(-1, std.posix.W.NOHANG);
            if (result.pid <= 0) break;

            // remove from workers list
            var i: usize = 0;
            while (i < self.workers.items.len) {
                if (self.workers.items[i].pid == result.pid) {
                    _ = self.workers.orderedRemove(i);
                    reaped += 1;
                    break;
                }
                i += 1;
            }
        }
        if (reaped > 0) try self.fillPool();
        return reaped;
    }
};

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    var pool = WorkerPool.init(allocator, 3, &.{
        "sh", "-c", "echo worker $WORKER_ID started && sleep 10",
    });
    defer pool.deinit();

    try pool.fillPool();
    const stdout = std.io.getStdOut().writer();
    try stdout.print("Pool started with {d} workers\n", .{pool.workers.items.len});

    var ticks: usize = 0;
    while (ticks < 20) : (ticks += 1) {
        std.time.sleep(500 * std.time.ns_per_ms);
        const n = try pool.reapAndRefill();
        if (n > 0) try stdout.print("  reaped {d}, pool now {d}\n", .{ n, pool.workers.items.len });
    }
}

Each worker gets a unique WORKER_ID in its environment. When reapAndRefill detects a dead worker via non-blocking waitpid, it removes it from the list and calls fillPool to spawn a replacement.

Exercise 3: Pipe executor connecting two processes

const std = @import("std");

fn pipeCommands(
    allocator: std.mem.Allocator,
    cmd1: []const []const u8,
    cmd2: []const []const u8,
) ![]u8 {
    // create the pipe connecting cmd1 stdout to cmd2 stdin
    const pipe_fds = try std.posix.pipe();

    // spawn first command with stdout -> pipe write end
    var child1 = std.process.Child.init(cmd1, allocator);
    child1.stdout_behavior = .{ .fd = pipe_fds[1] };
    child1.stderr_behavior = .Inherit;
    try child1.spawn();

    // parent closes the write end -- only child1 has it
    std.posix.close(pipe_fds[1]);

    // spawn second command with stdin -> pipe read end
    var child2 = std.process.Child.init(cmd2, allocator);
    child2.stdin_behavior = .{ .fd = pipe_fds[0] };
    child2.stdout_behavior = .Pipe;
    child2.stderr_behavior = .Inherit;
    try child2.spawn();

    // parent closes the read end -- only child2 has it
    std.posix.close(pipe_fds[0]);

    // read child2's output
    const reader = child2.stdout.?.reader();
    const output = try reader.readAllAlloc(allocator, 64 * 1024);

    _ = try child1.wait();
    _ = try child2.wait();

    return output;
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    const result = try pipeCommands(
        allocator,
        &.{ "echo", "hello world" },
        &.{ "wc", "-w" },
    );
    defer allocator.free(result);

    const stdout = std.io.getStdOut().writer();
    try stdout.print("pipe result: '{s}'\n", .{std.mem.trim(u8, result, " \n")});
}

The key insight is that you create the pipe BEFORE forking either child, then assign the write end to child1's stdout and the read end to child2's stdin using the .fd behavior. The parent must close both pipe ends after spawning -- otherwise the read end stays open in the parent and child2 never sees EOF.

Last episode we went deep on the Unix process model -- fork, exec, wait, process groups, zombies, the double-fork daemon trick, and a process supervisor. All of that covered how to CREATE and MANAGE processes. But processes by themselves are islands. They exit with a code and that's about it. Today we connect the islands. Pipes are the oldest Unix IPC (inter-process communication) mechanism, and they're still one of the most useful. Every time you type ls | grep foo | wc -l in a terminal, you're using pipes. Every time a parent process reads its child's output, there's a pipe underneath.

We actually already saw pipes in action last episode when we used std.process.Child with .Pipe for stdout/stderr. Today we're going underneath that abstraction to understand how pipes really work, and then we'll build some practical things with them.

Here we go!

Anonymous pipes: connecting parent and child

A pipe is a kernel buffer with two file descriptors: one for writing, one for reading. Data written to the write end can be read from the read end. It's a unidirectional byte stream -- no seeking, no random access, just bytes flowing in one direction. The pipe syscall creates a pair of file descriptors, and typically the parent keeps one end and the child (after fork) gets the other.

Let's start from the ground up with std.posix.pipe:

const std = @import("std");

pub fn main() !void {
    const stdout = std.io.getStdOut().writer();

    // pipe() returns [2]fd: [0] = read end, [1] = write end
    const pipe_fds = try std.posix.pipe();

    const pid = try std.posix.fork();
    if (pid == 0) {
        // child: close the read end, write to the write end
        std.posix.close(pipe_fds[0]);

        const msg = "hello from the child process!\n";
        const writer_fd: std.posix.fd_t = pipe_fds[1];
        _ = std.posix.write(writer_fd, msg) catch {};
        std.posix.close(pipe_fds[1]);
        std.process.exit(0);
    }

    // parent: close the write end, read from the read end
    std.posix.close(pipe_fds[1]);

    var buf: [256]u8 = undefined;
    const n = try std.posix.read(pipe_fds[0], &buf);
    std.posix.close(pipe_fds[0]);

    try stdout.print("Parent received {d} bytes: {s}", .{ n, buf[0..n] });

    _ = std.posix.waitpid(pid, 0);
}

The critical part is which end each process closes. The child closes the read end (it only writes), and the parent closes the write end (it only reads). If you forget to close the write end in the parent, the read call will never return EOF -- the kernel thinks someone might still write to the pipe because a write descriptor is still open. This is one of the most common pipe bugs and it's a real pain to debug because the program just hangs.

After fork, both parent and child have copies of both file descriptors. That's four file descriptors total pointing at the same pipe. You need to close the ones you don't use, otherwise the reference count on the pipe never drops to zero on the write side, and the reader blocks forver.

Named pipes (FIFOs): communicating between unrelated processes

Anonymous pipes only work between related processes (parent-child), because you need fork to share the file descriptors. Named pipes (FIFOs) solve this -- they're filesystem entries that any process can open. One process opens the FIFO for writing, another opens it for reading, and data flows between them:

const std = @import("std");
const linux = std.os.linux;

fn createFifo(path: [*:0]const u8) !void {
    // mkfifo creates a special FIFO file
    const result = linux.mknodat(
        linux.AT.FDCWD,
        path,
        linux.S.IFIFO | 0o666,
        0,
    );
    const err = std.posix.errno(result);
    if (err != .SUCCESS and err != .EXIST) {
        return error.MkfifoFailed;
    }
}

fn writerProcess(path: []const u8) !void {
    const file = try std.fs.cwd().openFile(path, .{ .mode = .write_only });
    defer file.close();

    const messages = [_][]const u8{
        "message one\n",
        "message two\n",
        "message three\n",
    };

    for (messages) |msg| {
        try file.writeAll(msg);
        std.time.sleep(200 * std.time.ns_per_ms);
    }
}

fn readerProcess(path: []const u8) !void {
    const stdout = std.io.getStdOut().writer();

    const file = try std.fs.cwd().openFile(path, .{ .mode = .read_only });
    defer file.close();

    var buf: [256]u8 = undefined;
    while (true) {
        const n = file.read(&buf) catch break;
        if (n == 0) break; // writer closed
        try stdout.print("[reader] got: {s}", .{buf[0..n]});
    }
    try stdout.print("[reader] writer closed, done\n", .{});
}

pub fn main() !void {
    const fifo_path = "/tmp/zig_fifo_demo";
    const fifo_path_z: [*:0]const u8 = "/tmp/zig_fifo_demo";

    try createFifo(fifo_path_z);
    defer std.fs.cwd().deleteFile(fifo_path) catch {};

    const pid = try std.posix.fork();
    if (pid == 0) {
        // child writes
        writerProcess(fifo_path) catch {};
        std.process.exit(0);
    }

    // parent reads
    readerProcess(fifo_path) catch |err| {
        const stdout = std.io.getStdOut().writer();
        stdout.print("reader error: {}\n", .{err}) catch {};
    };

    _ = std.posix.waitpid(pid, 0);
}

The FIFO lives in the filesystem as a special file. ls -l shows it with a p prefix (for pipe). Any number of processes can open it -- one side writes, the other reads. The kernel handles the buffering. When all writers close the FIFO, readers get EOF. When all readers close, writers get SIGPIPE (or EPIPE if the signal is blocked).

Named pipes are useful for simple inter-process communication where you don't want the overhead of sockets or shared memory. Log aggregation, command-and-control channels between daemons, and communication between scripts written in different languages -- FIFOs handle all of these. The downside compared to Unix domain sockets is that FIFOs are unidirectional. For bidirectional communication you need two FIFOs (or switch to sockets, which we'll cover in a future episode).

Pipe buffering and blocking behavior

Pipes have a kernel buffer. On Linux this is typically 64 KiB (16 pages of 4 KiB each). When the buffer is full, writes block. When the buffer is empty, reads block. This backpressure mechanism is what makes pipes self-regulating -- a fast producer automatically slows down when the consumer can't keep up:

const std = @import("std");
const linux = std.os.linux;

pub fn main() !void {
    const stdout_w = std.io.getStdOut().writer();
    const pipe_fds = try std.posix.pipe();

    // check the pipe buffer size using fcntl F_GETPIPE_SZ
    const pipe_size = linux.fcntl(pipe_fds[0], linux.F.GETPIPE_SZ, @as(u64, 0));
    try stdout_w.print("Pipe buffer size: {d} bytes ({d} KiB)\n", .{
        pipe_size,
        @divFloor(pipe_size, 1024),
    });

    const pid = try std.posix.fork();
    if (pid == 0) {
        // child: write more data than the pipe can hold
        std.posix.close(pipe_fds[0]);

        const chunk: [4096]u8 = [_]u8{'A'} ** 4096;
        var total: usize = 0;
        var block_count: usize = 0;

        while (total < 256 * 1024) { // try to write 256 KiB
            const n = std.posix.write(pipe_fds[1], &chunk) catch |err| {
                const stderr = std.io.getStdErr().writer();
                stderr.print("write error after {d} bytes: {}\n", .{ total, err }) catch {};
                break;
            };
            total += n;
            block_count += 1;
        }

        const stderr = std.io.getStdErr().writer();
        stderr.print("[writer] wrote {d} bytes in {d} calls\n", .{ total, block_count }) catch {};
        std.posix.close(pipe_fds[1]);
        std.process.exit(0);
    }

    // parent: read slowly to demonstrate blocking on the writer side
    std.posix.close(pipe_fds[1]);

    var total_read: usize = 0;
    var read_buf: [1024]u8 = undefined;

    while (true) {
        // slow reader -- sleep between reads
        std.time.sleep(10 * std.time.ns_per_ms);

        const n = std.posix.read(pipe_fds[0], &read_buf) catch break;
        if (n == 0) break;
        total_read += n;
    }

    std.posix.close(pipe_fds[0]);
    _ = std.posix.waitpid(pid, 0);

    try stdout_w.print("[reader] total read: {d} bytes\n", .{total_read});
}

When you run this, the writer fills the 64 KiB buffer quickly, then blocks on the next write until the reader consumes some data. The write call doesn't return until space is available. This is blocking I/O in action -- and it's the default behavior for pipes.

You can make pipe file descriptors non-blocking by passing O_NONBLOCK to pipe2 (or setting it with fcntl after creation). With non-blocking I/O, a write to a full pipe returns error.WouldBlock instead of blocking, and a read from an empty pipe does the same. This is usful when you need to multiplex I/O across multiple pipes, which is exactly what we'll do next.

One more thing about pipe sizes: on Linux you can increase the buffer with fcntl(fd, F_SETPIPE_SZ, new_size). The maximum is controlled by /proc/sys/fs/pipe-max-size (usually 1 MiB). Bigger buffers reduce context switches between producer and consumer but use more kernel memory. For most use cases the default 64 KiB is fine.

Multiplexing pipes with poll

When you have multiple child processes, each with its own pipe, you need to read from all of them without blocking on any single one. The poll syscall solves this -- you give it a list of file descriptors and it tells you which ones have data available:

const std = @import("std");
const linux = std.os.linux;

const PipeMultiplexer = struct {
    allocator: std.mem.Allocator,
    pipes: std.ArrayList(PipeEntry),

    const PipeEntry = struct {
        name: []const u8,
        read_fd: std.posix.fd_t,
        child_pid: std.posix.pid_t,
        closed: bool,
    };

    fn init(allocator: std.mem.Allocator) PipeMultiplexer {
        return .{
            .allocator = allocator,
            .pipes = std.ArrayList(PipeEntry).init(allocator),
        };
    }

    fn deinit(self: *PipeMultiplexer) void {
        for (self.pipes.items) |entry| {
            if (!entry.closed) std.posix.close(entry.read_fd);
        }
        self.pipes.deinit();
    }

    fn addChild(self: *PipeMultiplexer, name: []const u8, cmd: []const []const u8) !void {
        const pipe_fds = try std.posix.pipe();

        const pid = try std.posix.fork();
        if (pid == 0) {
            // child: redirect stdout to pipe write end
            std.posix.close(pipe_fds[0]);
            std.posix.dup2(pipe_fds[1], 1) catch std.process.exit(1);
            std.posix.close(pipe_fds[1]);

            // exec the command
            const argv = @as(
                [*:null]const ?[*:0]const u8,
                @ptrCast(cmd.ptr),
            );
            const err = std.posix.execvpeZ(
                @ptrCast(cmd[0]),
                argv,
                @ptrCast(std.c.environ),
            );
            _ = err;
            std.process.exit(127);
        }

        // parent: close write end, keep read end
        std.posix.close(pipe_fds[1]);

        try self.pipes.append(.{
            .name = name,
            .read_fd = pipe_fds[0],
            .child_pid = pid,
            .closed = false,
        });
    }

    fn readAll(self: *PipeMultiplexer) !void {
        const stdout = std.io.getStdOut().writer();
        var buf: [512]u8 = undefined;

        while (true) {
            // count how many are still open
            var open_count: usize = 0;
            for (self.pipes.items) |e| {
                if (!e.closed) open_count += 1;
            }
            if (open_count == 0) break;

            // build poll fd array
            var pollfds = try self.allocator.alloc(linux.pollfd, self.pipes.items.len);
            defer self.allocator.free(pollfds);

            for (self.pipes.items, 0..) |entry, i| {
                pollfds[i] = .{
                    .fd = if (entry.closed) -1 else entry.read_fd,
                    .events = linux.POLL.IN,
                    .revents = 0,
                };
            }

            // poll with 1 second timeout
            const ready = linux.poll(pollfds.ptr, @intCast(pollfds.len), 1000);
            if (@as(isize, @bitCast(@as(usize, ready))) <= 0) continue;

            for (self.pipes.items, 0..) |*entry, i| {
                if (entry.closed) continue;
                if (pollfds[i].revents & linux.POLL.IN != 0) {
                    const n = std.posix.read(entry.read_fd, &buf) catch 0;
                    if (n == 0) {
                        // EOF
                        std.posix.close(entry.read_fd);
                        entry.closed = true;
                        _ = std.posix.waitpid(entry.child_pid, 0);
                    } else {
                        try stdout.print("[{s}] {s}", .{ entry.name, buf[0..n] });
                    }
                }
                if (pollfds[i].revents & linux.POLL.HUP != 0 and
                    pollfds[i].revents & linux.POLL.IN == 0)
                {
                    std.posix.close(entry.read_fd);
                    entry.closed = true;
                    _ = std.posix.waitpid(entry.child_pid, 0);
                }
            }
        }
    }
};

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    var mux = PipeMultiplexer.init(allocator);
    defer mux.deinit();

    // Note: addChild uses fork+exec internaly
    // For this demo, we use std.process.Child instead
    // to avoid the raw execvpe complexity. Shown here
    // is the poll logic -- the real value of this example.

    // For a working version, let's use pipe() + Child:
    const stdout = std.io.getStdOut().writer();

    const pipe1 = try std.posix.pipe();
    const pipe2 = try std.posix.pipe();

    const pid1 = try std.posix.fork();
    if (pid1 == 0) {
        std.posix.close(pipe1[0]);
        std.posix.close(pipe2[0]);
        std.posix.close(pipe2[1]);
        _ = std.posix.write(pipe1[1], "output from child A\n") catch {};
        std.time.sleep(100 * std.time.ns_per_ms);
        _ = std.posix.write(pipe1[1], "more from child A\n") catch {};
        std.posix.close(pipe1[1]);
        std.process.exit(0);
    }

    const pid2 = try std.posix.fork();
    if (pid2 == 0) {
        std.posix.close(pipe1[0]);
        std.posix.close(pipe1[1]);
        std.posix.close(pipe2[0]);
        std.time.sleep(50 * std.time.ns_per_ms);
        _ = std.posix.write(pipe2[1], "output from child B\n") catch {};
        std.posix.close(pipe2[1]);
        std.process.exit(0);
    }

    // parent closes write ends
    std.posix.close(pipe1[1]);
    std.posix.close(pipe2[1]);

    var pollfds = [_]linux.pollfd{
        .{ .fd = pipe1[0], .events = linux.POLL.IN, .revents = 0 },
        .{ .fd = pipe2[0], .events = linux.POLL.IN, .revents = 0 },
    };

    const names = [_][]const u8{ "childA", "childB" };
    var open_count: usize = 2;
    var buf: [256]u8 = undefined;

    while (open_count > 0) {
        const ready = linux.poll(&pollfds, 2, 2000);
        if (@as(isize, @bitCast(@as(usize, ready))) <= 0) continue;

        for (&pollfds, 0..) |*pfd, i| {
            if (pfd.fd < 0) continue;
            if (pfd.revents & linux.POLL.IN != 0) {
                const n = std.posix.read(pfd.fd, &buf) catch 0;
                if (n == 0) {
                    std.posix.close(pfd.fd);
                    pfd.fd = -1;
                    open_count -= 1;
                } else {
                    try stdout.print("[{s}] {s}", .{ names[i], buf[0..n] });
                }
            }
            if (pfd.revents & linux.POLL.HUP != 0 and pfd.revents & linux.POLL.IN == 0) {
                std.posix.close(pfd.fd);
                pfd.fd = -1;
                open_count -= 1;
            }
        }
    }

    _ = std.posix.waitpid(pid1, 0);
    _ = std.posix.waitpid(pid2, 0);

    try stdout.print("All children done\n", .{});
    _ = mux;
}

poll takes an array of pollfd structs. Each one has an fd (file descriptor), events (what you're interested in -- POLLIN for "data available to read"), and revents (what actually happened -- filled in by the kernel). Setting fd to -1 tells poll to skip that entry.

The POLLHUP flag (hangup) fires when the write end of the pipe is closed. You might get POLLIN and POLLHUP at the same time if there's still data in the buffer when the writer closes. Always check for POLLIN first and read any remaining data before treating POLLHUP as end-of-stream.

This pattern -- poll across multiple file descriptors, handle whichever ones are ready -- is the foundation of event-driven programming. Web servers, databases, message brokers -- they all use some variant of poll/epoll/kqueue to handle thousands of connections concurrently on a single thread. We used epoll in the HTTP server episodes (51-54), and the concept is identical here with pipes.

Building a pipeline: chaining processes like a shell

A shell pipeline like cat /etc/passwd | grep root | wc -l connects three processes with two pipes. Each process's stdout becomes the next process's stdin. Let's build a general pipeline executor:

const std = @import("std");

const Pipeline = struct {
    allocator: std.mem.Allocator,
    stages: std.ArrayList([]const []const u8),

    fn init(allocator: std.mem.Allocator) Pipeline {
        return .{
            .allocator = allocator,
            .stages = std.ArrayList([]const []const u8).init(allocator),
        };
    }

    fn deinit(self: *Pipeline) void {
        self.stages.deinit();
    }

    fn addStage(self: *Pipeline, cmd: []const []const u8) !void {
        try self.stages.append(cmd);
    }

    fn execute(self: *Pipeline) ![]u8 {
        if (self.stages.items.len == 0) return error.EmptyPipeline;

        var child_pids = std.ArrayList(std.posix.pid_t).init(self.allocator);
        defer child_pids.deinit();

        // we need N-1 pipes for N stages
        var prev_read_fd: ?std.posix.fd_t = null;

        for (self.stages.items, 0..) |cmd, i| {
            const is_last = (i == self.stages.items.len - 1);

            // create a pipe for this stage's output (unless it's the last)
            var out_pipe: ?[2]std.posix.fd_t = null;
            if (!is_last) {
                out_pipe = try std.posix.pipe();
            }

            // also pipe the last stage's stdout so we can capture it
            var capture_pipe: ?[2]std.posix.fd_t = null;
            if (is_last) {
                capture_pipe = try std.posix.pipe();
            }

            var child = std.process.Child.init(cmd, self.allocator);

            if (prev_read_fd) |fd| {
                child.stdin_behavior = .{ .fd = fd };
            }
            if (out_pipe) |p| {
                child.stdout_behavior = .{ .fd = p[1] };
            } else if (capture_pipe) |p| {
                child.stdout_behavior = .{ .fd = p[1] };
            }
            child.stderr_behavior = .Inherit;

            try child.spawn();
            try child_pids.append(child.id);

            // close pipe ends in parent
            if (prev_read_fd) |fd| std.posix.close(fd);
            if (out_pipe) |p| {
                std.posix.close(p[1]); // close write end
                prev_read_fd = p[0]; // keep read end for next stage
            }
            if (capture_pipe) |p| {
                std.posix.close(p[1]); // close write end
                prev_read_fd = p[0]; // capture read end
            }
        }

        // read the final output
        var output = std.ArrayList(u8).init(self.allocator);
        if (prev_read_fd) |fd| {
            var buf: [4096]u8 = undefined;
            while (true) {
                const n = std.posix.read(fd, &buf) catch break;
                if (n == 0) break;
                try output.appendSlice(buf[0..n]);
            }
            std.posix.close(fd);
        }

        // wait for all children
        for (child_pids.items) |pid| {
            _ = std.posix.waitpid(pid, 0);
        }

        return try output.toOwnedSlice();
    }
};

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();
    const stdout = std.io.getStdOut().writer();

    // pipeline: cat /etc/passwd | grep root | wc -l
    var pipeline = Pipeline.init(allocator);
    defer pipeline.deinit();

    try pipeline.addStage(&.{ "cat", "/etc/passwd" });
    try pipeline.addStage(&.{ "grep", "root" });
    try pipeline.addStage(&.{ "wc", "-l" });

    const result = try pipeline.execute();
    defer allocator.free(result);

    try stdout.print("Pipeline output: {s}", .{result});

    // another pipeline: ls /usr/bin | sort | head -5
    var pipeline2 = Pipeline.init(allocator);
    defer pipeline2.deinit();

    try pipeline2.addStage(&.{ "ls", "/usr/bin" });
    try pipeline2.addStage(&.{ "sort" });
    try pipeline2.addStage(&.{ "head", "-5" });

    const result2 = try pipeline2.execute();
    defer allocator.free(result2);

    try stdout.print("Pipeline 2 output:\n{s}", .{result2});
}

The algorithm is: for N stages, create N-1 pipes. Each stage's stdin is the previous pipe's read end, and each stage's stdout is the next pipe's write end. The parent closes both ends of each pipe after spawning the children -- this is critical, because if the parent holds a write end open, the downstream reader will never see EOF.

Notice how we capture the final stage's output by creating one extra pipe for the last stage. The parent reads from it after all children are spawned. This gives us the same result as running the pipeline in a shell and reading stdout.

This is basically what our shell project did in episodes 47-50, but stripped down to the essentials. The shell version handled more edge cases (built-in commands, environment variables, job control), but the pipe plumbing is identical.

Passing structured data through pipes: length-prefixed messages

Raw bytes are fine for text streams, but when you need to send structured messages between processes, you need a framing protocol. The simplest approach is length-prefixed messages: send a 4-byte length header followed by that many bytes of payload:

const std = @import("std");

const Message = struct {
    msg_type: u8,
    payload: []const u8,

    fn encode(self: Message, writer: anytype) !void {
        // write length header (4 bytes big-endian)
        const total_len: u32 = @intCast(1 + self.payload.len);
        const len_bytes = std.mem.toBytes(std.mem.nativeToBig(u32, total_len));
        try writer.writeAll(&len_bytes);

        // write type byte
        try writer.writeByte(self.msg_type);

        // write payload
        try writer.writeAll(self.payload);
    }

    fn decode(allocator: std.mem.Allocator, reader: anytype) !?Message {
        // read length header
        var len_bytes: [4]u8 = undefined;
        const n = reader.readAll(&len_bytes) catch return null;
        if (n < 4) return null;

        const total_len = std.mem.bigToNative(u32, std.mem.bytesToValue(u32, &len_bytes));
        if (total_len == 0) return null;
        if (total_len > 1024 * 1024) return error.MessageTooLarge;

        // read type byte
        const msg_type = reader.readByte() catch return null;

        // read payload
        const payload_len = total_len - 1;
        const payload = try allocator.alloc(u8, payload_len);
        const read_count = reader.readAll(payload) catch |err| {
            allocator.free(payload);
            return err;
        };
        if (read_count < payload_len) {
            allocator.free(payload);
            return null;
        }

        return .{
            .msg_type = msg_type,
            .payload = payload,
        };
    }
};

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();
    const stdout = std.io.getStdOut().writer();

    const pipe_fds = try std.posix.pipe();

    const pid = try std.posix.fork();
    if (pid == 0) {
        // child: send structured messages
        std.posix.close(pipe_fds[0]);
        const file = std.fs.File{ .handle = pipe_fds[1] };
        const writer = file.writer();

        const messages = [_]Message{
            .{ .msg_type = 1, .payload = "system initialized" },
            .{ .msg_type = 2, .payload = "processing batch 42" },
            .{ .msg_type = 3, .payload = "task complete: 128 items" },
            .{ .msg_type = 1, .payload = "shutting down" },
        };

        for (messages) |msg| {
            msg.encode(writer) catch break;
        }

        std.posix.close(pipe_fds[1]);
        std.process.exit(0);
    }

    // parent: decode structured messages
    std.posix.close(pipe_fds[1]);
    const file = std.fs.File{ .handle = pipe_fds[0] };
    const reader = file.reader();

    const type_names = [_][]const u8{ "???", "INFO", "PROGRESS", "RESULT" };

    while (true) {
        const msg = Message.decode(allocator, reader) catch break;
        if (msg == null) break;
        const m = msg.?;
        defer allocator.free(m.payload);

        const type_name = if (m.msg_type < type_names.len)
            type_names[m.msg_type]
        else
            "UNKNOWN";

        try stdout.print("[{s}] {s}\n", .{ type_name, m.payload });
    }

    std.posix.close(pipe_fds[0]);
    _ = std.posix.waitpid(pid, 0);
}

Length-prefixed framing is used everywhere: HTTP/2 frames, WebSocket frames, protobuf wire format, database protocols (PostgreSQL's wire protocol sends a 4-byte length before every message). The big advantage over delimiter-based framing (like newline-separated JSON) is that the payload can contain any bytes, including newlines and null bytes. You know exactly how many bytes to read, so you never accidentally split a message or merge two together.

The big-endian byte order for the length header is a convention from network protocols (it's sometimes called "network byte order"). It means the most significant byte comes first. Zig's std.mem.nativeToBig handles the conversion regardless of the host's endianness.

Pipe capacity and flow control

We touched on pipe buffer sizes earlier, but let's look at this more systematically. Understanding pipe capacity matters when you're designing systems with multiple stages that produce and consume at different rates:

const std = @import("std");
const linux = std.os.linux;

fn measurePipeCapacity() !void {
    const stdout = std.io.getStdOut().writer();

    const pipe_fds = try std.posix.pipe();
    defer std.posix.close(pipe_fds[0]);
    defer std.posix.close(pipe_fds[1]);

    // get current pipe size
    const current = linux.fcntl(pipe_fds[1], linux.F.GETPIPE_SZ, @as(u64, 0));
    try stdout.print("Default pipe size: {d} bytes ({d} KiB)\n", .{
        current, @divFloor(current, 1024),
    });

    // try to increase it
    const requested: usize = 256 * 1024; // 256 KiB
    const result = linux.fcntl(
        pipe_fds[1],
        linux.F.SETPIPE_SZ,
        @as(u64, requested),
    );
    if (@as(isize, @bitCast(@as(usize, result))) > 0) {
        try stdout.print("Resized pipe to: {d} bytes ({d} KiB)\n", .{
            result, @divFloor(result, 1024),
        });
    }

    // measure actual capacity by writing until we'd block
    // set pipe to non-blocking first
    _ = linux.fcntl(pipe_fds[1], linux.F.SETFL, @as(u64, linux.O.NONBLOCK));

    const chunk: [4096]u8 = [_]u8{'X'} ** 4096;
    var total: usize = 0;

    while (true) {
        const n = std.posix.write(pipe_fds[1], &chunk) catch break;
        total += n;
    }

    try stdout.print("Actual capacity before EAGAIN: {d} bytes ({d} KiB)\n", .{
        total, @divFloor(total, 1024),
    });
}

fn demonstrateAtomicWrites() !void {
    const stdout = std.io.getStdOut().writer();

    // PIPE_BUF is the max size for atomic writes (4096 on Linux)
    // writes <= PIPE_BUF are guaranteed atomic (no interleaving)
    // writes > PIPE_BUF may be split across multiple reads

    const pipe_fds = try std.posix.pipe();

    const pid1 = try std.posix.fork();
    if (pid1 == 0) {
        std.posix.close(pipe_fds[0]);
        // write small messages (under PIPE_BUF) -- these are atomic
        var i: usize = 0;
        while (i < 10) : (i += 1) {
            var msg: [64]u8 = undefined;
            const len = std.fmt.bufPrint(&msg, "writer-A message {d}\n", .{i}) catch break;
            _ = std.posix.write(pipe_fds[1], msg[0..len.len]) catch break;
        }
        std.posix.close(pipe_fds[1]);
        std.process.exit(0);
    }

    const pid2 = try std.posix.fork();
    if (pid2 == 0) {
        std.posix.close(pipe_fds[0]);
        var i: usize = 0;
        while (i < 10) : (i += 1) {
            var msg: [64]u8 = undefined;
            const len = std.fmt.bufPrint(&msg, "writer-B message {d}\n", .{i}) catch break;
            _ = std.posix.write(pipe_fds[1], msg[0..len.len]) catch break;
        }
        std.posix.close(pipe_fds[1]);
        std.process.exit(0);
    }

    std.posix.close(pipe_fds[1]);

    // read and display -- messages may be interleaved but never torn
    var buf: [4096]u8 = undefined;
    while (true) {
        const n = std.posix.read(pipe_fds[0], &buf) catch break;
        if (n == 0) break;
        try stdout.print("{s}", .{buf[0..n]});
    }

    std.posix.close(pipe_fds[0]);
    _ = std.posix.waitpid(pid1, 0);
    _ = std.posix.waitpid(pid2, 0);
}

pub fn main() !void {
    try measurePipeCapacity();
    try stdout_print_sep();
    try demonstrateAtomicWrites();
}

fn stdout_print_sep() !void {
    const stdout = std.io.getStdOut().writer();
    try stdout.print("---\n", .{});
}

The PIPE_BUF constant (4096 bytes on Linux) is important for concurrent writers. POSIX guarantees that writes of PIPE_BUF bytes or fewer are atomic -- they won't be interleaved with writes from other processes. Writes larger than PIPE_BUF have no atomicity guarantee; the kernel may split them, causing data from different writers to get mixed together. If you have multiple processes writing to the same pipe (e.g., multiple workers logging to a shared pipe), keep individual write calls under 4096 bytes and you're safe.

Exercises

  1. Build a "tee" command in Zig. Read stdin, write every byte to BOTH stdout AND a file specified as a command-line argument, simultaneously. Use std.posix.pipe and fork a child process that handles the file writing, while the parent handles stdout. Test it with echo "hello tee" | ./your_tee /tmp/tee_output.txt and verify the file contains the same output as the terminal.

  2. Write a bi-directional IPC system using two anonymous pipes. The parent sends a request message (length-prefixed, like we built in this episode) to the child, and the child sends a response back. Implement a simple "echo server" protocol: the child reads a message, converts it to uppercase, and sends it back. Test with 5 different messages and verify each response is the uppercased version.

  3. Build a parallel command executor that runs N commands simultaneously, captures all their stdout via pipes, and prints the output in order (command 1's full output first, then command 2's, etc.) even though the commands finish in arbitrary order. Use poll to read from all pipes concurrently (so no pipe buffer fills up and blocks a child), but buffer the output per-command and print in sequence after all commands finish. Test with commands that produce different amounts of output and finish at different times.

Wat we geleerd hebben

Pipes are the glue of Unix. They've been around since 1973 and they're not going anywhere. Understanding how they work at the syscall level -- how buffers fill and drain, how file descriptor management controls EOF, how poll multiplexes I/O -- gives you the building blocks for much more complex IPC. The shared memory and semaphore mechanisms we'll look at next build on this same foundation of kernel-managed buffers and file descriptors, but with different tradeoffs.

Bedankt en tot de volgende keer!

@scipio

TAGS: [ #stem ] [ #stemsocial ] [ #steemstem ] [ #zig ] [ #programming ]

Replies

@hivebuzz | June 1, 2026, 4:02 a.m. | Votes: 0 | [ VOTE ]

Congratulations @scipio! You have completed the following achievement on the Hive blockchain And have been rewarded with New badge(s)

You received more than 35000 upvotes.Your next target is to reach 40000 upvotes.
You have been a buzzy bee and published a post every day of the week.
You have been a buzzy bee and published a post every day of the month.

You can view your badges on your board and compare yourself to others in the Ranking
If you no longer want to receive notifications, reply to this comment with the word STOP

Check out our last posts:

Hive Power Up Month Challenge - May 2026 Winners ListBe ready for the June edition of the Hive Power Up Month!Hive Power Up Day - June 1st 2026

[ BACK TO TRENDING ] [ BACK TO MENU ]
CMD>