CallbackConsumer
Stream consumer that invokes a callback function for each event.
Overview
CallbackConsumer wraps a single callback function that receives every StreamEvent. It provides a quick way to handle events inline without defining a full struct, while still working within the StreamConsumer trait system for composition with MultiConsumer.
Import path: appam::agent::consumers::CallbackConsumer
Definition
pub struct CallbackConsumer {
    callback: Arc<dyn Fn(&StreamEvent) -> Result<()> + Send + Sync>,
}
Constructor
CallbackConsumer::new
pub fn new<F>(callback: F) -> Self
where
    F: Fn(&StreamEvent) -> Result<()> + Send + Sync + 'static;
Creates a consumer that calls the provided closure for each event.
| Parameter | Type | Description |
|---|---|---|
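| callback | impl Fn(&StreamEvent) -> Result<()> + Send + Sync + 'static | Callback invoked for every event. Must be Fn (not FnMut) because it is stored behind an Arc and called through a shared reference; Send + Sync allow it to be shared across threads. |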
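A minimal sketch of what the Fn + Send + Sync bound permits, using only the standard library. The types are stand-ins, not the appam definitions: `StreamEvent` is reduced to one variant, `Callback` is a hypothetical alias that mirrors the field type in the definition, and `Result<(), String>` replaces the crate's `Result`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Stand-in for appam's StreamEvent (assumption: one variant is enough here).
#[allow(dead_code)]
enum StreamEvent {
    Content { content: String },
}

// Hypothetical alias mirroring the callback field's type.
type Callback = Arc<dyn Fn(&StreamEvent) -> Result<(), String> + Send + Sync>;

fn count_from_two_threads() -> usize {
    let seen = Arc::new(AtomicUsize::new(0));
    let seen_in_callback = Arc::clone(&seen);

    // An `Fn` closure only has shared access to its captures, so mutation
    // goes through interior mutability (an atomic here, or a Mutex).
    let callback: Callback = Arc::new(move |_event: &StreamEvent| -> Result<(), String> {
        seen_in_callback.fetch_add(1, Ordering::SeqCst);
        Ok(())
    });

    // Two threads share the same callback. A `FnMut` closure would need
    // exclusive access and could not be called through a shared `Arc`.
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let cb = Arc::clone(&callback);
            thread::spawn(move || {
                for i in 0..3 {
                    let event = StreamEvent::Content { content: i.to_string() };
                    cb(&event).unwrap();
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    seen.load(Ordering::SeqCst)
}
```

Each thread delivers three events, so `count_from_two_threads()` returns 6. Writing the closure as `let mut n = 0; move |_| { n += 1; Ok(()) }` would make it `FnMut`, and it would be rejected by a bound of this shape.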
The callback must return Result<()>. Returning an error stops the stream.
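The stop-on-error behaviour can be pictured with a self-contained sketch. Everything below is a simplified stand-in rather than the appam implementation: `StreamEvent` has only three variants, `Result<(), String>` replaces the crate's `Result`, and `on_event`, `drive`, `sample_events`, and `strict_consumer` are hypothetical names introduced for illustration.

```rust
use std::sync::Arc;

// Simplified stand-in for appam's StreamEvent (assumption: three variants).
#[allow(dead_code)]
enum StreamEvent {
    Content { content: String },
    Error { message: String },
    Done,
}

// Stand-in consumer with the same field shape as the documented definition.
struct CallbackConsumer {
    callback: Arc<dyn Fn(&StreamEvent) -> Result<(), String> + Send + Sync>,
}

impl CallbackConsumer {
    fn new<F>(callback: F) -> Self
    where
        F: Fn(&StreamEvent) -> Result<(), String> + Send + Sync + 'static,
    {
        Self { callback: Arc::new(callback) }
    }

    // Hypothetical delivery method: forwards one event to the closure.
    fn on_event(&self, event: &StreamEvent) -> Result<(), String> {
        (self.callback)(event)
    }
}

// Hypothetical driver: delivers events in order and stops at the first error.
// Returns the number of events delivered, or the error that ended the stream.
fn drive(consumer: &CallbackConsumer, events: &[StreamEvent]) -> Result<usize, String> {
    let mut delivered = 0;
    for event in events {
        consumer.on_event(event)?;
        delivered += 1;
    }
    Ok(delivered)
}

fn sample_events() -> Vec<StreamEvent> {
    vec![
        StreamEvent::Content { content: "hello".to_string() },
        StreamEvent::Error { message: "upstream failure".to_string() },
        StreamEvent::Done,
    ]
}

// Rejects error events; every other event is accepted.
fn strict_consumer() -> CallbackConsumer {
    CallbackConsumer::new(|event| match event {
        StreamEvent::Error { message } => Err(format!("stopped: {}", message)),
        _ => Ok(()),
    })
}
```

With these stand-ins, `drive(&strict_consumer(), &sample_events())` returns the error raised on the second event, and the trailing `Done` event is never delivered.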
Usage
Basic pattern matching
use appam::agent::consumers::CallbackConsumer;
use appam::agent::streaming::StreamEvent;
let consumer = CallbackConsumer::new(|event| {
    match event {
        StreamEvent::Content { content } => print!("{}", content),
        StreamEvent::Error { message } => eprintln!("Error: {}", message),
        StreamEvent::Done => println!(),
        _ => {}
    }
    Ok(())
});
agent.run_streaming("Hello!", Box::new(consumer)).await?;
Capturing state
Use Arc and Mutex for shared mutable state since the callback must be Fn (not FnMut):
use appam::agent::consumers::CallbackConsumer;
use appam::agent::streaming::StreamEvent;
use std::sync::{Arc, Mutex};
let collected = Arc::new(Mutex::new(String::new()));
let collected_clone = collected.clone();
let consumer = CallbackConsumer::new(move |event| {
    if let StreamEvent::Content { content } = event {
        let mut buf = collected_clone.lock().unwrap();
        buf.push_str(content);
    }
    Ok(())
});
agent.run_streaming("Hello!", Box::new(consumer)).await?;
let full_response = collected.lock().unwrap().clone();
println!("Full response: {}", full_response);
Error propagation
Return an error to stop the stream early:
use appam::agent::consumers::CallbackConsumer;
use appam::agent::streaming::StreamEvent;
let consumer = CallbackConsumer::new(|event| {
    if let StreamEvent::Content { content } = event {
        if content.contains("UNSAFE") {
            return Err(anyhow::anyhow!("Content policy violation detected"));
        }
    }
    Ok(())
});
With MultiConsumer
CallbackConsumer composes with other consumers via MultiConsumer:
use appam::agent::streaming::MultiConsumer;
use appam::agent::consumers::{ConsoleConsumer, CallbackConsumer};
use appam::agent::streaming::StreamEvent;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
let tool_count = Arc::new(AtomicUsize::new(0));
let tool_count_clone = tool_count.clone();
let counter = CallbackConsumer::new(move |event| {
    if matches!(event, StreamEvent::ToolCallCompleted { .. }) {
        tool_count_clone.fetch_add(1, Ordering::SeqCst);
    }
    Ok(())
});
let multi = MultiConsumer::new()
    .add(Box::new(ConsoleConsumer::new()))
    .add(Box::new(counter));
agent.run_streaming("Use all available tools", Box::new(multi)).await?;
println!("Tools called: {}", tool_count.load(Ordering::SeqCst));
CallbackConsumer vs StreamBuilder
Both approaches use closures, but they differ in scope:
| | CallbackConsumer | StreamBuilder |
|---|---|---|
| Callback shape | Single Fn(&StreamEvent) -> Result<()> | Separate closures per event type |
| Composability | Works with MultiConsumer | Standalone builder per agent call |
| Error control | Callback errors stop the stream | No error return from handlers |
| Use when | You need a composable consumer with full event access | You want per-event closures without trait boilerplate |
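The difference in callback shape and error control can be sketched with stand-in types. Nothing below is the appam API: `SingleCallback` and `PerEventHandlers` are hypothetical names, chosen so as not to imply the real `CallbackConsumer` or `StreamBuilder` signatures, and `StreamEvent` is reduced to two variants.

```rust
use std::cell::Cell;
use std::rc::Rc;

// Stand-in event type (assumption: two variants).
enum StreamEvent {
    Content { content: String },
    Done,
}

// Shape 1: one closure sees every event and may return an error.
struct SingleCallback {
    callback: Box<dyn Fn(&StreamEvent) -> Result<(), String>>,
}

impl SingleCallback {
    fn dispatch(&self, event: &StreamEvent) -> Result<(), String> {
        (self.callback)(event)
    }
}

// Shape 2: one closure per event type, none of which can return an error.
struct PerEventHandlers {
    on_content: Box<dyn Fn(&str)>,
    on_done: Box<dyn Fn()>,
}

impl PerEventHandlers {
    fn dispatch(&self, event: &StreamEvent) {
        match event {
            StreamEvent::Content { content } => (self.on_content)(content.as_str()),
            StreamEvent::Done => (self.on_done)(),
        }
    }
}

// Runs both shapes over the same events. Returns the index at which the
// single callback reported an error (if any) and how many events the
// per-event handlers received.
fn compare(events: &[StreamEvent]) -> (Option<usize>, usize) {
    let single = SingleCallback {
        callback: Box::new(|event: &StreamEvent| -> Result<(), String> {
            match event {
                StreamEvent::Content { content } if content.is_empty() => {
                    Err("empty content".to_string())
                }
                _ => Ok(()),
            }
        }),
    };
    // The error return lets the caller stop at the offending event.
    let stopped_at = events.iter().position(|e| single.dispatch(e).is_err());

    let handled = Rc::new(Cell::new(0usize));
    let (for_content, for_done) = (Rc::clone(&handled), Rc::clone(&handled));
    let handlers = PerEventHandlers {
        on_content: Box::new(move |_text: &str| for_content.set(for_content.get() + 1)),
        on_done: Box::new(move || for_done.set(for_done.get() + 1)),
    };
    // No error channel: every event is dispatched regardless of content.
    for event in events {
        handlers.dispatch(event);
    }
    (stopped_at, handled.get())
}
```

Given one non-empty content event, one empty content event, and a final `Done`, `compare` reports that the single callback stops at index 1, while the per-event handlers receive all three events.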