# StreamBuilder

Closure-based fluent API for handling streaming events.
## Overview
StreamBuilder provides an ergonomic, closure-based API for handling streaming events without implementing the `StreamConsumer` trait. It is returned by `agent.stream("prompt")` on `RuntimeAgent` and uses a fluent builder pattern: you chain `.on_*` handlers and finalize with `.run()`.
Import path: `appam::agent::streaming_builder::StreamBuilder`
## Basic Usage
```rust
use appam::prelude::*;

let agent = Agent::quick("anthropic/claude-sonnet-4-5", "You are helpful.", vec![])?;

let session = agent
    .stream("Explain ownership in Rust")
    .on_content(|text| print!("{}", text))
    .run()
    .await?;

println!("\nSession: {}", session.session_id);
```

All handlers are optional. Attach only the ones you need; unhandled events are silently ignored.
## Definition
```rust
pub struct StreamBuilder<'a> {
    agent: &'a RuntimeAgent,
    message: String,
    // ... handler fields
}
```

Created via `RuntimeAgent::stream()`:

```rust
impl RuntimeAgent {
    pub fn stream(&self, message: impl Into<String>) -> StreamBuilder<'_>;
}
```

## Handler Methods
Every handler method consumes and returns `self`, enabling method chaining.
### `.on_content`
```rust
pub fn on_content<F>(self, f: F) -> Self
where
    F: Fn(&str) + Send + Sync + 'static;
```

Called for each `Content` chunk from the LLM. The closure receives the text content as a `&str`.
```rust
.on_content(|text| {
    print!("{}", text);
    std::io::Write::flush(&mut std::io::stdout()).ok();
})
```

### `.on_reasoning`
```rust
pub fn on_reasoning<F>(self, f: F) -> Self
where
    F: Fn(&str) + Send + Sync + 'static;
```

Called for each `Reasoning` chunk from models with extended thinking enabled. Receives the reasoning text as `&str`.
```rust
.on_reasoning(|thinking| {
    // Display thinking in cyan
    print!("\x1b[36m{}\x1b[0m", thinking);
})
```

### `.on_session_started`
```rust
pub fn on_session_started<F>(self, f: F) -> Self
where
    F: Fn(&str) + Send + Sync + 'static;
```

Called once when the session begins. Receives the session ID as `&str`.
```rust
.on_session_started(|session_id| {
    println!("Session: {}", session_id);
})
```

### `.on_tool_call`
```rust
pub fn on_tool_call<F>(self, f: F) -> Self
where
    F: Fn(&str, &str) + Send + Sync + 'static;
```

Called when the model initiates a tool call. Receives `(tool_name, arguments)`, where `arguments` is a JSON string.
```rust
.on_tool_call(|name, args| {
    println!("[Calling: {} with {}]", name, args);
})
```

### `.on_tool_result`
```rust
pub fn on_tool_result<F>(self, f: F) -> Self
where
    F: Fn(&str, &serde_json::Value) + Send + Sync + 'static;
```

Called when a tool completes successfully. Receives `(tool_name, result)`, where `result` is a `&serde_json::Value`.
```rust
.on_tool_result(|name, result| {
    println!("[{} returned: {}]", name, result);
})
```

### `.on_tool_failed`
```rust
pub fn on_tool_failed<F>(self, f: F) -> Self
where
    F: Fn(&str, &str) + Send + Sync + 'static;
```

Called when a tool execution fails. Receives `(tool_name, error_message)`.
```rust
.on_tool_failed(|name, error| {
    eprintln!("[{} failed: {}]", name, error);
})
```

### `.on_error`
```rust
pub fn on_error<F>(self, f: F) -> Self
where
    F: Fn(&str) + Send + Sync + 'static;
```

Called when a streaming error occurs. Receives the error message as `&str`.
```rust
.on_error(|error| {
    eprintln!("Error: {}", error);
})
```

### `.on_done`
```rust
pub fn on_done<F>(self, f: F) -> Self
where
    F: Fn() + Send + Sync + 'static;
```

Called when the stream completes successfully. Takes no arguments.
```rust
.on_done(|| {
    println!("\nStream complete.");
})
```

### `.on_tool_call_async`
```rust
pub fn on_tool_call_async<F, Fut>(self, f: F) -> Self
where
    F: Fn(String, String) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Result<()>> + Send + 'static;
```

Async version of `.on_tool_call`. The closure receives owned `(String, String)` values and returns a future. The future is spawned as a tokio task, so it runs concurrently without blocking the stream.
```rust
.on_tool_call_async(|name, args| async move {
    // Async database write
    db.log_tool_call(&name, &args).await?;
    Ok(())
})
```

### `.on_tool_result_async`
```rust
pub fn on_tool_result_async<F, Fut>(self, f: F) -> Self
where
    F: Fn(String, serde_json::Value) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Result<()>> + Send + 'static;
```

Async version of `.on_tool_result`. The closure receives owned `(String, serde_json::Value)` values. The future is spawned as a tokio task.
```rust
.on_tool_result_async(|name, result| async move {
    // Forward to an external API
    api_client.report_result(&name, &result).await?;
    Ok(())
})
```

### `.run`
```rust
pub async fn run(self) -> Result<Session>;
```

Consumes the builder, starts the agent loop with all configured handlers, and returns the completed `Session`. This is always the final call in the chain.
## Full Example
```rust
use appam::prelude::*;
use appam::llm::anthropic::ThinkingConfig;
use std::io::Write;

#[tokio::main]
async fn main() -> Result<()> {
    let agent = AgentBuilder::new("demo")
        .provider(LlmProvider::Anthropic)
        .model("claude-sonnet-4-5")
        .system_prompt("You are a helpful coding assistant with tools.")
        .thinking(ThinkingConfig::enabled(10000))
        .build()?;

    let session = agent
        .stream("Read the main.rs file and summarize it")
        .on_session_started(|id| {
            println!("Session: {}", id);
        })
        .on_reasoning(|thinking| {
            eprint!("\x1b[36m{}\x1b[0m", thinking);
        })
        .on_content(|text| {
            print!("{}", text);
            std::io::stdout().flush().ok();
        })
        .on_tool_call(|name, _args| {
            println!("\n[Calling: {}]", name);
        })
        .on_tool_result(|name, _result| {
            println!("[{}: done]", name);
        })
        .on_tool_failed(|name, error| {
            eprintln!("[{} failed: {}]", name, error);
        })
        .on_error(|error| {
            eprintln!("Error: {}", error);
        })
        .on_done(|| {
            println!("\n---");
        })
        .run()
        .await?;

    if let Some(usage) = &session.usage {
        println!("Tokens: {} | Cost: ${:.4}",
            usage.total_tokens(), usage.total_cost_usd);
    }
    Ok(())
}
```

## How It Works Internally
StreamBuilder creates an internal `ClosureConsumer` struct that implements `StreamConsumer`. When `.run()` is called, it passes this consumer to `RuntimeAgent::run_streaming()`. Each `on_event` call dispatches to the matching closure (if set) via pattern matching on `StreamEvent`.
Async handlers (`.on_tool_call_async`, `.on_tool_result_async`) are spawned as independent tokio tasks via `tokio::spawn`, so they run concurrently without blocking event delivery. Errors from async handlers are logged to stderr but do not stop the stream.
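The dispatch pattern described above can be modeled in isolation. The sketch below is illustrative, not appam's actual source: the `StreamEvent` variants and `ClosureConsumer` fields are simplified stand-ins, and async spawning is omitted.

```rust
// Simplified model of the ClosureConsumer dispatch pattern: one optional
// boxed closure per event kind, selected by matching on an event enum.
// StreamEvent and ClosureConsumer here are stand-ins, not appam's real types.
use std::cell::RefCell;
use std::rc::Rc;

enum StreamEvent {
    Content(String),
    ToolCall { name: String, args: String },
    Done,
}

#[derive(Default)]
struct ClosureConsumer {
    on_content: Option<Box<dyn Fn(&str)>>,
    on_tool_call: Option<Box<dyn Fn(&str, &str)>>,
    on_done: Option<Box<dyn Fn()>>,
}

impl ClosureConsumer {
    // Dispatch: invoke the matching closure if one was attached;
    // events with no handler fall through silently.
    fn on_event(&self, event: &StreamEvent) {
        match event {
            StreamEvent::Content(text) => {
                if let Some(f) = &self.on_content { f(text); }
            }
            StreamEvent::ToolCall { name, args } => {
                if let Some(f) = &self.on_tool_call { f(name, args); }
            }
            StreamEvent::Done => {
                if let Some(f) = &self.on_done { f(); }
            }
        }
    }
}

fn main() {
    let log = Rc::new(RefCell::new(Vec::new()));
    let log2 = Rc::clone(&log);

    // Only a content handler is attached, mirroring a minimal builder chain.
    let consumer = ClosureConsumer {
        on_content: Some(Box::new(move |t| log2.borrow_mut().push(t.to_string()))),
        ..Default::default()
    };

    consumer.on_event(&StreamEvent::Content("hello".into()));
    consumer.on_event(&StreamEvent::Done); // no handler: silently ignored

    assert_eq!(log.borrow().as_slice(), ["hello"]);
    println!("dispatched: {:?}", log.borrow());
}
```

This is why unhandled events cost nothing: a `None` handler field turns dispatch into a no-op for that variant.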
## StreamBuilder vs StreamConsumer
| | StreamBuilder | StreamConsumer |
|---|---|---|
| Syntax | Closure chaining | Trait implementation |
| Boilerplate | Minimal; attach only the events you care about | Must handle all variants (with a `_ => {}` catch-all) |
| Async handlers | Built-in `.on_*_async` variants | Must spawn tasks manually in `on_event` |
| Reusability | Inline, per-call | Reusable struct, composable via `MultiConsumer` |
| Access to result | `.run()` returns `Session` | `Session` returned by `run_streaming()` |
Use `StreamBuilder` for quick, inline streaming. Use `StreamConsumer` when you need reusable consumers or want to broadcast to multiple destinations.

See StreamConsumer for the trait-based approach.
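For contrast, the trait-based side of the comparison can be sketched as below. The trait and event enum here are simplified stand-ins (appam's real `StreamConsumer` signature may differ); the point is the reusable struct and the `_ => {}` catch-all the table mentions.

```rust
// Illustrative stand-in for the trait-based approach: a reusable consumer
// struct whose match must account for every event variant. Not appam's
// real StreamConsumer definition.

enum StreamEvent {
    Content(String),
    Reasoning(String),
    Done,
}

trait StreamConsumer {
    fn on_event(&mut self, event: &StreamEvent);
}

// Unlike an inline builder chain, this struct can be constructed once
// and reused across multiple streaming calls.
struct ContentCollector {
    buffer: String,
}

impl StreamConsumer for ContentCollector {
    fn on_event(&mut self, event: &StreamEvent) {
        match event {
            StreamEvent::Content(text) => self.buffer.push_str(text),
            _ => {} // catch-all: remaining variants must be acknowledged
        }
    }
}

fn main() {
    let mut collector = ContentCollector { buffer: String::new() };
    collector.on_event(&StreamEvent::Content("Hello, ".into()));
    collector.on_event(&StreamEvent::Reasoning("thinking...".into()));
    collector.on_event(&StreamEvent::Content("world".into()));
    collector.on_event(&StreamEvent::Done);

    assert_eq!(collector.buffer, "Hello, world");
    println!("{}", collector.buffer);
}
```

The trade-off is visible directly: the trait form carries the catch-all boilerplate but yields a named, reusable type; the builder form trades reusability for brevity.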