
Streaming

Event-based streaming with pluggable consumers and the StreamBuilder API.

Overview

Appam streams all LLM responses in real time. As the model generates text, invokes tools, or completes its turn, the runtime emits structured StreamEvent values. You decide how to handle these events -- print to console, forward to a web client, log to a database, or any combination.

StreamEvent

Every observable action during agent execution is represented as a StreamEvent:

pub enum StreamEvent {
    SessionStarted { session_id: String },
    Content { content: String },
    Reasoning { content: String },
    ToolCallStarted { tool_name: String, arguments: String },
    ToolCallCompleted { tool_name: String, result: Value, success: bool, duration_ms: f64 },
    ToolCallFailed { tool_name: String, error: String },
    TurnCompleted,
    UsageUpdate { snapshot: AggregatedUsage },
    Done,
    Error { message: String },
}

Event Lifecycle

A typical agent interaction produces events in this order:

  1. SessionStarted -- New session created with a unique ID
  2. Content (repeated) -- Text chunks as the model generates output
  3. Reasoning (optional, repeated) -- Thinking tokens from models with extended reasoning
  4. ToolCallStarted -- Model decided to invoke a tool
  5. ToolCallCompleted or ToolCallFailed -- Tool execution result
  6. TurnCompleted -- One LLM turn finished (may loop back to step 2 with tool results)
  7. UsageUpdate -- Cumulative token counts and cost estimate
  8. Done -- Agent loop completed, no more events

If an unrecoverable error occurs at any point, an Error event is emitted instead.
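To make the ordering above concrete, here is a minimal, self-contained sketch that folds a sequence of events into a summary. The `Event` enum is a trimmed stand-in for StreamEvent, not Appam's actual type, and `Summary`, `summarize`, and `sample_run` are hypothetical names introduced for illustration. It also assumes that Error, like Done, ends the stream.

```rust
// Simplified stand-in for Appam's StreamEvent: a subset of variants, trimmed fields.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Event {
    SessionStarted { session_id: String },
    Content { content: String },
    ToolCallStarted { tool_name: String },
    ToolCallCompleted { tool_name: String },
    TurnCompleted,
    Done,
    Error { message: String },
}

// Hypothetical summary type: what a consumer might accumulate over one run.
#[allow(dead_code)]
#[derive(Debug, Default, PartialEq)]
struct Summary {
    text: String,
    turns: u32,
    tool_calls: u32,
    finished: bool,
    error: Option<String>,
}

// Fold events into a summary, stopping at the first Done or Error.
fn summarize(events: &[Event]) -> Summary {
    let mut s = Summary::default();
    for event in events {
        match event {
            Event::Content { content } => s.text.push_str(content),
            Event::ToolCallCompleted { .. } => s.tool_calls += 1,
            Event::TurnCompleted => s.turns += 1,
            Event::Done => {
                s.finished = true;
                break;
            }
            Event::Error { message } => {
                s.error = Some(message.clone());
                break;
            }
            // These carry no state that this summary tracks.
            Event::SessionStarted { .. } | Event::ToolCallStarted { .. } => {}
        }
    }
    s
}

// One tool-using interaction: two LLM turns with a tool call in between.
fn sample_run() -> Vec<Event> {
    vec![
        Event::SessionStarted { session_id: "s-1".to_string() },
        Event::Content { content: "Let me check. ".to_string() },
        Event::ToolCallStarted { tool_name: "clock".to_string() },
        Event::ToolCallCompleted { tool_name: "clock".to_string() },
        Event::TurnCompleted,
        Event::Content { content: "It is 12:00.".to_string() },
        Event::TurnCompleted,
        Event::Done,
    ]
}

fn main() {
    let summary = summarize(&sample_run());
    println!("{:?}", summary);
}
```

The second TurnCompleted in the sample reflects the loop back to step 2: the model sees the tool result and produces a final answer in a new turn.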

The StreamBuilder API

The most ergonomic way to handle streaming events is the closure-based StreamBuilder, accessed via agent.stream():

use appam::prelude::*;

let agent = Agent::quick(
    "anthropic/claude-sonnet-4-5",
    "You are a helpful assistant.",
    vec![],
)?;

let session = agent
    .stream("Explain how async works in Rust")
    .on_content(|text| {
        print!("{}", text);
        std::io::Write::flush(&mut std::io::stdout()).ok();
    })
    .on_reasoning(|thinking| {
        eprint!("\x1b[36m{}\x1b[0m", thinking);
    })
    .on_tool_call(|name, _args| {
        // Arguments are ignored here; prefix with `_` to avoid an unused-variable warning
        println!("\n[calling {}]", name);
    })
    .on_tool_result(|name, _result| {
        println!("[{} completed]", name);
    })
    .on_tool_failed(|name, error| {
        eprintln!("[{} failed: {}]", name, error);
    })
    .on_error(|err| {
        eprintln!("Error: {}", err);
    })
    .on_done(|| {
        println!("\nDone.");
    })
    .on_session_started(|id| {
        println!("Session: {}", id);
    })
    .run()
    .await?;

All handlers are optional -- register only the ones you need. Calling run() drives the stream to completion and returns the resulting Session.

Available Handlers

Handler              Signature          Called When
on_content           Fn(&str)           Each text chunk from the LLM
on_reasoning         Fn(&str)           Each thinking/reasoning chunk
on_tool_call         Fn(&str, &str)     Tool invocation starts (name, args)
on_tool_result       Fn(&str, &Value)   Tool completes successfully (name, result)
on_tool_failed       Fn(&str, &str)     Tool execution fails (name, error)
on_error             Fn(&str)           Unrecoverable error occurs
on_done              Fn()               Agent loop finishes
on_session_started   Fn(&str)           Session created (session_id)
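For intuition about how optional closure handlers can work, the sketch below models a builder that stores each handler as an `Option` and dispatches only to the ones that were registered. `MiniStream` and its `Event` enum are hypothetical stand-ins written for this example; this is not how Appam's StreamBuilder is implemented, only one plausible shape for the pattern.

```rust
// Hypothetical, trimmed event type for this sketch (not Appam's StreamEvent).
#[allow(dead_code)]
enum Event {
    Content(String),
    ToolCall { name: String, args: String },
    Done,
}

// Each handler slot is optional; unregistered events are silently skipped.
struct MiniStream<'a> {
    on_content: Option<Box<dyn Fn(&str) + 'a>>,
    on_tool_call: Option<Box<dyn Fn(&str, &str) + 'a>>,
    on_done: Option<Box<dyn Fn() + 'a>>,
}

impl<'a> MiniStream<'a> {
    fn new() -> Self {
        MiniStream { on_content: None, on_tool_call: None, on_done: None }
    }

    // Builder methods take `self` by value so calls can be chained.
    fn on_content(mut self, f: impl Fn(&str) + 'a) -> Self {
        self.on_content = Some(Box::new(f));
        self
    }

    fn on_tool_call(mut self, f: impl Fn(&str, &str) + 'a) -> Self {
        self.on_tool_call = Some(Box::new(f));
        self
    }

    fn on_done(mut self, f: impl Fn() + 'a) -> Self {
        self.on_done = Some(Box::new(f));
        self
    }

    // Dispatch every event to its handler, if any; returns how many were handled.
    fn run(self, events: &[Event]) -> usize {
        let mut handled = 0;
        for event in events {
            match event {
                Event::Content(text) => {
                    if let Some(f) = &self.on_content {
                        f(text.as_str());
                        handled += 1;
                    }
                }
                Event::ToolCall { name, args } => {
                    if let Some(f) = &self.on_tool_call {
                        f(name.as_str(), args.as_str());
                        handled += 1;
                    }
                }
                Event::Done => {
                    if let Some(f) = &self.on_done {
                        f();
                        handled += 1;
                    }
                }
            }
        }
        handled
    }
}

fn main() {
    let events = vec![
        Event::Content("Hi".to_string()),
        Event::ToolCall { name: "clock".to_string(), args: "{}".to_string() },
        Event::Done,
    ];
    let handled = MiniStream::new()
        .on_content(|text| print!("{}", text))
        .on_tool_call(|name, args| println!("\n[calling {} with {}]", name, args))
        .on_done(|| println!("Done."))
        .run(&events);
    println!("{} events handled", handled);
}
```

The handlers are `Fn` rather than `FnMut`, matching the signatures in the table, so state shared with a handler needs interior mutability (for example a `RefCell` or an atomic).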

Async Handlers

For handlers that need to perform async operations (database writes, API calls), use the async variants:

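// `db` stands in for your own cloneable database handle; it is not part of Appam.
let db_calls = db.clone();
let db_results = db.clone();

agent
    .stream("Do something")
    .on_tool_call_async(move |name, args| {
        // Clone per invocation so each spawned future owns its own handle
        let db = db_calls.clone();
        async move {
            db.log_tool_call(&name, &args).await?;
            Ok(())
        }
    })
    .on_tool_result_async(move |name, result| {
        let db = db_results.clone();
        async move {
            db.save_result(&name, &result).await?;
            Ok(())
        }
    })
    .run()
    .await?;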

Async handlers are spawned as Tokio tasks and run concurrently with the main stream processing.

StreamConsumer Trait

For reusable event handling logic, implement the StreamConsumer trait:

use appam::agent::streaming::{StreamConsumer, StreamEvent};
use anyhow::Result;

struct MetricsConsumer {
    tool_calls: std::sync::atomic::AtomicU64,
}

impl StreamConsumer for MetricsConsumer {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        match event {
            StreamEvent::ToolCallCompleted { tool_name, duration_ms, .. } => {
                self.tool_calls.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                println!("[metrics] {} took {:.1}ms", tool_name, duration_ms);
            }
            _ => {}
        }
        Ok(())
    }
}

Consumers must be Send + Sync. Returning an error from on_event stops the agent execution.

Use consumers with run_streaming():

let consumer = MetricsConsumer { tool_calls: std::sync::atomic::AtomicU64::new(0) };
let session = agent
    .run_streaming("Hello!", Box::new(consumer))
    .await?;
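The rule that an error from on_event stops execution can be illustrated with a small standalone model. The `Consumer` trait, the `Event` enum, `ToolBudget`, and `drive` below are all hypothetical stand-ins (the real trait returns anyhow::Result and the real loop lives inside the agent runtime); the sketch only shows the control flow: deliver events one by one and abort on the first error.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical, trimmed event type for this sketch.
#[allow(dead_code)]
#[derive(Debug)]
enum Event {
    Content(String),
    ToolCallStarted(String),
    Done,
}

// Stand-in for a consumer trait; String replaces anyhow::Error to stay std-only.
trait Consumer: Send + Sync {
    fn on_event(&self, event: &Event) -> Result<(), String>;
}

// Aborts the run once more than `max` tool calls have been started.
struct ToolBudget {
    max: u64,
    seen: AtomicU64,
}

impl Consumer for ToolBudget {
    fn on_event(&self, event: &Event) -> Result<(), String> {
        if let Event::ToolCallStarted(name) = event {
            // fetch_add returns the previous value, so add 1 for the new count.
            let seen = self.seen.fetch_add(1, Ordering::Relaxed) + 1;
            if seen > self.max {
                return Err(format!(
                    "tool budget exceeded at '{}' ({} > {})",
                    name, seen, self.max
                ));
            }
        }
        Ok(())
    }
}

// Deliver events in order; `?` stops at the first consumer error.
fn drive(events: &[Event], consumer: &dyn Consumer) -> Result<usize, String> {
    let mut delivered = 0;
    for event in events {
        consumer.on_event(event)?;
        delivered += 1;
    }
    Ok(delivered)
}

fn main() {
    let events = vec![
        Event::ToolCallStarted("search".to_string()),
        Event::Content("partial answer".to_string()),
        Event::ToolCallStarted("fetch".to_string()),
        Event::Done,
    ];
    let budget = ToolBudget { max: 1, seen: AtomicU64::new(0) };
    match drive(&events, &budget) {
        Ok(n) => println!("delivered {} events", n),
        Err(e) => println!("stopped early: {}", e),
    }
}
```

An atomic counter is used because on_event takes `&self` and consumers must be Send + Sync, which rules out a plain mutable field.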

Built-in Consumers

Appam ships with several ready-to-use consumers:

ConsoleConsumer

Pretty-prints events to the terminal with color and formatting:

use appam::agent::consumers::ConsoleConsumer;

let consumer = ConsoleConsumer::new()
    .with_colors(true)
    .with_reasoning(true);

This is the default consumer used by agent.run().

ChannelConsumer

Forwards events to a Tokio mpsc channel for decoupled processing:

use appam::agent::consumers::ChannelConsumer;

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let consumer = ChannelConsumer::new(tx);

// Process events in another task
tokio::spawn(async move {
    while let Some(event) = rx.recv().await {
        // Handle event
    }
});

agent.run_streaming("Hello!", Box::new(consumer)).await?;
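The decoupling idea behind ChannelConsumer can be sketched without any dependencies by swapping Tokio's channel for the standard library's std::sync::mpsc and a thread. `Forwarder`, `Event`, and `collect_via_channel` are hypothetical names for this illustration, and the real consumer uses Tokio's unbounded channel as shown above; only the producer/receiver split carries over.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

// Hypothetical, trimmed event type for this sketch.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Content(String),
    Done,
}

// Forwards each event into a channel; fails if the receiver has gone away.
struct Forwarder {
    tx: Sender<Event>,
}

impl Forwarder {
    fn on_event(&self, event: &Event) -> Result<(), String> {
        self.tx
            .send(event.clone())
            .map_err(|e| format!("receiver dropped: {}", e))
    }
}

// Produce events on the current thread and assemble the text on a worker thread.
fn collect_via_channel(events: Vec<Event>) -> String {
    let (tx, rx) = mpsc::channel();

    let worker = thread::spawn(move || {
        let mut text = String::new();
        // Iterating the receiver ends once every sender has been dropped.
        for event in rx {
            match event {
                Event::Content(chunk) => text.push_str(&chunk),
                Event::Done => {}
            }
        }
        text
    });

    let forwarder = Forwarder { tx };
    for event in &events {
        forwarder.on_event(event).expect("worker is still receiving");
    }
    // Dropping the only sender closes the channel and lets the worker finish.
    drop(forwarder);

    worker.join().expect("worker thread panicked")
}

fn main() {
    let text = collect_via_channel(vec![
        Event::Content("Hel".to_string()),
        Event::Content("lo".to_string()),
        Event::Done,
    ]);
    println!("{}", text);
}
```

Because the forwarder only enqueues, slow processing on the receiving side does not hold up event delivery. With an unbounded channel the trade-off is that a stalled receiver lets the queue grow without limit.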

CallbackConsumer

Wraps a closure as a consumer:

use appam::agent::consumers::CallbackConsumer;

let consumer = CallbackConsumer::new(|event| {
    println!("{:?}", event);
    Ok(())
});

TraceConsumer

Writes events to session trace files for debugging and replay:

use appam::agent::consumers::TraceConsumer;
use appam::config::TraceFormat;
use std::path::Path;

let consumer = TraceConsumer::new(
    Path::new("logs"),
    "session-123",
    TraceFormat::Detailed,
)?;

SqliteTraceConsumer

Persists events to a SQLite database for structured querying. This constructor is lower-level than the other built-in consumers because it expects an existing SqlitePool plus session metadata:

use appam::agent::consumers::SqliteTraceConsumer;
use sqlx::SqlitePool;
use std::sync::Arc;

let pool = Arc::new(SqlitePool::connect("sqlite:traces.db").await?);
let consumer = SqliteTraceConsumer::new(
    pool,
    "session-123".to_string(),
    "my-agent".to_string(),
    "claude-sonnet-4-5".to_string(),
    "chat".to_string(),
    1,
);

MultiConsumer

Use MultiConsumer to broadcast each event to several consumers:

use appam::agent::streaming::MultiConsumer;
use appam::agent::consumers::*;

let multi = MultiConsumer::new()
    .add(Box::new(ConsoleConsumer::new()))
    // `tx` is the sender half of a channel, created as in the ChannelConsumer example
    .add(Box::new(ChannelConsumer::new(tx)))
    .add(Box::new(TraceConsumer::new(
        std::path::Path::new("logs"),
        "session-123",
        appam::config::TraceFormat::Detailed,
    )?));

agent.run_streaming("Hello!", Box::new(multi)).await?;

Or use run_with_consumers() for a more concise API:

agent.run_with_consumers("Hello!", vec![
    Box::new(ConsoleConsumer::new()),
    Box::new(ChannelConsumer::new(tx)),
]).await?;

Events are delivered to consumers in order. If any consumer returns an error, propagation stops and the error is returned to the caller.
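The delivery rule in the last paragraph can be modeled in a few lines. This sketch assumes consumers are visited in the order they were added and that the first error short-circuits the rest; the source states in-order delivery and stop-on-error but not these exact mechanics. `FanOut`, `Recorder`, `FailOn`, and the string-based `Consumer` trait are hypothetical stand-ins, not Appam types.

```rust
use std::sync::{Arc, Mutex};

// Stand-in consumer trait; events are plain strings to keep the sketch short.
trait Consumer: Send + Sync {
    fn on_event(&self, event: &str) -> Result<(), String>;
}

// Appends "label:event" to a shared log.
struct Recorder {
    label: &'static str,
    log: Arc<Mutex<Vec<String>>>,
}

impl Consumer for Recorder {
    fn on_event(&self, event: &str) -> Result<(), String> {
        self.log.lock().unwrap().push(format!("{}:{}", self.label, event));
        Ok(())
    }
}

// Rejects one specific event and accepts everything else.
struct FailOn {
    needle: &'static str,
}

impl Consumer for FailOn {
    fn on_event(&self, event: &str) -> Result<(), String> {
        if event == self.needle {
            Err(format!("rejected '{}'", event))
        } else {
            Ok(())
        }
    }
}

// Delivers each event to every consumer in insertion order.
struct FanOut {
    consumers: Vec<Box<dyn Consumer>>,
}

impl FanOut {
    fn new() -> Self {
        FanOut { consumers: Vec::new() }
    }

    fn add(mut self, consumer: Box<dyn Consumer>) -> Self {
        self.consumers.push(consumer);
        self
    }
}

impl Consumer for FanOut {
    fn on_event(&self, event: &str) -> Result<(), String> {
        for consumer in &self.consumers {
            // `?` returns the first error; later consumers never see this event.
            consumer.on_event(event)?;
        }
        Ok(())
    }
}

// Send one accepted and one rejected event; return the log and the second result.
fn demo() -> (Vec<String>, Result<(), String>) {
    let log = Arc::new(Mutex::new(Vec::new()));
    let fan = FanOut::new()
        .add(Box::new(Recorder { label: "a", log: Arc::clone(&log) }))
        .add(Box::new(FailOn { needle: "bad" }))
        .add(Box::new(Recorder { label: "b", log: Arc::clone(&log) }));

    fan.on_event("ok").expect("no consumer rejects 'ok'");
    let second = fan.on_event("bad");

    let entries = log.lock().unwrap().clone();
    (entries, second)
}

fn main() {
    let (entries, second) = demo();
    println!("log = {:?}, second result = {:?}", entries, second);
}
```

Under these assumptions, ordering matters: a consumer that can fail should come after any consumer, such as a trace writer, that must see every event.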