
ChannelConsumer

Stream consumer that sends events through a tokio unbounded mpsc channel.

Overview

ChannelConsumer forwards every StreamEvent through a tokio::sync::mpsc::UnboundedSender. This bridges agent execution with other async tasks, such as WebSocket handlers, SSE response writers, UI update loops, or any other channel-based architecture.

Import path: appam::agent::consumers::ChannelConsumer

Definition

pub struct ChannelConsumer {
    tx: mpsc::UnboundedSender<StreamEvent>,
}

Constructor

ChannelConsumer::new

pub fn new(tx: mpsc::UnboundedSender<StreamEvent>) -> Self

Creates a consumer that sends events to the provided channel. Events are cloned before sending.

Parameter   Type                                  Description
tx          mpsc::UnboundedSender<StreamEvent>    The sending half of a tokio unbounded mpsc channel.

Behavior

  • Each event is cloned and sent through the channel via tx.send(event.clone()).
  • If the receiver has been dropped (channel closed), the send fails silently; the failure is ignored rather than treated as a critical error.
  • The consumer never blocks the agent's execution loop.
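
The behavior listed above can be illustrated with a minimal, dependency-free sketch. It is not appam's actual implementation: std::sync::mpsc stands in for tokio's unbounded channel, and StreamEvent, StreamConsumer, on_event, ChannelConsumerSketch, and demo_forwarding are hypothetical stand-ins introduced only for illustration.

```rust
use std::sync::mpsc::{self, Sender};

// Hypothetical stand-in for appam's StreamEvent; the real enum has more variants.
#[derive(Clone, Debug, PartialEq)]
pub enum StreamEvent {
    Content { content: String },
    Done,
}

// Hypothetical stand-in for the consumer trait; the real signature may differ.
pub trait StreamConsumer {
    fn on_event(&self, event: &StreamEvent);
}

// Sketch of a channel-backed consumer (std channel instead of tokio).
pub struct ChannelConsumerSketch {
    tx: Sender<StreamEvent>,
}

impl ChannelConsumerSketch {
    pub fn new(tx: Sender<StreamEvent>) -> Self {
        Self { tx }
    }
}

impl StreamConsumer for ChannelConsumerSketch {
    fn on_event(&self, event: &StreamEvent) {
        // Clone the event so the caller keeps ownership, and ignore the
        // error that `send` returns once the receiver has been dropped.
        let _ = self.tx.send(event.clone());
    }
}

/// Sends two events, drops the receiver, then sends once more.
/// Returns the events that were received before the drop.
pub fn demo_forwarding() -> Vec<StreamEvent> {
    let (tx, rx) = mpsc::channel();
    let consumer = ChannelConsumerSketch::new(tx);

    consumer.on_event(&StreamEvent::Content { content: "hi".to_string() });
    consumer.on_event(&StreamEvent::Done);

    // Collect everything queued so far without blocking.
    let received: Vec<StreamEvent> = rx.try_iter().collect();
    drop(rx);

    // The receiver is gone: the send fails, but on_event neither panics nor blocks.
    consumer.on_event(&StreamEvent::Done);
    received
}
```

Calling demo_forwarding() returns the two events sent while the receiver was alive; the third send is silently discarded.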

Usage

Basic channel forwarding

use appam::prelude::*;
use appam::agent::consumers::ChannelConsumer;
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::unbounded_channel();
let consumer = ChannelConsumer::new(tx);

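// Process events in another task; keep the handle so it can be awaited later.
let printer = tokio::spawn(async move {
    while let Some(event) = rx.recv().await {
        match event {
            // Print each content chunk as it arrives.
            StreamEvent::Content { content } => print!("{}", content),
            StreamEvent::Done => break,
            _ => {}
        }
    }
});

let agent = Agent::quick("anthropic/claude-sonnet-4-5", "You are helpful.", vec![])?;
agent.run_streaming("Hello!", Box::new(consumer)).await?;

// Wait for the printing task to drain the channel before exiting.
let _ = printer.await;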

WebSocket bridge

Forward events to a WebSocket connection by serializing them to JSON:

use appam::agent::consumers::ChannelConsumer;
use appam::agent::streaming::StreamEvent;
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::unbounded_channel();
let consumer = ChannelConsumer::new(tx);

// WebSocket writer task
tokio::spawn(async move {
    while let Some(event) = rx.recv().await {
        // Skip events that fail to serialize instead of panicking the task.
        let Ok(json) = serde_json::to_string(&event) else { continue };
        // ws_sink.send(Message::Text(json)).await?;
    }
});

Combined with ConsoleConsumer

Use MultiConsumer to display output in the terminal and forward to a channel simultaneously:

use appam::agent::streaming::MultiConsumer;
use appam::agent::consumers::{ConsoleConsumer, ChannelConsumer};
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::unbounded_channel();

let multi = MultiConsumer::new()
    .add(Box::new(ConsoleConsumer::new()))
    .add(Box::new(ChannelConsumer::new(tx)));

// Receiver task processes events for metrics, logging, etc.
tokio::spawn(async move {
    while let Some(event) = rx.recv().await {
        // Record metrics, update UI, etc.
    }
});

// `agent` is constructed as in the basic example above.
agent.run_streaming("Hello!", Box::new(multi)).await?;
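
The receiver loop in the example above leaves the metrics step as a comment. One way it could be filled in is a per-kind event tally, sketched below under stated assumptions: std::sync::mpsc and a thread stand in for the tokio channel and task so the sketch has no dependencies, and StreamEvent, kind, and count_events are hypothetical names, not part of appam.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for appam's StreamEvent; the real enum has more variants.
#[derive(Clone, Debug)]
pub enum StreamEvent {
    Content { content: String },
    Done,
}

// Maps an event to a short label used as the metrics key.
fn kind(event: &StreamEvent) -> &'static str {
    match event {
        StreamEvent::Content { .. } => "content",
        StreamEvent::Done => "done",
    }
}

/// Sends `events` through a channel and tallies them by kind on a
/// separate receiver thread, mirroring the spawned receiver task.
pub fn count_events(events: Vec<StreamEvent>) -> HashMap<&'static str, usize> {
    let (tx, rx) = mpsc::channel();

    let worker = thread::spawn(move || {
        let mut counts = HashMap::new();
        // Iteration ends once every sender has been dropped.
        for event in rx {
            *counts.entry(kind(&event)).or_insert(0) += 1;
        }
        counts
    });

    for event in events {
        // Ignore send errors, as the consumer does.
        let _ = tx.send(event);
    }
    // Close the channel so the receiver loop terminates.
    drop(tx);

    worker.join().expect("receiver thread panicked")
}
```

In an async setting the same loop body would sit inside the while let Some(event) = rx.recv().await loop shown above.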

Channel Type

ChannelConsumer uses mpsc::UnboundedSender rather than a bounded channel. This ensures that the agent's event delivery never blocks, even if the receiver is slow. The trade-off is that queued events accumulate in memory while the receiver falls behind. If backpressure is needed, consume events from the receiver and apply your own buffering strategy.
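
As one illustration of a receiver-side buffering strategy, the sketch below drains an unbounded channel into a fixed-capacity buffer and discards the oldest items when the buffer is full. This is load shedding rather than true backpressure, since the producer is never slowed down. It uses std::sync::mpsc in place of tokio to stay dependency-free, and drain_latest and demo_drop_oldest are hypothetical helpers, not part of appam.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{self, Receiver};

/// Moves every item currently queued in `rx` into `buf`, keeping at most
/// `cap` of the newest items. Returns how many items were discarded.
pub fn drain_latest<T>(rx: &Receiver<T>, buf: &mut VecDeque<T>, cap: usize) -> usize {
    let mut dropped = 0;
    // try_iter yields queued items without blocking on an empty channel.
    for item in rx.try_iter() {
        if cap == 0 {
            // A zero-capacity buffer keeps nothing.
            dropped += 1;
            continue;
        }
        if buf.len() == cap {
            // Buffer full: discard the oldest item to make room.
            buf.pop_front();
            dropped += 1;
        }
        buf.push_back(item);
    }
    dropped
}

/// Queues five numbers, then drains them into a buffer of capacity three.
pub fn demo_drop_oldest() -> (Vec<u32>, usize) {
    let (tx, rx) = mpsc::channel();
    for i in 0..5 {
        tx.send(i).expect("receiver is still alive");
    }
    let mut buf = VecDeque::new();
    let dropped = drain_latest(&rx, &mut buf, 3);
    (buf.into_iter().collect(), dropped)
}
```

With five queued items and a capacity of three, demo_drop_oldest() keeps the three newest values (2, 3, 4) and reports two discarded. Other strategies, such as coalescing consecutive content chunks or forwarding into a bounded channel, may suit a given application better.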