Appam
API Reference

StreamBuilder

Closure-based fluent API for handling streaming events.

Overview

StreamBuilder provides an ergonomic, closure-based API for handling streaming events without implementing the StreamConsumer trait. It is returned by agent.stream("prompt") on RuntimeAgent and uses a fluent builder pattern where you chain .on_* handlers and finalize with .run().

Import path: appam::agent::streaming_builder::StreamBuilder

Basic Usage

use appam::prelude::*;

let agent = Agent::quick("anthropic/claude-sonnet-4-5", "You are helpful.", vec![])?;

let session = agent
    .stream("Explain ownership in Rust")
    .on_content(|text| print!("{}", text))
    .run()
    .await?;

println!("\nSession: {}", session.session_id);

All handlers are optional. Attach only the ones you need -- unhandled events are silently ignored.

Definition

pub struct StreamBuilder<'a> {
    agent: &'a RuntimeAgent,
    message: String,
    // ... handler fields
}

Created via RuntimeAgent::stream():

impl RuntimeAgent {
    pub fn stream(&self, message: impl Into<String>) -> StreamBuilder<'_>;
}

Handler Methods

Every handler method consumes and returns self, enabling method chaining.

.on_content

pub fn on_content<F>(self, f: F) -> Self
where
    F: Fn(&str) + Send + Sync + 'static;

Called for each Content chunk from the LLM. The closure receives the text content as a &str.

.on_content(|text| {
    print!("{}", text);
    std::io::Write::flush(&mut std::io::stdout()).ok();
})

.on_reasoning

pub fn on_reasoning<F>(self, f: F) -> Self
where
    F: Fn(&str) + Send + Sync + 'static;

Called for each Reasoning chunk from models with extended thinking enabled. Receives the reasoning text as &str.

.on_reasoning(|thinking| {
    // Display thinking in cyan
    print!("\x1b[36m{}\x1b[0m", thinking);
})

.on_session_started

pub fn on_session_started<F>(self, f: F) -> Self
where
    F: Fn(&str) + Send + Sync + 'static;

Called once when the session begins. Receives the session ID as &str.

.on_session_started(|session_id| {
    println!("Session: {}", session_id);
})

.on_tool_call

pub fn on_tool_call<F>(self, f: F) -> Self
where
    F: Fn(&str, &str) + Send + Sync + 'static;

Called when the model initiates a tool call. Receives (tool_name, arguments) where arguments is a JSON string.

.on_tool_call(|name, args| {
    println!("[Calling: {} with {}]", name, args);
})

.on_tool_result

pub fn on_tool_result<F>(self, f: F) -> Self
where
    F: Fn(&str, &serde_json::Value) + Send + Sync + 'static;

Called when a tool completes successfully. Receives (tool_name, result) where result is a &serde_json::Value.

.on_tool_result(|name, result| {
    println!("[{} returned: {}]", name, result);
})

.on_tool_failed

pub fn on_tool_failed<F>(self, f: F) -> Self
where
    F: Fn(&str, &str) + Send + Sync + 'static;

Called when a tool execution fails. Receives (tool_name, error_message).

.on_tool_failed(|name, error| {
    eprintln!("[{} failed: {}]", name, error);
})

.on_error

pub fn on_error<F>(self, f: F) -> Self
where
    F: Fn(&str) + Send + Sync + 'static;

Called when a streaming error occurs. Receives the error message as &str.

.on_error(|error| {
    eprintln!("Error: {}", error);
})

.on_done

pub fn on_done<F>(self, f: F) -> Self
where
    F: Fn() + Send + Sync + 'static;

Called when the stream completes successfully. Takes no arguments.

.on_done(|| {
    println!("\nStream complete.");
})

.on_tool_call_async

pub fn on_tool_call_async<F, Fut>(self, f: F) -> Self
where
    F: Fn(String, String) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Result<()>> + Send + 'static;

Async version of .on_tool_call. The closure receives owned (String, String) values and returns a future. The future is spawned as a tokio task, so it runs concurrently without blocking the stream.

.on_tool_call_async(|name, args| async move {
    // Async database write
    db.log_tool_call(&name, &args).await?;
    Ok(())
})

.on_tool_result_async

pub fn on_tool_result_async<F, Fut>(self, f: F) -> Self
where
    F: Fn(String, serde_json::Value) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Result<()>> + Send + 'static;

Async version of .on_tool_result. The closure receives owned (String, serde_json::Value) values. The future is spawned as a tokio task.

.on_tool_result_async(|name, result| async move {
    // Forward to an external API
    api_client.report_result(&name, &result).await?;
    Ok(())
})

.run

pub async fn run(self) -> Result<Session>;

Consumes the builder, starts the agent loop with all configured handlers, and returns the completed Session. This is always the final call in the chain.

Full Example

use appam::prelude::*;
use appam::llm::anthropic::ThinkingConfig;
use std::io::Write;

#[tokio::main]
async fn main() -> Result<()> {
    let agent = AgentBuilder::new("demo")
        .provider(LlmProvider::Anthropic)
        .model("claude-sonnet-4-5")
        .system_prompt("You are a helpful coding assistant with tools.")
        .thinking(ThinkingConfig::enabled(10000))
        .build()?;

    let session = agent
        .stream("Read the main.rs file and summarize it")
        .on_session_started(|id| {
            println!("Session: {}", id);
        })
        .on_reasoning(|thinking| {
            eprint!("\x1b[36m{}\x1b[0m", thinking);
        })
        .on_content(|text| {
            print!("{}", text);
            std::io::stdout().flush().ok();
        })
        .on_tool_call(|name, args| {
            println!("\n[Calling: {}]", name);
        })
        .on_tool_result(|name, result| {
            println!("[{}: done]", name);
        })
        .on_tool_failed(|name, error| {
            eprintln!("[{} failed: {}]", name, error);
        })
        .on_error(|error| {
            eprintln!("Error: {}", error);
        })
        .on_done(|| {
            println!("\n---");
        })
        .run()
        .await?;

    if let Some(usage) = &session.usage {
        println!("Tokens: {} | Cost: ${:.4}",
            usage.total_tokens(), usage.total_cost_usd);
    }

    Ok(())
}

How It Works Internally

StreamBuilder creates an internal ClosureConsumer struct that implements StreamConsumer. When .run() is called, it passes this consumer to RuntimeAgent::run_streaming(). Each on_event call dispatches to the matching closure (if set) via pattern matching on StreamEvent.

Async handlers (.on_tool_call_async, .on_tool_result_async) are spawned as independent tokio tasks via tokio::spawn, so they run concurrently without blocking event delivery. Errors from async handlers are logged to stderr but do not stop the stream.

StreamBuilder vs StreamConsumer

StreamBuilderStreamConsumer
SyntaxClosure chainingTrait implementation
BoilerplateMinimal -- attach only the events you care aboutMust handle all variants (with _ => {} catch-all)
Async handlersBuilt-in .on_*_async variantsMust spawn tasks manually in on_event
ReusabilityInline, per-callReusable struct, composable via MultiConsumer
Access to result.run() returns SessionSession returned by run_streaming()

Use StreamBuilder for quick, inline streaming. Use StreamConsumer when you need reusable consumers or want to broadcast to multiple destinations.

See StreamConsumer for the trait-based approach.