
StreamConsumer

Trait for handling streaming events from agents.

Overview

StreamConsumer is the trait that defines how streaming events are handled. Implement it to build custom event processors -- loggers, UI renderers, metrics collectors, or anything else that reacts to agent activity in real time.

Import path: appam::agent::streaming::StreamConsumer

Definition

pub trait StreamConsumer: Send + Sync {
    fn on_event(&self, event: &StreamEvent) -> Result<()>;
}

on_event

fn on_event(&self, event: &StreamEvent) -> Result<()>;

Called for each event emitted by the agent during execution. Implementations should be fast and non-blocking; offload heavy work to background tasks or channels.

Parameters:

Parameter | Type | Description
--- | --- | ---
event | &StreamEvent | Reference to the event being emitted.

Returns: Result<()> -- returning an error stops event propagation and terminates agent execution with that error.
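
One way to keep on_event fast is to hand each event to a worker over a channel. The sketch below assumes nothing about appam beyond the trait shape shown above: StreamEvent, StreamConsumer, and Result are local stand-ins so the block compiles on its own, and OffloadingConsumer and offload_demo are hypothetical names.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::Mutex;
use std::thread;

// Local stand-ins for the appam types (assumption: only these two variants matter here).
#[allow(dead_code)]
enum StreamEvent {
    Content { content: String },
    Error { message: String },
}
type Result<T> = std::result::Result<T, String>;
trait StreamConsumer: Send + Sync {
    fn on_event(&self, event: &StreamEvent) -> Result<()>;
}

/// Hypothetical consumer: `on_event` only clones the text and sends it on.
struct OffloadingConsumer {
    // The Mutex keeps the consumer `Sync` regardless of toolchain version.
    tx: Mutex<Sender<String>>,
}

impl StreamConsumer for OffloadingConsumer {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        if let StreamEvent::Content { content } = event {
            let tx = self.tx.lock().map_err(|e| e.to_string())?;
            // Sending on an unbounded mpsc channel does not block;
            // it fails only if the worker has already exited.
            tx.send(content.clone()).map_err(|e| e.to_string())?;
        }
        Ok(())
    }
}

/// Feeds three events through the consumer and returns what the worker produced.
fn offload_demo() -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();
    // The "heavy" work (here just uppercasing) runs off the event path.
    let worker = thread::spawn(move || {
        let processed: Vec<String> = rx.iter().map(|s| s.to_uppercase()).collect();
        processed
    });

    let consumer = OffloadingConsumer { tx: Mutex::new(tx) };
    consumer.on_event(&StreamEvent::Content { content: "hello".into() }).unwrap();
    consumer.on_event(&StreamEvent::Error { message: "ignored".into() }).unwrap();
    consumer.on_event(&StreamEvent::Content { content: "world".into() }).unwrap();

    // Dropping the consumer drops the sender, which closes the channel
    // and lets the worker's loop end.
    drop(consumer);
    worker.join().unwrap()
}
```

In an async application the same shape would likely use the runtime's own channel type instead of std::sync::mpsc; the point is only that on_event returns as soon as the event is queued.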

Thread Safety

Consumers must be Send + Sync because:

  • The agent runtime may call on_event from different async tasks.
  • Multiple consumers may run concurrently when using MultiConsumer.
  • The consumer is passed as Box<dyn StreamConsumer> across thread boundaries.

Because on_event takes &self, any state a consumer updates needs interior mutability: use Arc<Mutex<T>> or atomic types.
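
For simple counters, atomic types avoid a lock entirely. The following is a minimal sketch under the same assumptions as the trait definition above: StreamEvent, StreamConsumer, and Result are local stand-ins rather than the real appam types, and EventCounter and count_from_threads are hypothetical names.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Local stand-ins for the appam types so the sketch compiles on its own.
#[allow(dead_code)]
enum StreamEvent {
    Content { content: String },
    Error { message: String },
}
type Result<T> = std::result::Result<T, String>;
trait StreamConsumer: Send + Sync {
    fn on_event(&self, event: &StreamEvent) -> Result<()>;
}

/// Hypothetical consumer that counts events without taking a lock.
struct EventCounter {
    content_events: AtomicUsize,
    errors: AtomicUsize,
}

impl StreamConsumer for EventCounter {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        match event {
            StreamEvent::Content { .. } => {
                self.content_events.fetch_add(1, Ordering::Relaxed);
            }
            StreamEvent::Error { .. } => {
                self.errors.fetch_add(1, Ordering::Relaxed);
            }
        }
        Ok(())
    }
}

/// Calls `on_event` from four threads at once and returns (content, error) counts.
fn count_from_threads() -> (usize, usize) {
    let counter = Arc::new(EventCounter {
        content_events: AtomicUsize::new(0),
        errors: AtomicUsize::new(0),
    });

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let c = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..100 {
                    c.on_event(&StreamEvent::Content { content: "x".into() }).unwrap();
                }
                c.on_event(&StreamEvent::Error { message: "boom".into() }).unwrap();
            })
        })
        .collect();

    // Joining every thread guarantees all increments are visible to the loads below.
    for h in handles {
        h.join().unwrap();
    }
    (
        counter.content_events.load(Ordering::Relaxed),
        counter.errors.load(Ordering::Relaxed),
    )
}
```

Relaxed ordering is enough here because the counters are independent tallies; state that must be updated together is better kept behind a single Mutex.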

Implementation Example

use appam::agent::streaming::{StreamConsumer, StreamEvent};
use anyhow::Result;
use std::sync::{Arc, Mutex};

/// Collects all content chunks into a single string.
struct ContentCollector {
    buffer: Arc<Mutex<String>>,
}

impl ContentCollector {
    fn new() -> Self {
        Self {
            buffer: Arc::new(Mutex::new(String::new())),
        }
    }

    fn get_content(&self) -> String {
        self.buffer.lock().unwrap().clone()
    }
}

impl StreamConsumer for ContentCollector {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        match event {
            StreamEvent::Content { content } => {
                let mut buf = self.buffer.lock().unwrap();
                buf.push_str(content);
            }
            StreamEvent::Error { message } => {
                eprintln!("Error: {}", message);
            }
            _ => {}
        }
        Ok(())
    }
}
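
Since consumers are handed to the agent as Box<dyn StreamConsumer>, the collector itself is no longer reachable once it has been passed in. One possible way around this, sketched here with local stand-in types rather than the real appam ones, is to keep a second Arc handle to the buffer before boxing. The handle method and collect_demo are hypothetical additions for illustration.

```rust
use std::sync::{Arc, Mutex};

// Local stand-ins for the appam types so the sketch compiles on its own.
#[allow(dead_code)]
enum StreamEvent {
    Content { content: String },
    Error { message: String },
}
type Result<T> = std::result::Result<T, String>;
trait StreamConsumer: Send + Sync {
    fn on_event(&self, event: &StreamEvent) -> Result<()>;
}

struct ContentCollector {
    buffer: Arc<Mutex<String>>,
}

impl ContentCollector {
    fn new() -> Self {
        Self { buffer: Arc::new(Mutex::new(String::new())) }
    }

    /// Hypothetical helper: a second handle to the same buffer.
    fn handle(&self) -> Arc<Mutex<String>> {
        Arc::clone(&self.buffer)
    }
}

impl StreamConsumer for ContentCollector {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        if let StreamEvent::Content { content } = event {
            self.buffer.lock().unwrap().push_str(content);
        }
        Ok(())
    }
}

/// Drives the boxed collector by hand (standing in for an agent run)
/// and reads the result through the handle kept beforehand.
fn collect_demo() -> String {
    let collector = ContentCollector::new();
    let handle = collector.handle();

    // From here on the collector is only reachable as a trait object.
    let boxed: Box<dyn StreamConsumer> = Box::new(collector);
    boxed.on_event(&StreamEvent::Content { content: "Hello, ".into() }).unwrap();
    boxed.on_event(&StreamEvent::Error { message: "skipped".into() }).unwrap();
    boxed.on_event(&StreamEvent::Content { content: "world!".into() }).unwrap();
    drop(boxed);

    let text = handle.lock().unwrap().clone();
    text
}
```

Calling on_event directly like this is also a convenient way to unit-test a consumer without running an agent.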

Error Handling

Returning an error from on_event has two effects:

  1. Stops event propagation -- no further consumers receive the event (relevant for MultiConsumer).
  2. Terminates the agent -- the error propagates up through the agent's run loop.

For non-critical failures (e.g., a metrics endpoint being temporarily unavailable), log the error and return Ok(()) to allow streaming to continue:

// MetricsConsumer and its try_record method are application-defined (not shown).
impl StreamConsumer for MetricsConsumer {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        if let Err(e) = self.try_record(event) {
            tracing::warn!("Failed to record metric: {}", e);
            // Don't propagate -- metrics are not critical
        }
        Ok(())
    }
}
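
The opposite case, a failure that should halt the run, can be sketched as a consumer that returns an error once a limit is crossed. Everything here is an illustrative assumption: the types are local stand-ins for the appam ones, OutputLimit is a hypothetical consumer, and drive is a simplified stand-in for the agent's run loop that stops at the first error.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Local stand-ins for the appam types so the sketch compiles on its own.
#[allow(dead_code)]
enum StreamEvent {
    Content { content: String },
    Error { message: String },
}
type Result<T> = std::result::Result<T, String>;
trait StreamConsumer: Send + Sync {
    fn on_event(&self, event: &StreamEvent) -> Result<()>;
}

/// Hypothetical consumer that aborts the run once too much content has streamed.
struct OutputLimit {
    max_bytes: usize,
    seen: AtomicUsize,
}

impl StreamConsumer for OutputLimit {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        if let StreamEvent::Content { content } = event {
            // fetch_add returns the previous value, so add the new length again.
            let total = self.seen.fetch_add(content.len(), Ordering::Relaxed) + content.len();
            if total > self.max_bytes {
                // Critical failure: returning Err is what stops the run.
                return Err(format!(
                    "output limit exceeded: {} > {} bytes",
                    total, self.max_bytes
                ));
            }
        }
        Ok(())
    }
}

/// Simplified stand-in for the agent's run loop: delivers events until one fails.
fn drive(consumer: &dyn StreamConsumer, events: &[StreamEvent]) -> Result<usize> {
    let mut delivered = 0;
    for event in events {
        consumer.on_event(event)?;
        delivered += 1;
    }
    Ok(delivered)
}

fn limit_demo() -> Result<usize> {
    let limit = OutputLimit { max_bytes: 8, seen: AtomicUsize::new(0) };
    let events = vec![
        StreamEvent::Content { content: "12345".into() },
        StreamEvent::Content { content: "67890".into() },
        StreamEvent::Content { content: "never delivered".into() },
    ];
    drive(&limit, &events)
}
```

The second chunk pushes the total to 10 bytes, so limit_demo returns the error and the third event is never delivered.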

MultiConsumer

MultiConsumer broadcasts each event to its consumers in the order they were added. If any consumer returns an error, the remaining consumers do not receive that event and the error is returned to the caller.

use appam::agent::streaming::MultiConsumer;
use appam::agent::consumers::{ConsoleConsumer, ChannelConsumer};

// `tx` is the sending half of a channel created elsewhere (not shown).
let multi = MultiConsumer::new()
    .add(Box::new(ConsoleConsumer::new()))
    .add(Box::new(ChannelConsumer::new(tx)));

Definition

pub struct MultiConsumer {
    consumers: Vec<Box<dyn StreamConsumer>>,
}

impl MultiConsumer {
    pub fn new() -> Self;
    pub fn add(mut self, consumer: Box<dyn StreamConsumer>) -> Self;
}

Method | Description
--- | ---
new() | Create an empty multi-consumer.
add(consumer) | Add a consumer to the broadcast list. Consumers are invoked in the order they are added. Returns self for chaining.

MultiConsumer itself implements StreamConsumer, so it can be passed anywhere a single consumer is expected:

agent.run_streaming("prompt", Box::new(multi)).await?;
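
The ordering and short-circuit behavior described in this section can be illustrated with a small broadcast type. This is not appam's actual implementation: Broadcast, Tagged, and broadcast_demo are hypothetical names, and the trait and event types are local stand-ins so the sketch compiles on its own.

```rust
use std::sync::{Arc, Mutex};

// Local stand-ins for the appam types so the sketch compiles on its own.
#[allow(dead_code)]
enum StreamEvent {
    Content { content: String },
    Error { message: String },
}
type Result<T> = std::result::Result<T, String>;
trait StreamConsumer: Send + Sync {
    fn on_event(&self, event: &StreamEvent) -> Result<()>;
}

/// Hypothetical broadcast type mirroring the documented MultiConsumer shape.
struct Broadcast {
    consumers: Vec<Box<dyn StreamConsumer>>,
}

impl Broadcast {
    fn new() -> Self {
        Self { consumers: Vec::new() }
    }

    fn add(mut self, consumer: Box<dyn StreamConsumer>) -> Self {
        self.consumers.push(consumer);
        self
    }
}

impl StreamConsumer for Broadcast {
    fn on_event(&self, event: &StreamEvent) -> Result<()> {
        // Insertion order; `?` returns at the first error, skipping the rest.
        for consumer in &self.consumers {
            consumer.on_event(event)?;
        }
        Ok(())
    }
}

/// Test consumer that records its name in a shared log and optionally fails.
struct Tagged {
    name: &'static str,
    log: Arc<Mutex<Vec<String>>>,
    fail: bool,
}

impl StreamConsumer for Tagged {
    fn on_event(&self, _event: &StreamEvent) -> Result<()> {
        self.log.lock().unwrap().push(self.name.to_string());
        if self.fail {
            return Err(format!("{} failed", self.name));
        }
        Ok(())
    }
}

/// Returns which consumers saw the event, plus the broadcast's result.
fn broadcast_demo() -> (Vec<String>, Result<()>) {
    let log = Arc::new(Mutex::new(Vec::new()));
    let tagged = |name, fail| -> Box<dyn StreamConsumer> {
        Box::new(Tagged { name, log: Arc::clone(&log), fail })
    };

    let multi = Broadcast::new()
        .add(tagged("first", false))
        .add(tagged("failing", true))
        .add(tagged("last", false));

    let result = multi.on_event(&StreamEvent::Content { content: "hi".into() });
    let seen = log.lock().unwrap().clone();
    (seen, result)
}
```

In this sketch the consumer named "last" never sees the event because "failing" returned an error before it, which is why order-sensitive or critical consumers are best added first.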

Using Consumers with Agents

With run_streaming

Pass a single consumer (or a MultiConsumer) to run_streaming:

use appam::prelude::*;
use appam::agent::consumers::ConsoleConsumer;

let agent = Agent::quick("anthropic/claude-sonnet-4-5", "You are helpful.", vec![])?;
let session = agent.run_streaming("Hello!", Box::new(ConsoleConsumer::new())).await?;

With run_with_consumers

Pass a Vec<Box<dyn StreamConsumer>> to broadcast events to all consumers:

use appam::prelude::*;
use appam::agent::consumers::{ConsoleConsumer, TraceConsumer};
use appam::config::TraceFormat;
use std::path::Path;

let agent = Agent::quick("anthropic/claude-sonnet-4-5", "You are helpful.", vec![])?;

let trace = TraceConsumer::new(Path::new("logs"), "session-1", TraceFormat::Detailed)?;

let session = agent.run_with_consumers(
    "Hello!",
    vec![
        Box::new(ConsoleConsumer::new()),
        Box::new(trace),
    ],
).await?;

StreamConsumer vs StreamBuilder

 | StreamConsumer | StreamBuilder
--- | --- | ---
Approach | Implement a trait with pattern matching | Chain closures on a builder
Reusability | Consumers are reusable structs | Closures are typically inline
Composition | Combine with MultiConsumer | Single builder per call
Async support | Sync on_event (spawn tasks internally) | Sync and async handler variants
Use when | Building reusable infrastructure, multiple destinations | Quick prototyping, simple one-off handlers

See StreamBuilder for the closure-based alternative.