# Streaming

Event-based streaming with pluggable consumers and the StreamBuilder API.

## Overview
Appam streams all LLM responses in real time. As the model generates text, invokes tools, or completes its turn, the runtime emits structured `StreamEvent` values. You decide how to handle these events -- print to the console, forward to a web client, log to a database, or any combination.
## StreamEvent

Every observable action during agent execution is represented as a `StreamEvent`:
```rust
pub enum StreamEvent {
    SessionStarted { session_id: String },
    Content { content: String },
    Reasoning { content: String },
    ToolCallStarted { tool_name: String, arguments: String },
    ToolCallCompleted { tool_name: String, result: Value, success: bool, duration_ms: f64 },
    ToolCallFailed { tool_name: String, error: String },
    TurnCompleted,
    UsageUpdate { snapshot: AggregatedUsage },
    Done,
    Error { message: String },
}
```

## Event Lifecycle
A typical agent interaction produces events in this order:
1. `SessionStarted` -- New session created with a unique ID
2. `Content` (repeated) -- Text chunks as the model generates output
3. `Reasoning` (optional, repeated) -- Thinking tokens from models with extended reasoning
4. `ToolCallStarted` -- Model decided to invoke a tool
5. `ToolCallCompleted` or `ToolCallFailed` -- Tool execution result
6. `TurnCompleted` -- One LLM turn finished (may loop back to step 2 with tool results)
7. `UsageUpdate` -- Cumulative token counts and cost estimate
8. `Done` -- Agent loop completed, no more events
If an unrecoverable error occurs at any point, an `Error` event is emitted instead.
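The lifecycle above can be pictured as a plain fold over the event sequence. The sketch below uses a trimmed, local `Event` enum purely for illustration -- the real type is appam's `StreamEvent`, and the `transcript` helper is hypothetical, not part of the library:

```rust
// Trimmed, illustrative stand-in for appam's StreamEvent.
enum Event {
    SessionStarted { session_id: String },
    Content { content: String },
    ToolCallStarted { tool_name: String },
    ToolCallCompleted { tool_name: String },
    TurnCompleted,
    Done,
}

// Fold an event sequence into a plain-text transcript, following the
// lifecycle order described above.
fn transcript(events: &[Event]) -> String {
    let mut out = String::new();
    for event in events {
        match event {
            Event::SessionStarted { session_id } => {
                out.push_str(&format!("[session {}]\n", session_id));
            }
            Event::Content { content } => out.push_str(content),
            Event::ToolCallStarted { tool_name } => {
                out.push_str(&format!("\n[calling {}]", tool_name));
            }
            Event::ToolCallCompleted { tool_name } => {
                out.push_str(&format!("\n[{} ok]\n", tool_name));
            }
            Event::TurnCompleted => out.push('\n'),
            Event::Done => out.push_str("[done]"),
        }
    }
    out
}

fn main() {
    let events = vec![
        Event::SessionStarted { session_id: "s1".into() },
        Event::Content { content: "Hello".into() },
        Event::TurnCompleted,
        Event::Done,
    ];
    // Prints the transcript assembled from the lifecycle events.
    println!("{}", transcript(&events));
}
```

A real handler would match on appam's `StreamEvent` the same way, reacting to each variant as it arrives.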
## The StreamBuilder API

The most ergonomic way to handle streaming events is the closure-based `StreamBuilder`, accessed via `agent.stream()`:
```rust
use appam::prelude::*;

let agent = Agent::quick(
    "anthropic/claude-sonnet-4-5",
    "You are a helpful assistant.",
    vec![],
)?;

let session = agent
    .stream("Explain how async works in Rust")
    .on_content(|text| {
        print!("{}", text);
        std::io::Write::flush(&mut std::io::stdout()).ok();
    })
    .on_reasoning(|thinking| {
        eprint!("\x1b[36m{}\x1b[0m", thinking);
    })
    .on_tool_call(|name, args| {
        println!("\n[calling {}]", name);
    })
    .on_tool_result(|name, result| {
        println!("[{} completed]", name);
    })
    .on_tool_failed(|name, error| {
        eprintln!("[{} failed: {}]", name, error);
    })
    .on_error(|err| {
        eprintln!("Error: {}", err);
    })
    .on_done(|| {
        println!("\nDone.");
    })
    .on_session_started(|id| {
        println!("Session: {}", id);
    })
    .run()
    .await?;
```

All handlers are optional -- register only the ones you need. The builder returns a `Session` on completion.
### Available Handlers

| Handler | Signature | Called When |
|---|---|---|
| `on_content` | `Fn(&str)` | Each text chunk from the LLM |
| `on_reasoning` | `Fn(&str)` | Each thinking/reasoning chunk |
| `on_tool_call` | `Fn(&str, &str)` | Tool invocation starts (name, args) |
| `on_tool_result` | `Fn(&str, &Value)` | Tool completes successfully (name, result) |
| `on_tool_failed` | `Fn(&str, &str)` | Tool execution fails (name, error) |
| `on_error` | `Fn(&str)` | Unrecoverable error occurs |
| `on_done` | `Fn()` | Agent loop finishes |
| `on_session_started` | `Fn(&str)` | Session created (session_id) |
### Async Handlers
For handlers that need to perform async operations (database writes, API calls), use the async variants:
```rust
agent
    .stream("Do something")
    .on_tool_call_async(|name, args| async move {
        // Async database logging
        db.log_tool_call(&name, &args).await?;
        Ok(())
    })
    .on_tool_result_async(|name, result| async move {
        // Async result processing
        db.save_result(&name, &result).await?;
        Ok(())
    })
    .run()
    .await?;
```

Async handlers are spawned as Tokio tasks and run concurrently with the main stream processing.
## StreamConsumer Trait

For reusable event handling logic, implement the `StreamConsumer` trait:
```rust
use appam::agent::streaming::{StreamConsumer, StreamEvent};
use anyhow::Result;

struct MetricsConsumer {
    tool_calls: std::sync::atomic::AtomicU64,
}

impl StreamConsumer for MetricsConsumer {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        match event {
            StreamEvent::ToolCallCompleted { tool_name, duration_ms, .. } => {
                self.tool_calls.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                println!("[metrics] {} took {:.1}ms", tool_name, duration_ms);
            }
            _ => {}
        }
        Ok(())
    }
}
```

Consumers must be `Send + Sync`. Returning an error from `on_event` stops the agent execution.
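The stop-on-error contract can be sketched as a driver loop like the one below. The `Consumer` trait, `drive` function, and `FailOn` type here are hypothetical simplifications, not appam internals -- only the behavior (the first `Err` aborts the run and is returned to the caller) reflects the documented contract:

```rust
// Hypothetical, simplified consumer trait for illustration.
trait Consumer {
    fn on_event(&self, event: &str) -> Result<(), String>;
}

// Deliver events in order; stop at the first consumer error and return it.
fn drive(consumer: &dyn Consumer, events: &[&str]) -> Result<usize, String> {
    let mut delivered = 0;
    for event in events {
        consumer.on_event(event)?;
        delivered += 1;
    }
    Ok(delivered)
}

// A consumer that rejects one specific event name.
struct FailOn(&'static str);

impl Consumer for FailOn {
    fn on_event(&self, event: &str) -> Result<(), String> {
        if event == self.0 {
            Err(format!("refused {}", event))
        } else {
            Ok(())
        }
    }
}

fn main() {
    let consumer = FailOn("ToolCallFailed");
    let result = drive(&consumer, &["SessionStarted", "Content", "ToolCallFailed", "Done"]);
    // Delivery stopped at the failing event; "Done" was never delivered.
    assert_eq!(result, Err("refused ToolCallFailed".to_string()));
}
```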
Use consumers with `run_streaming()`:
```rust
let consumer = MetricsConsumer { tool_calls: std::sync::atomic::AtomicU64::new(0) };

let session = agent
    .run_streaming("Hello!", Box::new(consumer))
    .await?;
```

## Built-in Consumers
Appam ships with several ready-to-use consumers:
### ConsoleConsumer
Pretty-prints events to the terminal with color and formatting:
```rust
use appam::agent::consumers::ConsoleConsumer;

let consumer = ConsoleConsumer::new()
    .with_colors(true)
    .with_reasoning(true);
```

This is the default consumer used by `agent.run()`.
### ChannelConsumer
Forwards events to a Tokio mpsc channel for decoupled processing:
```rust
use appam::agent::consumers::ChannelConsumer;

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let consumer = ChannelConsumer::new(tx);

// Process events in another task
tokio::spawn(async move {
    while let Some(event) = rx.recv().await {
        // Handle event
    }
});

agent.run_streaming("Hello!", Box::new(consumer)).await?;
```

### CallbackConsumer
Wraps a closure as a consumer:
```rust
use appam::agent::consumers::CallbackConsumer;

let consumer = CallbackConsumer::new(|event| {
    println!("{:?}", event);
    Ok(())
});
```

### TraceConsumer
Writes events to session trace files for debugging and replay:
```rust
use appam::agent::consumers::TraceConsumer;
use appam::config::TraceFormat;
use std::path::Path;

let consumer = TraceConsumer::new(
    Path::new("logs"),
    "session-123",
    TraceFormat::Detailed,
)?;
```

### SqliteTraceConsumer
Persists events to a SQLite database for structured querying. This constructor is lower-level than the other built-in consumers because it expects an existing `SqlitePool` plus session metadata:
```rust
use appam::agent::consumers::SqliteTraceConsumer;
use sqlx::SqlitePool;
use std::sync::Arc;

let pool = Arc::new(SqlitePool::connect("sqlite:traces.db").await?);
let consumer = SqliteTraceConsumer::new(
    pool,
    "session-123".to_string(),
    "my-agent".to_string(),
    "claude-sonnet-4-5".to_string(),
    "chat".to_string(),
    1,
);
```

### MultiConsumer
Broadcast events to multiple consumers simultaneously using `MultiConsumer`:
```rust
use appam::agent::streaming::MultiConsumer;
use appam::agent::consumers::*;

let multi = MultiConsumer::new()
    .add(Box::new(ConsoleConsumer::new()))
    .add(Box::new(ChannelConsumer::new(tx)))
    .add(Box::new(TraceConsumer::new(
        std::path::Path::new("logs"),
        "session-123",
        appam::config::TraceFormat::Detailed,
    )?));

agent.run_streaming("Hello!", Box::new(multi)).await?;
```

Or use `run_with_consumers()` for a more concise API:
```rust
agent.run_with_consumers("Hello!", vec![
    Box::new(ConsoleConsumer::new()),
    Box::new(ChannelConsumer::new(tx)),
]).await?;
```

Events are delivered to consumers in order. If any consumer returns an error, propagation stops and the error is returned to the caller.
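The fan-out semantics can be sketched generically. The `Consumer` trait, `Recorder` type, and `broadcast` function below are hypothetical stand-ins, not appam's implementation; they only illustrate the documented contract of in-order delivery with an early stop on the first error:

```rust
use std::cell::RefCell;

// Hypothetical, simplified consumer trait for illustration.
trait Consumer {
    fn on_event(&self, event: &str) -> Result<(), String>;
}

// A consumer that records every event it receives, tagged with its name.
struct Recorder {
    name: &'static str,
    log: RefCell<Vec<String>>,
}

impl Consumer for Recorder {
    fn on_event(&self, event: &str) -> Result<(), String> {
        self.log.borrow_mut().push(format!("{}:{}", self.name, event));
        Ok(())
    }
}

// Broadcast one event to all consumers in registration order;
// the first Err stops delivery to the remaining consumers.
fn broadcast(consumers: &[&dyn Consumer], event: &str) -> Result<(), String> {
    for consumer in consumers {
        consumer.on_event(event)?;
    }
    Ok(())
}

fn main() {
    let console = Recorder { name: "console", log: RefCell::new(Vec::new()) };
    let trace = Recorder { name: "trace", log: RefCell::new(Vec::new()) };
    let consumers: Vec<&dyn Consumer> = vec![&console, &trace];

    for event in ["SessionStarted", "Content", "Done"] {
        broadcast(&consumers, event).unwrap();
    }

    // Both consumers saw all three events, in the same order.
    assert_eq!(console.log.borrow().len(), 3);
    assert_eq!(trace.log.borrow()[0], "trace:SessionStarted");
}
```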