ChannelConsumer
Stream consumer that sends events through a tokio unbounded mpsc channel.
Overview
ChannelConsumer forwards every StreamEvent through a tokio::sync::mpsc::UnboundedSender. This bridges agent execution with other async tasks -- web socket handlers, SSE response writers, UI update loops, or any channel-based architecture.
Import path: appam::agent::consumers::ChannelConsumer
Definition
pub struct ChannelConsumer {
    tx: mpsc::UnboundedSender<StreamEvent>,
}
Constructor
ChannelConsumer::new
pub fn new(tx: mpsc::UnboundedSender<StreamEvent>) -> Self
Creates a consumer that sends events to the provided channel. Events are cloned before sending.
| Parameter | Type | Description |
|---|---|---|
| tx | mpsc::UnboundedSender<StreamEvent> | The sending half of a tokio unbounded mpsc channel. |
Behavior
- Each event is cloned and sent through the channel via tx.send(event.clone()).
- If the receiver has been dropped (channel closed), sends fail silently -- this is not treated as a critical error.
- The consumer never blocks the agent's execution loop.
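The behavior listed above can be modeled without the appam crate. The following is a minimal, dependency-free sketch and not appam's actual implementation: `Event` and `ForwardingConsumer` are hypothetical stand-ins for `StreamEvent` and `ChannelConsumer`, and `std::sync::mpsc::channel` (which is also unbounded) stands in for tokio's unbounded channel. Both channel types return an error from `send` once the receiver has been dropped, and the sketch deliberately discards that error.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical stand-in for appam's `StreamEvent`.
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
    Content(String),
    Done,
}

/// Hypothetical model of a channel-forwarding consumer.
pub struct ForwardingConsumer {
    tx: Sender<Event>,
}

impl ForwardingConsumer {
    pub fn new(tx: Sender<Event>) -> Self {
        Self { tx }
    }

    /// Clone the event and send it. A send error only means the receiver
    /// is gone, so it is discarded instead of being propagated.
    pub fn on_event(&self, event: &Event) {
        let _ = self.tx.send(event.clone());
    }
}

/// Forward two events, drop the receiver, then forward once more.
/// Returns what the receiver saw before it was dropped.
pub fn forward_then_close() -> Vec<Event> {
    let (tx, rx): (Sender<Event>, Receiver<Event>) = mpsc::channel();
    let consumer = ForwardingConsumer::new(tx);

    consumer.on_event(&Event::Content("hi".to_string()));
    consumer.on_event(&Event::Done);

    // Drain without blocking: both events are already buffered.
    let seen: Vec<Event> = rx.try_iter().collect();
    drop(rx);

    // The channel is now closed; this call neither panics nor blocks.
    consumer.on_event(&Event::Content("ignored".to_string()));
    seen
}
```

Calling `forward_then_close()` returns the two events sent while the receiver was alive. The third send, made after the receiver is dropped, is silently discarded.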
Usage
Basic channel forwarding
use appam::prelude::*;
use appam::agent::consumers::ChannelConsumer;
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::unbounded_channel();
let consumer = ChannelConsumer::new(tx);
// Process events in another task
tokio::spawn(async move {
    while let Some(event) = rx.recv().await {
        match event {
            StreamEvent::Content { content } => print!("{}", content),
            StreamEvent::Done => break,
            _ => {}
        }
    }
});
let agent = Agent::quick("anthropic/claude-sonnet-4-5", "You are helpful.", vec![])?;
agent.run_streaming("Hello!", Box::new(consumer)).await?;
WebSocket bridge
Forward events to a WebSocket connection by serializing them to JSON:
use appam::agent::consumers::ChannelConsumer;
use appam::agent::streaming::StreamEvent;
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::unbounded_channel();
let consumer = ChannelConsumer::new(tx);
// WebSocket writer task
tokio::spawn(async move {
    while let Some(event) = rx.recv().await {
        let json = serde_json::to_string(&event).unwrap();
        // ws_sink.send(Message::Text(json)).await?;
    }
});
Combined with ConsoleConsumer
Use MultiConsumer to display output in the terminal and forward to a channel simultaneously:
use appam::agent::streaming::MultiConsumer;
use appam::agent::consumers::{ConsoleConsumer, ChannelConsumer};
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::unbounded_channel();
let multi = MultiConsumer::new()
.add(Box::new(ConsoleConsumer::new()))
.add(Box::new(ChannelConsumer::new(tx)));
// Receiver task processes events for metrics, logging, etc.
tokio::spawn(async move {
    while let Some(event) = rx.recv().await {
        // Record metrics, update UI, etc.
    }
});
agent.run_streaming("Hello!", Box::new(multi)).await?;
Channel Type
ChannelConsumer uses mpsc::UnboundedSender rather than a bounded channel, so event delivery never blocks the agent even when the receiver is slow. The trade-off is that unread events accumulate in memory until the receiver reads them. If you need backpressure, drain the receiver promptly and apply your own buffering strategy downstream.
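One possible buffering strategy on the receiving side is a fixed-size buffer that evicts the oldest entry when full. The sketch below is illustrative only: `DropOldestBuffer` is a hypothetical helper, not part of appam or tokio, and dropping old events is just one policy. It suits UI updates where only recent state matters, but not logs that must be complete.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded buffer that keeps only the newest `capacity` items.
pub struct DropOldestBuffer<T> {
    items: VecDeque<T>,
    capacity: usize,
    dropped: usize,
}

impl<T> DropOldestBuffer<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be at least 1");
        Self {
            items: VecDeque::with_capacity(capacity),
            capacity,
            dropped: 0,
        }
    }

    /// Add an item, evicting the oldest one if the buffer is full.
    pub fn push(&mut self, item: T) {
        if self.items.len() == self.capacity {
            self.items.pop_front();
            self.dropped += 1;
        }
        self.items.push_back(item);
    }

    /// Take the oldest buffered item, if any.
    pub fn pop(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    /// Number of items currently buffered.
    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// Number of items evicted so far because the buffer was full.
    pub fn dropped(&self) -> usize {
        self.dropped
    }
}
```

In a receiver task, each event returned by `rx.recv().await` would be pushed into such a buffer while a slower downstream writer pops from it. The `dropped()` counter makes the loss visible so it can be logged or reported.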