Appam

CallbackConsumer

Stream consumer that invokes a callback function for each event.

Overview

CallbackConsumer wraps a single callback function that receives every StreamEvent. It provides a quick way to handle events inline without defining a full struct, while still working within the StreamConsumer trait system for composition with MultiConsumer.

Import path: appam::agent::consumers::CallbackConsumer

Definition

pub struct CallbackConsumer {
    callback: Arc<dyn Fn(&StreamEvent) -> Result<()> + Send + Sync>,
}
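
Because the closure sits behind an Arc, it can only ever be reached through a shared reference, which is why the bound is Fn rather than FnMut. The sketch below shows how a wrapper of this shape could forward events to its closure. The StreamEvent enum, the StreamConsumer trait with its on_event method, the Result alias, and the total_content_len helper are simplified stand-ins assumed for illustration, not appam's actual definitions.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Stand-in types, assumed for illustration; appam's real definitions may differ.
type Result<T> = std::result::Result<T, String>;

enum StreamEvent {
    Content { content: String },
    Done,
}

// Hypothetical consumer trait with a single method taking `&self`.
trait StreamConsumer {
    fn on_event(&self, event: &StreamEvent) -> Result<()>;
}

struct CallbackConsumer {
    callback: Arc<dyn Fn(&StreamEvent) -> Result<()> + Send + Sync>,
}

impl CallbackConsumer {
    fn new<F>(callback: F) -> Self
    where
        F: Fn(&StreamEvent) -> Result<()> + Send + Sync + 'static,
    {
        Self { callback: Arc::new(callback) }
    }
}

impl StreamConsumer for CallbackConsumer {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        // Arc only hands out shared references, so the closure must be
        // callable through `&`, which is exactly what `Fn` guarantees.
        (self.callback)(event)
    }
}

// Hypothetical helper: feeds events to a consumer and sums the content lengths.
fn total_content_len(events: &[StreamEvent]) -> usize {
    let total = Arc::new(AtomicUsize::new(0));
    let total_clone = Arc::clone(&total);
    let consumer = CallbackConsumer::new(move |event| {
        if let StreamEvent::Content { content } = event {
            total_clone.fetch_add(content.len(), Ordering::SeqCst);
        }
        Ok(())
    });
    for event in events {
        consumer.on_event(event).expect("this callback never fails");
    }
    total.load(Ordering::SeqCst)
}

fn main() {
    let events = vec![
        StreamEvent::Content { content: "Hello, ".to_string() },
        StreamEvent::Content { content: "world".to_string() },
        StreamEvent::Done,
    ];
    println!("content bytes seen: {}", total_content_len(&events));
}
```

Interior mutability (here an atomic counter) is what lets an Fn closure still accumulate state across calls.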

Constructor

CallbackConsumer::new

pub fn new<F>(callback: F) -> Self
where
    F: Fn(&StreamEvent) -> Result<()> + Send + Sync + 'static;

Creates a consumer that calls the provided closure for each event.

| Parameter | Type | Description |
| --- | --- | --- |
| callback | impl Fn(&StreamEvent) -> Result<()> + Send + Sync + 'static | Callback invoked for every event. Must be Fn (not FnMut) for thread safety. |

The callback must return Result<()>. Returning an error stops the stream.

Usage

Basic pattern matching

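use appam::agent::consumers::CallbackConsumer;
use appam::agent::streaming::StreamEvent;
use std::io::Write;

let consumer = CallbackConsumer::new(|event| {
    match event {
        StreamEvent::Content { content } => {
            print!("{}", content);
            // stdout is line-buffered; flush so partial output appears as it streams.
            let _ = std::io::stdout().flush();
        }
        StreamEvent::Error { message } => eprintln!("Error: {}", message),
        // Finish the streamed output with a newline.
        StreamEvent::Done => println!(),
        // Ignore every other event type.
        _ => {}
    }
    Ok(())
});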

agent.run_streaming("Hello!", Box::new(consumer)).await?;

Capturing state

Use Arc and Mutex for shared mutable state since the callback must be Fn (not FnMut):

use appam::agent::consumers::CallbackConsumer;
use appam::agent::streaming::StreamEvent;
use std::sync::{Arc, Mutex};

let collected = Arc::new(Mutex::new(String::new()));
let collected_clone = collected.clone();

let consumer = CallbackConsumer::new(move |event| {
    if let StreamEvent::Content { content } = event {
        let mut buf = collected_clone.lock().unwrap();
        buf.push_str(content);
    }
    Ok(())
});

agent.run_streaming("Hello!", Box::new(consumer)).await?;

let full_response = collected.lock().unwrap().clone();
println!("Full response: {}", full_response);

Error propagation

Return an error to stop the stream early:

use appam::agent::consumers::CallbackConsumer;
use appam::agent::streaming::StreamEvent;

let consumer = CallbackConsumer::new(|event| {
    if let StreamEvent::Content { content } = event {
        if content.contains("UNSAFE") {
            return Err(anyhow::anyhow!("Content policy violation detected"));
        }
    }
    Ok(())
});
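
To make the early-stop behaviour concrete, here is a rough, self-contained sketch of a delivery loop that halts at the first callback error. The drive function, the policy_check helper, the StreamEvent enum, and the String-based Result alias are all hypothetical stand-ins; how appam actually drives the stream internally is not shown in this reference.

```rust
// Stand-in types, assumed for illustration; appam's real definitions may differ.
type Result<T> = std::result::Result<T, String>;

enum StreamEvent {
    Content { content: String },
    Done,
}

/// Hypothetical driver: delivers events in order and stops at the first
/// callback error. Returns the number of events handled successfully
/// together with the final outcome.
fn drive<F>(events: &[StreamEvent], callback: F) -> (usize, Result<()>)
where
    F: Fn(&StreamEvent) -> Result<()>,
{
    let mut delivered = 0;
    for event in events {
        if let Err(e) = callback(event) {
            // Remaining events are never passed to the callback.
            return (delivered, Err(e));
        }
        delivered += 1;
    }
    (delivered, Ok(()))
}

/// Same check as the closure above, written as a plain function.
fn policy_check(event: &StreamEvent) -> Result<()> {
    if let StreamEvent::Content { content } = event {
        if content.contains("UNSAFE") {
            return Err("Content policy violation detected".to_string());
        }
    }
    Ok(())
}

fn main() {
    let events = vec![
        StreamEvent::Content { content: "fine so far".to_string() },
        StreamEvent::Content { content: "UNSAFE text".to_string() },
        StreamEvent::Content { content: "never delivered".to_string() },
        StreamEvent::Done,
    ];
    let (delivered, outcome) = drive(&events, policy_check);
    println!("delivered {} event(s), outcome: {:?}", delivered, outcome);
}
```

In this sketch the error surfaces to whoever drives the stream, so the caller can decide whether to log it, retry, or abort.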

With MultiConsumer

CallbackConsumer composes with other consumers via MultiConsumer:

use appam::agent::streaming::MultiConsumer;
use appam::agent::consumers::{ConsoleConsumer, CallbackConsumer};
use appam::agent::streaming::StreamEvent;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

let tool_count = Arc::new(AtomicUsize::new(0));
let tool_count_clone = tool_count.clone();

let counter = CallbackConsumer::new(move |event| {
    if matches!(event, StreamEvent::ToolCallCompleted { .. }) {
        tool_count_clone.fetch_add(1, Ordering::SeqCst);
    }
    Ok(())
});

let multi = MultiConsumer::new()
    .add(Box::new(ConsoleConsumer::new()))
    .add(Box::new(counter));

agent.run_streaming("Use all available tools", Box::new(multi)).await?;
println!("Tools called: {}", tool_count.load(Ordering::SeqCst));
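
For intuition about the composition, the following sketch shows one way a fan-out consumer could forward each event to every registered consumer. Everything here is a simplified stand-in: the StreamConsumer trait and its on_event method, the tool_name field on ToolCallCompleted, the count_tool_calls helper, and in particular the choice to forward in registration order and stop at the first error are assumptions, not documented appam behaviour.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Stand-in types, assumed for illustration; appam's real definitions may differ.
type Result<T> = std::result::Result<T, String>;

#[allow(dead_code)] // Payload fields are not inspected in this sketch.
enum StreamEvent {
    Content { content: String },
    ToolCallCompleted { tool_name: String }, // hypothetical field
    Done,
}

trait StreamConsumer {
    fn on_event(&self, event: &StreamEvent) -> Result<()>;
}

struct CallbackConsumer {
    callback: Arc<dyn Fn(&StreamEvent) -> Result<()> + Send + Sync>,
}

impl CallbackConsumer {
    fn new<F>(callback: F) -> Self
    where
        F: Fn(&StreamEvent) -> Result<()> + Send + Sync + 'static,
    {
        Self { callback: Arc::new(callback) }
    }
}

impl StreamConsumer for CallbackConsumer {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        (self.callback)(event)
    }
}

struct MultiConsumer {
    consumers: Vec<Box<dyn StreamConsumer>>,
}

impl MultiConsumer {
    fn new() -> Self {
        Self { consumers: Vec::new() }
    }

    // Builder-style registration, mirroring the `.add(...)` chain above.
    fn add(mut self, consumer: Box<dyn StreamConsumer>) -> Self {
        self.consumers.push(consumer);
        self
    }
}

impl StreamConsumer for MultiConsumer {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        // Assumption: forward in registration order, stop at the first error.
        for consumer in &self.consumers {
            consumer.on_event(event)?;
        }
        Ok(())
    }
}

// Hypothetical helper: returns (tool calls counted, total events seen).
fn count_tool_calls(events: &[StreamEvent]) -> (usize, usize) {
    let tools = Arc::new(AtomicUsize::new(0));
    let seen = Arc::new(AtomicUsize::new(0));
    let tools_clone = Arc::clone(&tools);
    let seen_clone = Arc::clone(&seen);

    let multi = MultiConsumer::new()
        .add(Box::new(CallbackConsumer::new(move |event| {
            if matches!(event, StreamEvent::ToolCallCompleted { .. }) {
                tools_clone.fetch_add(1, Ordering::SeqCst);
            }
            Ok(())
        })))
        .add(Box::new(CallbackConsumer::new(move |_event| {
            seen_clone.fetch_add(1, Ordering::SeqCst);
            Ok(())
        })));

    for event in events {
        multi.on_event(event).expect("callbacks in this sketch never fail");
    }
    (tools.load(Ordering::SeqCst), seen.load(Ordering::SeqCst))
}

fn main() {
    let events = vec![
        StreamEvent::Content { content: "checking the weather".to_string() },
        StreamEvent::ToolCallCompleted { tool_name: "weather".to_string() },
        StreamEvent::ToolCallCompleted { tool_name: "clock".to_string() },
        StreamEvent::Done,
    ];
    let (tools, seen) = count_tool_calls(&events);
    println!("Tools called: {}, events seen: {}", tools, seen);
}
```

Each callback keeps its own counter, so the two consumers stay independent even though they observe the same events.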

CallbackConsumer vs StreamBuilder

Both approaches use closures, but they differ in scope:

| | CallbackConsumer | StreamBuilder |
| --- | --- | --- |
| Callback shape | Single Fn(&StreamEvent) -> Result<()> | Separate closures per event type |
| Composability | Works with MultiConsumer | Standalone builder per agent call |
| Error control | Callback errors stop the stream | No error return from handlers |
| Use when | You need a composable consumer with full event access | You want per-event closures without trait boilerplate |