Streaming Guide
Implement real-time streaming with closure-based and consumer-based APIs.
Appam streams agent output in real time through an event-based system. You can handle events with inline closures for simple cases or implement the StreamConsumer trait for reusable, composable handlers.
Two Streaming Approaches
- StreamBuilder (closure-based) -- Inline event handlers with a fluent API. Best for applications, scripts, and CLI tools.
- StreamConsumer trait -- Reusable event handler objects. Best for libraries, middleware, and multi-destination routing.
StreamBuilder API
Call .stream() on an agent to get a StreamBuilder, then chain event handlers and finish with .run():
use appam::prelude::*;
let agent = Agent::quick(
    "anthropic/claude-sonnet-4-5",
    "You are a helpful assistant.",
    vec![],
)?;
let session = agent
    .stream("Explain the Rust borrow checker")
    .on_content(|text| {
        print!("{}", text);
        std::io::Write::flush(&mut std::io::stdout()).ok();
    })
    .on_reasoning(|text| {
        // Reasoning tokens from extended thinking models
        eprint!("\x1b[36m{}\x1b[0m", text); // Cyan
    })
    .on_session_started(|id| {
        println!("Session: {}", id);
    })
    .on_tool_call(|name, args| {
        println!("\n[Calling: {}]", name);
    })
    .on_tool_result(|name, result| {
        println!("[{} returned: {}]", name, result);
    })
    .on_tool_failed(|name, error| {
        eprintln!("[{} failed: {}]", name, error);
    })
    .on_error(|err| {
        eprintln!("Error: {}", err);
    })
    .on_done(|| {
        println!("\nDone!");
    })
    .run()
    .await?;
println!("Session ID: {}", session.session_id);
Available Handlers
| Handler | Signature | Called When |
|---|---|---|
| on_content | Fn(&str) | LLM generates a text chunk |
| on_reasoning | Fn(&str) | LLM emits a reasoning/thinking token |
| on_session_started | Fn(&str) | Session is created with a unique ID |
| on_tool_call | Fn(&str, &str) | LLM initiates a tool call (name, args JSON) |
| on_tool_result | Fn(&str, &Value) | Tool completes successfully (name, result) |
| on_tool_failed | Fn(&str, &str) | Tool execution fails (name, error message) |
| on_error | Fn(&str) | Unrecoverable error during execution |
| on_done | Fn() | Agent finishes processing |
All handlers are Fn + Send + Sync + 'static. They execute synchronously within the event loop.
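Because the bound is Fn rather than FnMut, a handler cannot mutate a captured variable directly; state shared with the rest of the program needs interior mutability. The following is a minimal std-only sketch of that pattern. The Handler alias, dispatch, and collecting_handler are hypothetical stand-ins that only mirror the stated bounds; they are not Appam's actual types.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for a registered handler. The bounds mirror the
// ones stated above, but this is not Appam's actual type.
type Handler = Box<dyn Fn(&str) + Send + Sync + 'static>;

// Invokes the handler once per chunk, synchronously and in order.
fn dispatch(handler: &Handler, chunks: &[&str]) {
    for chunk in chunks {
        handler(*chunk);
    }
}

// Builds a handler that appends every chunk to a shared buffer.
// An Fn closure cannot mutate its captures directly, so the buffer
// sits behind Arc<Mutex<..>>.
fn collecting_handler(buffer: Arc<Mutex<String>>) -> Handler {
    Box::new(move |text: &str| {
        buffer.lock().unwrap().push_str(text);
    })
}

fn main() {
    let buffer = Arc::new(Mutex::new(String::new()));
    let handler = collecting_handler(Arc::clone(&buffer));
    dispatch(&handler, &["Hello, ", "world"]);
    println!("{}", buffer.lock().unwrap());
}
```

The same approach should apply to a closure passed to on_content: clone the Arc, move the clone into the closure, and read the buffer after the run completes.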
Async Handlers
For handlers that need to await something (database writes, API calls, etc.), use the async variants. Each invocation spawns a tokio task:
agent
    .stream("Do something")
    .on_tool_call_async(|name, args| async move {
        // Async database write
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        println!("Logged tool call: {} with {}", name, args);
        Ok(())
    })
    .on_tool_result_async(|name, result| async move {
        // Async API call
        println!("Stored result from {}: {:?}", name, result);
        Ok(())
    })
    .run()
    .await?;
Async handlers take owned String and Value parameters (not references) since they outlive the event dispatch.
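The ownership requirement can be illustrated without Appam or tokio. In this sketch, std threads stand in for spawned tasks, and dispatch_spawned is a hypothetical helper, not part of the library: it copies the borrowed event data into owned Strings before handing them to work that may still be running after the caller's buffers are gone.

```rust
use std::thread::{self, JoinHandle};

// Hypothetical dispatcher: std threads stand in for tokio tasks here.
// The borrowed event data is only guaranteed to live for this call, so
// it is copied into owned Strings before the handler moves to another
// thread.
fn dispatch_spawned<F>(name: &str, args: &str, handler: F) -> JoinHandle<String>
where
    F: FnOnce(String, String) -> String + Send + 'static,
{
    let (name, args) = (name.to_owned(), args.to_owned());
    thread::spawn(move || handler(name, args))
}

fn main() {
    let event = String::from("read_file");
    let handle = dispatch_spawned(&event, r#"{"path":"a.txt"}"#, |name, args| {
        format!("logged {name} with {args}")
    });
    // The source buffer can be dropped while the spawned work is still
    // pending, because the handler owns its own copies.
    drop(event);
    println!("{}", handle.join().unwrap());
}
```

Passing references instead would not compile with a 'static bound on the spawned work, which is the constraint the owned parameters work around.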
The StreamConsumer Trait
For reusable event handling, implement StreamConsumer:
use appam::prelude::*;
struct MetricsConsumer {
    tool_calls: std::sync::atomic::AtomicUsize,
}
impl StreamConsumer for MetricsConsumer {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        match event {
            StreamEvent::Content { content } => {
                print!("{}", content);
            }
            StreamEvent::ToolCallStarted { tool_name, .. } => {
                self.tool_calls
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                println!("\n[Tool: {}]", tool_name);
            }
            StreamEvent::Done => {
                let count = self.tool_calls.load(std::sync::atomic::Ordering::Relaxed);
                println!("\nCompleted with {} tool calls", count);
            }
            _ => {}
        }
        Ok(())
    }
}
Use it with run_streaming:
let consumer = MetricsConsumer {
    tool_calls: std::sync::atomic::AtomicUsize::new(0),
};
agent.run_streaming("Hello!", Box::new(consumer)).await?;
StreamEvent Variants
The StreamEvent enum covers all observable agent activity:
pub enum StreamEvent {
    SessionStarted { session_id: String },
    Content { content: String },
    Reasoning { content: String },
    ToolCallStarted { tool_name: String, arguments: String },
    ToolCallCompleted { tool_name: String, result: Value, success: bool, duration_ms: f64 },
    ToolCallFailed { tool_name: String, error: String },
    TurnCompleted,
    UsageUpdate { snapshot: AggregatedUsage },
    Done,
    Error { message: String },
}
Events are serializable (Serialize + Deserialize) with #[serde(tag = "type")], making them suitable for JSON transmission over SSE or WebSockets.
Built-in Consumers
Appam ships with several ready-to-use consumers:
ConsoleConsumer
Pretty-printed terminal output with optional ANSI colors:
use appam::agent::consumers::ConsoleConsumer;
let consumer = ConsoleConsumer::new()
    .with_colors(true)
    .with_reasoning(true)
    .with_tool_details(true);
This is the default consumer used by agent.run().
ChannelConsumer
Forward events to a tokio mpsc channel for async processing:
use appam::agent::consumers::ChannelConsumer;
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let consumer = ChannelConsumer::new(tx);
// Process events in another task
tokio::spawn(async move {
    while let Some(event) = rx.recv().await {
        // Handle event
    }
});
agent.run_streaming("Hello!", Box::new(consumer)).await?;
CallbackConsumer
Execute a single closure for every event:
use appam::agent::consumers::CallbackConsumer;
let consumer = CallbackConsumer::new(|event| {
    println!("Event: {:?}", event);
    Ok(())
});
TraceConsumer
Write structured JSONL trace files for debugging and analysis:
use appam::agent::consumers::TraceConsumer;
use appam::config::TraceFormat;
use std::path::Path;
let trace = TraceConsumer::new(
    Path::new("logs"),
    "session-abc123",
    TraceFormat::Detailed, // or TraceFormat::Compact
)?;
Each event is written as a JSON line with timestamp, elapsed time, and event data.
SqliteTraceConsumer
Persist events to a SQLite database for querying and long-term storage:
use appam::agent::consumers::SqliteTraceConsumer;
use std::sync::Arc;
let trace = SqliteTraceConsumer::new(
    Arc::new(pool),
    "session-abc123".to_string(),
    "my-agent".to_string(),
    "claude-sonnet-4-5".to_string(),
    "default".to_string(),
    1,
);
Consecutive content and reasoning events are automatically consolidated into single database entries for efficiency.
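The consolidation idea can be sketched independently of the database layer. The snippet below is an illustration under stated assumptions, not SqliteTraceConsumer's actual implementation: Chunk is a hypothetical, simplified stand-in for the two text-carrying event kinds, and consolidate merges each run of adjacent same-kind chunks into one entry.

```rust
// Hypothetical, simplified stand-in for the two text-carrying event kinds.
#[derive(Debug, Clone, PartialEq)]
enum Chunk {
    Content(String),
    Reasoning(String),
}

// Merges runs of adjacent chunks of the same kind into a single entry.
// A chunk of a different kind always starts a new entry.
fn consolidate(chunks: &[Chunk]) -> Vec<Chunk> {
    let mut merged: Vec<Chunk> = Vec::new();
    for chunk in chunks {
        // Try to append to the most recent entry if the kinds match.
        let appended = match (merged.last_mut(), chunk) {
            (Some(Chunk::Content(acc)), Chunk::Content(text)) => {
                acc.push_str(text);
                true
            }
            (Some(Chunk::Reasoning(acc)), Chunk::Reasoning(text)) => {
                acc.push_str(text);
                true
            }
            _ => false,
        };
        if !appended {
            merged.push(chunk.clone());
        }
    }
    merged
}

fn main() {
    let stream = vec![
        Chunk::Reasoning("Let me ".into()),
        Chunk::Reasoning("think.".into()),
        Chunk::Content("Hel".into()),
        Chunk::Content("lo".into()),
    ];
    // Four streamed chunks collapse into two entries.
    println!("{:?}", consolidate(&stream));
}
```

With token-level streaming, a response of several hundred chunks would collapse to a handful of rows under this scheme, which is presumably where the efficiency gain comes from.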
MultiConsumer
Broadcast each event to multiple consumers using MultiConsumer or the run_with_consumers method:
use appam::prelude::*;
use appam::agent::streaming::MultiConsumer;
// Option 1: Manual MultiConsumer
let multi = MultiConsumer::new()
    .add(Box::new(ConsoleConsumer::new()))
    .add(Box::new(ChannelConsumer::new(tx)));
agent.run_streaming("Hello!", Box::new(multi)).await?;
// Option 2: run_with_consumers convenience method
agent.run_with_consumers("Hello!", vec![
    Box::new(ConsoleConsumer::new()),
    Box::new(ChannelConsumer::new(tx)),
]).await?;
Consumers receive events in the order they are added. If any consumer returns an error, event propagation stops and the error is returned.
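The ordering and short-circuit behavior described above can be modeled in a few lines. This is a simplified sketch, not MultiConsumer's source: the Consumer trait, Recorder, and Fanout are hypothetical names, and events are reduced to plain strings.

```rust
use std::sync::{Arc, Mutex};

type Log = Arc<Mutex<Vec<String>>>;

// Hypothetical, simplified consumer trait; events are plain strings here.
trait Consumer {
    fn on_event(&self, event: &str) -> Result<(), String>;
}

// Records its label for each event it sees; fails if `fail` is set.
struct Recorder {
    label: &'static str,
    fail: bool,
    log: Log,
}

impl Consumer for Recorder {
    fn on_event(&self, _event: &str) -> Result<(), String> {
        self.log.lock().unwrap().push(self.label.to_string());
        if self.fail {
            return Err(format!("{} failed", self.label));
        }
        Ok(())
    }
}

// Forwards each event to its consumers in insertion order and stops at
// the first error.
struct Fanout {
    consumers: Vec<Box<dyn Consumer>>,
}

impl Fanout {
    fn new() -> Self {
        Fanout { consumers: Vec::new() }
    }

    fn add(mut self, consumer: Box<dyn Consumer>) -> Self {
        self.consumers.push(consumer);
        self
    }

    fn on_event(&self, event: &str) -> Result<(), String> {
        for consumer in &self.consumers {
            consumer.on_event(event)?; // short-circuits on the first Err
        }
        Ok(())
    }
}

// Sends one event through three recorders, the second of which fails,
// and returns which recorders saw it plus the overall result.
fn demo() -> (Vec<String>, Result<(), String>) {
    let log: Log = Arc::new(Mutex::new(Vec::new()));
    let fanout = Fanout::new()
        .add(Box::new(Recorder { label: "first", fail: false, log: Arc::clone(&log) }))
        .add(Box::new(Recorder { label: "second", fail: true, log: Arc::clone(&log) }))
        .add(Box::new(Recorder { label: "third", fail: false, log: Arc::clone(&log) }));
    let result = fanout.on_event("content");
    let seen = log.lock().unwrap().clone();
    (seen, result)
}

fn main() {
    let (seen, result) = demo();
    println!("seen by: {:?}, result: {:?}", seen, result);
}
```

In this model the third consumer never sees the event, so one practical consequence is to add consumers that must not miss events (for example, a trace writer) before consumers that may fail.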
Error Handling in Consumers
Returning Err(...) from on_event stops the agent execution with that error. For non-critical failures (logging, metrics), handle errors internally and return Ok(()):
impl StreamConsumer for ResilientConsumer {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        if let Err(e) = self.try_process(event) {
            // Log but don't stop the agent
            eprintln!("Warning: consumer error: {}", e);
        }
        Ok(())
    }
}
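The same idea can be factored into a reusable wrapper so that individual consumers do not each need their own error-swallowing logic. The sketch below is std-only and illustrative: the Consumer trait, Lenient, and RejectEmpty are hypothetical names with events reduced to strings, not Appam types.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical, simplified consumer trait; events are plain strings here.
trait Consumer {
    fn on_event(&self, event: &str) -> Result<(), String>;
}

// Wraps any consumer and downgrades its errors to counted warnings, so
// a failure in the inner consumer never stops the run.
struct Lenient<C: Consumer> {
    inner: C,
    failures: AtomicUsize,
}

impl<C: Consumer> Lenient<C> {
    fn new(inner: C) -> Self {
        Lenient { inner, failures: AtomicUsize::new(0) }
    }

    // Number of inner errors swallowed so far.
    fn failures(&self) -> usize {
        self.failures.load(Ordering::Relaxed)
    }
}

impl<C: Consumer> Consumer for Lenient<C> {
    fn on_event(&self, event: &str) -> Result<(), String> {
        if let Err(e) = self.inner.on_event(event) {
            self.failures.fetch_add(1, Ordering::Relaxed);
            eprintln!("Warning: consumer error: {}", e);
        }
        Ok(())
    }
}

// Example inner consumer that rejects empty events.
struct RejectEmpty;

impl Consumer for RejectEmpty {
    fn on_event(&self, event: &str) -> Result<(), String> {
        if event.is_empty() {
            Err("empty event".to_string())
        } else {
            Ok(())
        }
    }
}

fn main() {
    let consumer = Lenient::new(RejectEmpty);
    for event in ["hello", "", "world"].iter() {
        // Always Ok, even when the inner consumer fails.
        consumer.on_event(*event).unwrap();
    }
    println!("swallowed {} failure(s)", consumer.failures());
}
```

Counting the swallowed failures keeps them observable; a wrapper that silently discards errors makes a broken logging or metrics path hard to notice.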