StreamConsumer
Trait for handling streaming events from agents.
Overview
StreamConsumer is the trait that defines how streaming events are handled. Implement it to build custom event processors -- loggers, UI renderers, metrics collectors, or anything else that reacts to agent activity in real time.
Import path: appam::agent::streaming::StreamConsumer
Definition
pub trait StreamConsumer: Send + Sync {
    fn on_event(&self, event: &StreamEvent) -> Result<()>;
}
on_event
fn on_event(&self, event: &StreamEvent) -> Result<()>;
Called for each event emitted by the agent during execution. Implementations should be fast and non-blocking; offload heavy work to background tasks or channels.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| event | &StreamEvent | Reference to the event being emitted. |
Returns: Result<()> -- returning an error stops event propagation and terminates agent execution with that error.
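The advice to offload heavy work can be illustrated with a consumer that only forwards events to a channel while a worker thread does the slow part. This is a minimal sketch, not the crate's own code: the `Result` alias, the two-variant `StreamEvent`, and the `StreamConsumer` trait below are local stand-ins so the example compiles without appam, and `ForwardingConsumer`, `spawn_worker`, and `forward_and_sum` are hypothetical names.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Mutex;
use std::thread;

// Local stand-ins (assumptions) so the sketch compiles without the appam crate.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum StreamEvent {
    Content { content: String },
    Error { message: String },
}

trait StreamConsumer: Send + Sync {
    fn on_event(&self, event: &StreamEvent) -> Result<()>;
}

/// Hypothetical consumer: clones each event into a channel and returns at once.
struct ForwardingConsumer {
    // The Mutex keeps the struct Sync even on toolchains where Sender is not.
    tx: Mutex<Sender<StreamEvent>>,
}

impl StreamConsumer for ForwardingConsumer {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        let tx = self.tx.lock().map_err(|_| "sender mutex poisoned")?;
        // Fails only if the receiving side has been dropped.
        tx.send(event.clone())?;
        Ok(())
    }
}

/// Worker thread that does the "heavy" part; here it just sums content lengths.
fn spawn_worker(rx: Receiver<StreamEvent>) -> thread::JoinHandle<usize> {
    thread::spawn(move || {
        let mut total = 0;
        for event in rx {
            if let StreamEvent::Content { content } = event {
                total += content.len();
            }
        }
        total
    })
}

/// Sends three events through the consumer and returns the worker's total.
fn forward_and_sum() -> usize {
    let (tx, rx) = mpsc::channel();
    let worker = spawn_worker(rx);
    let consumer = ForwardingConsumer { tx: Mutex::new(tx) };
    consumer.on_event(&StreamEvent::Content { content: "Hello, ".into() }).unwrap();
    consumer.on_event(&StreamEvent::Error { message: "ignored".into() }).unwrap();
    consumer.on_event(&StreamEvent::Content { content: "world".into() }).unwrap();
    drop(consumer); // Dropping the sender closes the channel and ends the worker loop.
    worker.join().unwrap()
}
```

With this shape, on_event costs one clone and one channel send, and the worker can take as long as it needs without delaying the agent.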
Thread Safety
Consumers must be Send + Sync because:
- The agent runtime may call on_event from different async tasks.
- Multiple consumers may run concurrently when using MultiConsumer.
- The consumer is passed as Box<dyn StreamConsumer> across thread boundaries.
Use Arc<Mutex<T>> or atomic types for interior mutability.
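As an illustration of the atomic option, the sketch below counts events through `&self` without any lock and calls the consumer from several threads at once. The `Result` alias, `StreamEvent`, and `StreamConsumer` are local stand-ins (assumptions) for the crate's types, and `EventCounter` and `count_from_threads` are hypothetical names.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Local stand-ins (assumptions) so the sketch compiles without the appam crate.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum StreamEvent {
    Content { content: String },
    Error { message: String },
}

trait StreamConsumer: Send + Sync {
    fn on_event(&self, event: &StreamEvent) -> Result<()>;
}

/// Hypothetical consumer: atomics provide interior mutability through `&self`.
#[derive(Default)]
struct EventCounter {
    content_events: AtomicUsize,
    error_events: AtomicUsize,
}

impl StreamConsumer for EventCounter {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        // Relaxed ordering is enough for independent counters.
        match event {
            StreamEvent::Content { .. } => self.content_events.fetch_add(1, Ordering::Relaxed),
            StreamEvent::Error { .. } => self.error_events.fetch_add(1, Ordering::Relaxed),
        };
        Ok(())
    }
}

/// Calls one shared consumer from several threads and returns both totals.
fn count_from_threads(threads: usize, events_per_thread: usize) -> (usize, usize) {
    let counter = Arc::new(EventCounter::default());
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..events_per_thread {
                    counter.on_event(&StreamEvent::Content { content: "x".into() }).unwrap();
                }
                counter.on_event(&StreamEvent::Error { message: "boom".into() }).unwrap();
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    (
        counter.content_events.load(Ordering::Relaxed),
        counter.error_events.load(Ordering::Relaxed),
    )
}
```

Atomics suit simple counters and flags; state that must be updated as a unit (such as a growing string buffer) is usually easier to protect with a Mutex.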
Implementation Example
use appam::agent::streaming::{StreamConsumer, StreamEvent};
use anyhow::Result;
use std::sync::{Arc, Mutex};
/// Collects all content chunks into a single string.
struct ContentCollector {
    buffer: Arc<Mutex<String>>,
}
impl ContentCollector {
    fn new() -> Self {
        Self {
            buffer: Arc::new(Mutex::new(String::new())),
        }
    }
    fn get_content(&self) -> String {
        self.buffer.lock().unwrap().clone()
    }
}
impl StreamConsumer for ContentCollector {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        match event {
            StreamEvent::Content { content } => {
                let mut buf = self.buffer.lock().unwrap();
                buf.push_str(content);
            }
            StreamEvent::Error { message } => {
                eprintln!("Error: {}", message);
            }
            _ => {}
        }
        Ok(())
    }
}
Error Handling
Returning an error from on_event has two effects:
- Stops event propagation -- no further consumers receive the event (relevant for MultiConsumer).
- Terminates the agent -- the error propagates up through the agent's run loop.
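When a failure should halt the run, the consumer can simply return it. The sketch below treats a failed write as critical and lets `?` hand the error back to the caller. It assumes local stand-ins for `Result`, `StreamEvent`, and `StreamConsumer` so that it compiles on its own; `WriterConsumer` and `BrokenSink` are hypothetical names, and `BrokenSink` exists only to exercise the error path.

```rust
use std::io::{self, Write};
use std::sync::Mutex;

// Local stand-ins (assumptions) so the sketch compiles without the appam crate.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum StreamEvent {
    Content { content: String },
    Error { message: String },
}

trait StreamConsumer: Send + Sync {
    fn on_event(&self, event: &StreamEvent) -> Result<()>;
}

/// Hypothetical consumer: writes content to any `Write` sink.
struct WriterConsumer<W: Write + Send> {
    sink: Mutex<W>,
}

impl<W: Write + Send> StreamConsumer for WriterConsumer<W> {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        if let StreamEvent::Content { content } = event {
            let mut sink = self.sink.lock().map_err(|_| "sink mutex poisoned")?;
            // A failed write is considered critical: `?` returns the io::Error.
            sink.write_all(content.as_bytes())?;
        }
        Ok(())
    }
}

/// Hypothetical sink that always fails, used to show the error path.
struct BrokenSink;

impl Write for BrokenSink {
    fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
        Err(io::Error::new(io::ErrorKind::Other, "disk full"))
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

Calling on_event on a `WriterConsumer` built over `BrokenSink` returns an `Err`, which in the real runtime would be the signal that stops propagation and ends the run.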
For non-critical failures (e.g., a metrics endpoint being temporarily unavailable), log the error and return Ok(()) to allow streaming to continue:
impl StreamConsumer for MetricsConsumer {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        if let Err(e) = self.try_record(event) {
            tracing::warn!("Failed to record metric: {}", e);
            // Don't propagate -- metrics are not critical
        }
        Ok(())
    }
}
MultiConsumer
MultiConsumer broadcasts events to multiple consumers in order. If any consumer returns an error, propagation stops.
use appam::agent::streaming::MultiConsumer;
use appam::agent::consumers::{ConsoleConsumer, ChannelConsumer};
// `tx` is assumed to be a channel sender created earlier (not shown).
let multi = MultiConsumer::new()
    .add(Box::new(ConsoleConsumer::new()))
    .add(Box::new(ChannelConsumer::new(tx)));
Definition
pub struct MultiConsumer {
    consumers: Vec<Box<dyn StreamConsumer>>,
}
impl MultiConsumer {
    pub fn new() -> Self;
    pub fn add(mut self, consumer: Box<dyn StreamConsumer>) -> Self;
}
| Method | Description |
|---|---|
| new() | Create an empty multi-consumer. |
| add(consumer) | Add a consumer to the broadcast list. Consumers are invoked in the order they are added. Returns self for chaining. |
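The in-order, stop-on-first-error behavior described for MultiConsumer can be sketched as follows. This is an illustration under stated assumptions, not the crate's implementation: `MultiSketch` is a hypothetical type that mirrors the published `new`/`add` signatures, and `Result`, `StreamEvent`, `StreamConsumer`, `Recorder`, and `AlwaysFails` are local stand-ins.

```rust
use std::sync::{Arc, Mutex};

// Local stand-ins (assumptions) so the sketch compiles without the appam crate.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum StreamEvent {
    Content { content: String },
    Error { message: String },
}

trait StreamConsumer: Send + Sync {
    fn on_event(&self, event: &StreamEvent) -> Result<()>;
}

/// Hypothetical stand-in that mirrors the documented MultiConsumer shape.
struct MultiSketch {
    consumers: Vec<Box<dyn StreamConsumer>>,
}

impl MultiSketch {
    fn new() -> Self {
        Self { consumers: Vec::new() }
    }
    fn add(mut self, consumer: Box<dyn StreamConsumer>) -> Self {
        self.consumers.push(consumer);
        self
    }
}

impl StreamConsumer for MultiSketch {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        // Insertion order; `?` returns on the first error and skips the rest.
        for consumer in &self.consumers {
            consumer.on_event(event)?;
        }
        Ok(())
    }
}

/// Records its own name each time it sees an event.
struct Recorder {
    name: &'static str,
    log: Arc<Mutex<Vec<&'static str>>>,
}

impl StreamConsumer for Recorder {
    fn on_event(&self, _event: &StreamEvent) -> Result<()> {
        self.log.lock().unwrap().push(self.name);
        Ok(())
    }
}

/// Always returns an error, to show the short-circuit.
struct AlwaysFails;

impl StreamConsumer for AlwaysFails {
    fn on_event(&self, _event: &StreamEvent) -> Result<()> {
        Err("consumer failed".into())
    }
}

/// Returns whether the broadcast failed and which recorders were reached.
fn broadcast_with_failure() -> (bool, Vec<&'static str>) {
    let log = Arc::new(Mutex::new(Vec::new()));
    let multi = MultiSketch::new()
        .add(Box::new(Recorder { name: "first", log: Arc::clone(&log) }))
        .add(Box::new(AlwaysFails))
        .add(Box::new(Recorder { name: "third", log: Arc::clone(&log) }));
    let result = multi.on_event(&StreamEvent::Content { content: "hi".into() });
    let seen = log.lock().unwrap().clone();
    (result.is_err(), seen)
}
```

Because a failing consumer hides the event from everything added after it, ordering matters: consumers whose output must not be lost are best added first.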
MultiConsumer itself implements StreamConsumer, so it can be passed anywhere a single consumer is expected:
agent.run_streaming("prompt", Box::new(multi)).await?;
Using Consumers with Agents
With run_streaming
Pass a single consumer (or a MultiConsumer) to run_streaming:
use appam::prelude::*;
use appam::agent::consumers::ConsoleConsumer;
let agent = Agent::quick("anthropic/claude-sonnet-4-5", "You are helpful.", vec![])?;
let session = agent.run_streaming("Hello!", Box::new(ConsoleConsumer::new())).await?;
With run_with_consumers
Pass a Vec<Box<dyn StreamConsumer>> to broadcast events to all consumers:
use appam::prelude::*;
use appam::agent::consumers::{ConsoleConsumer, TraceConsumer};
use appam::config::TraceFormat;
use std::path::Path;
let agent = Agent::quick("anthropic/claude-sonnet-4-5", "You are helpful.", vec![])?;
let trace = TraceConsumer::new(Path::new("logs"), "session-1", TraceFormat::Detailed)?;
let session = agent.run_with_consumers(
    "Hello!",
    vec![
        Box::new(ConsoleConsumer::new()),
        Box::new(trace),
    ],
).await?;
StreamConsumer vs StreamBuilder
| | StreamConsumer | StreamBuilder |
|---|---|---|
| Approach | Implement a trait with pattern matching | Chain closures on a builder |
| Reusability | Consumers are reusable structs | Closures are typically inline |
| Composition | Combine with MultiConsumer | Single builder per call |
| Async support | Sync on_event (spawn tasks internally) | Sync and async handler variants |
| Use when | Building reusable infrastructure, multiple destinations | Quick prototyping, simple one-off handlers |
See StreamBuilder for the closure-based alternative.