Channels

Channels enable communication between concurrent tasks. They provide a safe, typed way to pass data between producers and consumers.

Channel Basics

A channel is a queue with two ends:

  • Producer — sends values into the channel
  • Consumer — receives values from the channel

// Create a channel
let (producer, consumer) = channel<int>(buffer: 10);

// Producer sends values
producer.send(value: 1);
producer.send(value: 2);
producer.close();

// Consumer receives values
let first = consumer.receive();   // Some(1)
let second = consumer.receive();  // Some(2)
let done = consumer.receive();    // None (channel closed)

Channel Types

Ori provides four channel constructors for different communication patterns:

channel — One-to-One

Basic channel with one producer and one consumer:

let (producer, consumer) = channel<int>(buffer: 10);

  • Producer<T> — cannot be cloned
  • Consumer<T> — cannot be cloned
  • Use when: Single producer, single consumer

channel_in — Fan-In (Many-to-One)

Multiple producers, one consumer:

let (producer, consumer) = channel_in<int>(buffer: 10);

  • CloneableProducer<T> — can be cloned for multiple senders
  • Consumer<T> — cannot be cloned
  • Use when: Aggregating results from multiple workers

@aggregate_results (worker_count: int) -> [Result<int, Error>] uses Suspend = {
    let (producer, consumer) = channel_in<Result<int, Error>>(buffer: 100);

    nursery(
        body: n -> {
            // Spawn multiple workers, each with a cloned producer
            for i in 0..worker_count do {
                let p = producer.clone();
                n.spawn(task: () -> {
                    let result = compute_work(worker_id: i);
                    p.send(value: result);
                });
            }
        }
        on_error: CollectAll
        timeout: 60s
    );

    // Every worker has finished sending; close the channel so the loop below sees None
    producer.close();

    // Collect all results
    let results: [Result<int, Error>] = [];
    loop {
        match consumer.receive() {
            Some(r) -> {
                results = [...results, r];
                continue
            }
            None -> break results
        }
    }
}

channel_out — Fan-Out (One-to-Many)

One producer, multiple consumers:

let (producer, consumer) = channel_out<int>(buffer: 10);

  • Producer<T> — cannot be cloned
  • CloneableConsumer<T> — can be cloned for multiple receivers
  • Use when: Distributing work to multiple workers

@distribute_work (items: [Item], worker_count: int) -> void uses Suspend = {
    let (producer, consumer) = channel_out<Item>(buffer: 100);

    nursery(
        body: n -> {
            // Spawn workers with cloned consumers
            for _ in 0..worker_count do {
                let c = consumer.clone();
                n.spawn(task: () -> worker(input: c));
            }

            // Send work to the channel
            for item in items do
                producer.send(value: item);
            producer.close();
        }
        on_error: CollectAll
        timeout: 300s
    );
}

@worker (input: CloneableConsumer<Item>) -> void uses Suspend = {
    loop {
        match input.receive() {
            Some(item) -> {
                process_item(item: item);
                continue
            }
            None -> break
        }
    }
}

channel_all — Many-to-Many

Multiple producers and multiple consumers:

let (producer, consumer) = channel_all<int>(buffer: 10);

  • CloneableProducer<T> — can be cloned
  • CloneableConsumer<T> — can be cloned
  • Use when: Complex communication patterns
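
The other constructors each come with a worked example, so here is a minimal sketch for channel_all as well. The name sum_many_to_many and its parameters are illustrative, not part of Ori. The sketch assumes that nursery returns only after every spawned task has finished, and that both buffers can hold every value sent. The two phases run one after the other so that each close point is easy to see.

```ori
// Sketch only: sum_many_to_many is a hypothetical helper.
// Assumes nursery(...) returns once all spawned tasks have finished
// and that 3 * producer_count values fit in the 100-slot buffer.
@sum_many_to_many (producer_count: int, consumer_count: int) -> int uses Suspend = {
    let (job_producer, job_consumer) = channel_all<int>(buffer: 100);
    let (partial_producer, partial_consumer) = channel_in<int>(buffer: 100);

    // Phase 1: every producer task sends 1, 2, 3 through its own clone
    nursery(
        body: n -> {
            for _ in 0..producer_count do {
                let p = job_producer.clone();
                n.spawn(task: () -> {
                    for k in 1..4 do
                        p.send(value: k);
                });
            }
        }
        on_error: CollectAll
        timeout: 60s
    );
    job_producer.close();

    // Phase 2: every consumer task drains its clone and reports a partial sum
    nursery(
        body: n -> {
            for _ in 0..consumer_count do {
                let c = job_consumer.clone();
                let r = partial_producer.clone();
                n.spawn(task: () -> {
                    let partial = 0;
                    for value in c do
                        partial = partial + value;
                    r.send(value: partial);
                });
            }
        }
        on_error: CollectAll
        timeout: 60s
    );
    partial_producer.close();

    // Add up the partial sums reported by the consumers
    let total = 0;
    for partial in partial_consumer do
        total = total + partial;
    total
}
```

Under those assumptions, sum_many_to_many(producer_count: 2, consumer_count: 3) adds up to 12, because each of the two producers contributes 1 + 2 + 3. How the values are split between the consumers may differ from run to run.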

Channel Operations

Producer Methods

// Send a value (blocks if buffer is full)
producer.send(value: 42);

// Close the channel (no more values can be sent)
producer.close();

// Check if channel is closed
if producer.is_closed() then ...;

Consumer Methods

// Receive a value (blocks if buffer is empty)
let value = consumer.receive();  // Option<T>

// Check if channel is closed
if consumer.is_closed() then ...;

Consumer as Iterator

Consumers implement Iterable, so you can use them in for loops:

for value in consumer do
    process(value: value);

This is equivalent to:

loop {
    match consumer.receive() {
        Some(value) -> process(value: value)
        None -> break
    }
}

The Buffer

The buffer parameter sets how many values can be queued before send blocks:

// Small buffer — producers block quickly
let (p, c) = channel<int>(buffer: 1);

// Larger buffer — more values can queue
let (p, c) = channel<int>(buffer: 1000);

Buffer Sizing Guidelines

Scenario                       Buffer Size
Tight synchronization          0 or 1
Producer faster than consumer  Larger buffer
Consumer faster than producer  Small buffer is fine
Unknown                        Start with 10-100, tune based on performance

Zero Buffer (Synchronous)

let (producer, consumer) = channel<int>(buffer: 0);

// Send blocks until receive happens
// Receive blocks until send happens

Zero-buffer channels force a handoff: each send completes only when a matching receive takes the value, so producer and consumer proceed in lockstep.
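
Because a send on a zero-buffer channel waits for a receive, the two ends have to run in different tasks. The sketch below shows one way to arrange that. The name handoff is illustrative, and the second channel is only a way to carry the sum out of the receiving task. It assumes that nursery returns after both tasks have finished.

```ori
// Sketch only: handoff is a hypothetical helper
@handoff (values: [int]) -> int uses Suspend = {
    let (producer, consumer) = channel<int>(buffer: 0);
    let (total_producer, total_consumer) = channel<int>(buffer: 1);

    nursery(
        body: n -> {
            // Sender: each send waits until the receiver takes the value
            n.spawn(task: () -> {
                for value in values do
                    producer.send(value: value);
                producer.close();
            });

            // Receiver: runs in its own task so the sender is never left waiting
            n.spawn(task: () -> {
                let sum = 0;
                for value in consumer do
                    sum = sum + value;
                total_producer.send(value: sum);
                total_producer.close();
            });
        }
        on_error: FailFast
        timeout: 60s
    );

    // The buffered channel still holds the single sum sent by the receiver
    match total_consumer.receive() {
        Some(sum) -> sum
        None -> 0
    }
}
```

Calling send and receive one after the other in a single task, as in the buffered examples, would leave the send waiting for a receive that never starts.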

The Sendable Trait

Channel values must implement Sendable:

// OK: primitives are Sendable
channel<int>(buffer: 10);
channel<str>(buffer: 10);

// OK: simple structs are Sendable
type Message = { id: int, content: str }
channel<Message>(buffer: 10);

// ERROR: closures capturing mutable state are not Sendable
let counter = 0;
channel<() -> int>(buffer: 10);  // Rejected if the closure sent captures mutable state such as `counter`

Sendable is automatically implemented for types that are safe to transfer between concurrent tasks:

  • All primitive types
  • Structs containing only Sendable types
  • Immutable closures (capturing only immutable bindings)
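
The struct rule applies to nested structs too: a struct is Sendable when each of its fields is. A short sketch, with Header and Envelope as illustrative type names:

```ori
// Illustrative types; these names are not part of Ori
type Header = { id: int, topic: str }
type Envelope = { header: Header, body: str }

// Header holds only primitives, and Envelope holds only Header and str,
// so both qualify as Sendable under the rules listed above
let (producer, consumer) = channel<Envelope>(buffer: 10);

producer.send(value: Envelope {
    header: Header { id: 1, topic: "status" }
    body: "ready"
});
producer.close();
```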

Channel Patterns

Worker Pool

Process items with a fixed number of workers:

type WorkItem = { id: int, data: str }
type WorkResult = { id: int, output: str }

@process_with_pool (
    items: [WorkItem],
    worker_count: int,
) -> [WorkResult] uses Suspend = {
    let (work_producer, work_consumer) = channel_out<WorkItem>(buffer: 100);
    let (result_producer, result_consumer) = channel_in<WorkResult>(buffer: 100);

    nursery(
        body: n -> {
            // Spawn workers
            for _ in 0..worker_count do {
                let wc = work_consumer.clone();
                let rp = result_producer.clone();
                n.spawn(task: () -> pool_worker(work: wc, results: rp));
            }

            // Send work
            for item in items do
                work_producer.send(value: item);
            work_producer.close();
        }
        on_error: CollectAll
        timeout: 300s
    );

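    // All workers have finished; close the result channel so collection terminates
    result_producer.close();

    // Collect results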
    let results: [WorkResult] = [];
    loop {
        match result_consumer.receive() {
            Some(r) -> {
                results = [...results, r];
                continue
            }
            None -> break results
        }
    }
}

@pool_worker (
    work: CloneableConsumer<WorkItem>,
    results: CloneableProducer<WorkResult>,
) -> void uses Suspend = {
    for item in work do {
        let output = process_work(item: item);
        results.send(value: WorkResult { id: item.id, output });
    }
}

@process_work (item: WorkItem) -> str = `Processed: {item.data}`;

Pipeline

Chain processing stages:

type Stage1Output = { data: str }
type Stage2Output = { data: str, processed: bool }
type FinalOutput = { data: str, processed: bool, validated: bool }

@pipeline (input: [str]) -> [FinalOutput] uses Suspend = {
    // Create channels between stages
    let (stage1_out, stage2_in) = channel<Stage1Output>(buffer: 50);
    let (stage2_out, stage3_in) = channel<Stage2Output>(buffer: 50);
    let (stage3_out, collector) = channel<FinalOutput>(buffer: 50);

    nursery(
        body: n -> {
            // Stage 1: Transform
            n.spawn(task: () -> {
                for item in input do
                    stage1_out.send(value: Stage1Output { data: item });
                stage1_out.close();
            });

            // Stage 2: Process
            n.spawn(task: () -> {
                for item in stage2_in do
                    stage2_out.send(value: Stage2Output {
                        data: item.data
                        processed: true
                    });
                stage2_out.close();
            });

            // Stage 3: Validate
            n.spawn(task: () -> {
                for item in stage3_in do
                    stage3_out.send(value: FinalOutput {
                        data: item.data
                        processed: item.processed
                        validated: true
                    });
                stage3_out.close();
            });
        }
        on_error: FailFast
        timeout: 60s
    );

    // Collect final results
    let results: [FinalOutput] = [];
    for result in collector do
        results = [...results, result];
    results
}

Fan-Out/Fan-In

Distribute work, then aggregate:

@fan_out_fan_in (items: [int], worker_count: int) -> int uses Suspend = {
    // Fan-out channel
    let (distribute, workers) = channel_out<int>(buffer: 100);

    // Fan-in channel
    let (results, aggregator) = channel_in<int>(buffer: 100);

    nursery(
        body: n -> {
            // Workers (fan-out -> process -> fan-in)
            for _ in 0..worker_count do {
                let w = workers.clone();
                let r = results.clone();
                n.spawn(task: () -> {
                    for item in w do
                        r.send(value: item * 2);  // Process: double the value
                });
            }

            // Distribute items
            for item in items do
                distribute.send(value: item);
            distribute.close();
        }
        on_error: CollectAll
        timeout: 60s
    );

    // All workers have finished; close the fan-in channel so aggregation terminates
    results.close();

    // Aggregate results
    let sum = 0;
    for value in aggregator do
        sum = sum + value;
    sum
}

Rate Limiter

Control throughput with a token bucket:

type Token = {}

@rate_limited_worker (
    work: CloneableConsumer<WorkItem>,
    tokens: Consumer<Token>,
    results: CloneableProducer<WorkResult>,
) -> void uses Suspend = {
    for item in work do {
        // Wait for a token before processing
        let _ = tokens.receive();
        let output = process_work(item: item);
        results.send(value: WorkResult { id: item.id, output });
    }
}

@token_generator (
    tokens: Producer<Token>,
    rate: int,  // tokens per second
) -> void uses Suspend, Clock = {
    loop {
        tokens.send(value: Token {});
        sleep(duration: 1s / rate);
    }
}
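
token_generator loops forever, so a nursery that spawns it cannot finish on its own. One possible way to wire the pieces together is to use a bounded generator that emits one token per work item and then closes. In the sketch below, bounded_token_generator and process_rate_limited are hypothetical names. It assumes that rate is greater than zero, that all items fit in the 100-slot buffers, and that nursery returns once every task has finished.

```ori
// Hypothetical bounded variant of token_generator: emits `count` tokens, then closes
@bounded_token_generator (
    tokens: Producer<Token>,
    rate: int,   // tokens per second, assumed > 0
    count: int,  // total number of tokens to emit
) -> void uses Suspend, Clock = {
    for _ in 0..count do {
        tokens.send(value: Token {});
        sleep(duration: 1s / rate);
    }
    tokens.close();
}

// Hypothetical wiring: one throttled worker, one token per work item
@process_rate_limited (
    items: [WorkItem],
    rate: int,
) -> [WorkResult] uses Suspend, Clock = {
    let (work_producer, work_consumer) = channel_out<WorkItem>(buffer: 100);
    let (token_producer, token_consumer) = channel<Token>(buffer: 1);
    let (result_producer, result_consumer) = channel_in<WorkResult>(buffer: 100);
    let count = len(collection: items);

    nursery(
        body: n -> {
            let wc = work_consumer.clone();
            let rp = result_producer.clone();
            n.spawn(task: () -> bounded_token_generator(tokens: token_producer, rate: rate, count: count));
            n.spawn(task: () -> rate_limited_worker(work: wc, tokens: token_consumer, results: rp));

            // Queue all work, then signal that no more is coming
            for item in items do
                work_producer.send(value: item);
            work_producer.close();
        }
        on_error: CollectAll
        timeout: 60s
    );

    // The worker has finished; close the result channel so collection terminates
    result_producer.close();

    let results: [WorkResult] = [];
    for r in result_consumer do
        results = [...results, r];
    results
}
```

The token channel uses a buffer of 1 so that at most one token can accumulate while the worker is busy, which keeps bursts small. The run takes roughly count / rate seconds, so the 60s timeout limits how many items one call can handle.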

Testing Channels

Basic Channel Tests

@test_basic_channel tests _ () -> void = {
    let (producer, consumer) = channel<int>(buffer: 3);

    producer.send(value: 1);
    producer.send(value: 2);
    producer.send(value: 3);
    producer.close();

    assert_eq(actual: consumer.receive(), expected: Some(1));
    assert_eq(actual: consumer.receive(), expected: Some(2));
    assert_eq(actual: consumer.receive(), expected: Some(3));
    assert_eq(actual: consumer.receive(), expected: None)
}

Testing Channel Patterns

@test_worker_pool tests @process_with_pool () -> void = {
    let items = [
        WorkItem { id: 1, data: "a" }
        WorkItem { id: 2, data: "b" }
        WorkItem { id: 3, data: "c" }
    ];

    let results = process_with_pool(items: items, worker_count: 2);

    assert_eq(actual: len(collection: results), expected: 3)
    // Note: order may vary due to concurrency
}

@test_fan_out_fan_in tests @fan_out_fan_in () -> void = {
    let items = [1, 2, 3, 4, 5];
    let result = fan_out_fan_in(items: items, worker_count: 3);

    // Sum of doubled values: 2+4+6+8+10 = 30
    assert_eq(actual: result, expected: 30)
}
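
The pipeline pattern can be tested in the same style. This is a sketch rather than a test from the Ori sources. It checks only the output count and the flags set by stages 2 and 3.

```ori
@test_pipeline tests @pipeline () -> void = {
    let results = pipeline(input: ["a", "b", "c"]);

    // Three inputs pass through all three stages, so three outputs come back
    assert_eq(actual: len(collection: results), expected: 3);

    // Every output should carry the flags set by stage 2 and stage 3
    for r in results do {
        assert_eq(actual: r.processed, expected: true);
        assert_eq(actual: r.validated, expected: true);
    }
}
```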

Error Handling

Producer Errors

If a producer encounters an error, send the error to the consumer, stop producing, and close the channel:

@producer_with_errors (
    output: Producer<Result<int, Error>>,
) -> void uses Suspend = {
    for i in 0..10 do {
        let result = might_fail(value: i);
        match result {
            Ok(v) -> output.send(value: Ok(v))
            Err(e) -> {
                // Forward the error, then stop producing
                output.send(value: Err(e));
                break
            }
        }
    }
    // Close exactly once, on both the success path and the error path
    output.close();
}

Consumer Error Aggregation

@consume_with_errors (
    input: Consumer<Result<int, Error>>,
) -> (int, [Error]) uses Suspend = {
    let sum = 0;
    let errors: [Error] = [];

    for result in input do
        match result {
            Ok(v) -> sum = sum + v
            Err(e) -> errors = [...errors, e]
        };

    (sum, errors)
}
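
A short test sketch for the aggregation function follows. It covers only the all-Ok path, because the text above does not show how to construct an Error value. The test name is illustrative.

```ori
@test_consume_all_ok tests @consume_with_errors () -> void = {
    let (producer, consumer) = channel<Result<int, Error>>(buffer: 3);

    // The buffer holds all three values, so these sends do not block
    producer.send(value: Ok(1));
    producer.send(value: Ok(2));
    producer.send(value: Ok(3));
    producer.close();

    let (sum, errors) = consume_with_errors(input: consumer);

    assert_eq(actual: sum, expected: 6);
    assert_eq(actual: len(collection: errors), expected: 0)
}
```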

Best Practices

Close Channels Properly

Always close channels when done sending:

// BAD: Consumer waits forever
producer.send(value: 1);
producer.send(value: 2);
// Missing: producer.close()

// GOOD: Consumer knows when to stop
producer.send(value: 1);
producer.send(value: 2);
producer.close();

Handle Closed Channels

Check for None when receiving:

loop {
    match consumer.receive() {
        Some(value) -> process(value: value)
        None -> break  // Channel closed
    }
}

Match Buffer Size to Usage

// Streaming large data: smaller buffer to limit memory
let (p, c) = channel<LargeData>(buffer: 5);

// Many small messages: larger buffer for throughput
let (p, c) = channel<SmallMessage>(buffer: 1000);

Use the Right Channel Type

// Single producer, single consumer
let (p, c) = channel<T>(buffer: n);

// Multiple producers, single consumer (aggregation)
let (p, c) = channel_in<T>(buffer: n);

// Single producer, multiple consumers (distribution)
let (p, c) = channel_out<T>(buffer: n);

// Multiple of both (complex patterns)
let (p, c) = channel_all<T>(buffer: n);

Quick Reference

Channel Constructors

// One-to-one
channel<T>(buffer: n) -> (Producer<T>, Consumer<T>);

// Fan-in (many-to-one)
channel_in<T>(buffer: n) -> (CloneableProducer<T>, Consumer<T>);

// Fan-out (one-to-many)
channel_out<T>(buffer: n) -> (Producer<T>, CloneableConsumer<T>);

// Many-to-many
channel_all<T>(buffer: n) -> (CloneableProducer<T>, CloneableConsumer<T>);

Producer Methods

producer.send(value: v) -> void;
producer.close() -> void;
producer.is_closed() -> bool;

Consumer Methods

consumer.receive() -> Option<T>;
consumer.is_closed() -> bool;
// Also iterable: for item in consumer do ...

Channel Types

Type         Producer  Consumer  Use Case
channel      Single    Single    Simple communication
channel_in   Multiple  Single    Aggregation
channel_out  Single    Multiple  Distribution
channel_all  Multiple  Multiple  Complex patterns

What’s Next

Now that you understand channels:

  • Traits — Shared behavior definitions
  • Iterators — Functional data processing