Streaming Telemetry
Observability signals, metrics, and alerting guidance for the /stream SSE endpoints.
Overview
The streaming endpoints (/api/v0/executions/{workflows|agents|tasks}/{exec_id}/stream) now emit first-class telemetry. Each connection produces:
- OpenTelemetry metrics for active connections, connection duration, time-to-first-event, per-event counts, and error reasons.
- Structured logs on connect, disconnect, cancellations, and write failures with execution identifiers, mode (structured/text), durations, and the final close reason.
- Tracing spans (stream.<kind>) containing child events for every SSE emission and heartbeat. Spans inherit the incoming request context and can be correlated with Temporal queries, Redis publishers, and downstream services.
All metrics are prefixed with compozy_stream_ and tagged with kind (workflow, agent, task). Agent/Task streams also attach mode (structured, text) to log payloads and span events.
Metrics Summary
| Metric | Type | Labels | Description |
|---|---|---|---|
| compozy_stream_active_connections | UpDownCounter | kind | Current active SSE connections. Incremented on connect, decremented on close. |
| compozy_stream_connection_duration_seconds | Histogram | kind | Wall-clock connection lifetime. Buckets: 0.5s, 1s, 2s, 5s, 10s, 30s, 60s, 120s, 300s, 600s. |
| compozy_stream_time_to_first_event_seconds | Histogram | kind | Latency from connection acceptance to the first data event. Buckets: 50ms, 100ms, 250ms, 500ms, 1s, 2s, 5s, 10s. |
| compozy_stream_events_total | Counter | kind, event_type | Total SSE frames emitted (status deltas, llm_chunk, complete, error, heartbeat). |
| compozy_stream_errors_total | Counter | kind, reason | Terminal errors grouped by reason (e.g. status_error, chunk_error, heartbeat_error, subscribe_error). |
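As a rough illustration of how the gauge and the two histograms relate to one connection's lifecycle, the sketch below tracks them in memory with the standard library only. The names `conn`, `open`, `finish`, and `demo` are hypothetical, and the real instruments are OpenTelemetry ones rather than an atomic counter.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// activeConnections stands in for compozy_stream_active_connections.
// Assumption: an in-memory counter replaces the OpenTelemetry UpDownCounter.
var activeConnections atomic.Int64

// conn keeps the timestamps needed for the duration and TTFE observations.
type conn struct {
	kind       string
	acceptedAt time.Time
	firstEvent time.Time // zero until the first data event arrives
}

// open registers a connection and increments the gauge.
func open(kind string, now time.Time) *conn {
	activeConnections.Add(1)
	return &conn{kind: kind, acceptedAt: now}
}

// onDataEvent records only the first data event; heartbeats should not call it.
func (c *conn) onDataEvent(now time.Time) {
	if c.firstEvent.IsZero() {
		c.firstEvent = now
	}
}

// finish decrements the gauge and returns both observations in seconds.
// hasTTFE is false when the stream closed before any data event.
func (c *conn) finish(now time.Time) (duration, ttfe float64, hasTTFE bool) {
	activeConnections.Add(-1)
	duration = now.Sub(c.acceptedAt).Seconds()
	if c.firstEvent.IsZero() {
		return duration, 0, false
	}
	return duration, c.firstEvent.Sub(c.acceptedAt).Seconds(), true
}

// demo simulates one workflow stream with fixed timestamps.
func demo() (duration, ttfe float64, hasTTFE bool) {
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	c := open("workflow", t0)
	c.onDataEvent(t0.Add(120 * time.Millisecond))
	c.onDataEvent(t0.Add(900 * time.Millisecond)) // ignored: not the first event
	return c.finish(t0.Add(42 * time.Second))
}

func main() {
	d, ttfe, ok := demo()
	fmt.Println(d, ttfe, ok, activeConnections.Load())
}
```

In this sketch a stream that closes before any data event contributes a duration observation but no TTFE observation, which matches the table's definition of TTFE as latency to the first data event.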
Logging Fields
Every connect/disconnect log includes:
- exec_id, mode, last_event_id
- duration (when disconnecting), events (telemetry emitter), reason (completed, terminal_status, context_canceled, or <phase>_error)
- status (final execution status when available)
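To make the field list concrete, here is a minimal sketch that emits one disconnect record with those keys. It uses the standard library `log/slog` JSON handler as a stand-in for the project's own logger, and the function name, message text, value types, and sample values (`exec-123`, `SUCCESS`) are all assumptions.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log/slog"
)

// disconnectRecord logs one disconnect event as JSON and returns the decoded fields.
// Assumption: the real logger may use different value types or a different message.
func disconnectRecord(execID, mode, lastEventID, reason, status string, events int, durationSeconds float64) (map[string]any, error) {
	var buf bytes.Buffer
	logger := slog.New(slog.NewJSONHandler(&buf, nil))
	logger.Info("stream disconnected",
		"exec_id", execID,
		"mode", mode,
		"last_event_id", lastEventID,
		"duration", durationSeconds,
		"events", events,
		"reason", reason,
		"status", status,
	)
	// Decode the emitted line so callers can inspect individual fields.
	fields := map[string]any{}
	err := json.Unmarshal(buf.Bytes(), &fields)
	return fields, err
}

func main() {
	fields, err := disconnectRecord("exec-123", "text", "17", "completed", "SUCCESS", 12, 42.5)
	if err != nil {
		panic(err)
	}
	fmt.Println(fields["exec_id"], fields["reason"], fields["duration"])
}
```

Keeping `reason` as a flat string field is what makes searches such as reason=*_error practical in a log backend.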
Tracing Attributes
Traces inherit request context and set:
- stream.kind, stream.exec_id, stream.duration_seconds, stream.events
- stream.reason, stream.status, stream.last_event_id
- Optional stream.time_to_first_event_seconds when data events flow
Heartbeat and data emissions are recorded as span events with sequence numbers to simplify flame-graph inspection.
Recommended PromQL Alerts
Use the following PromQL fragments as starting points for alerting rules.
1. Error-Rate Spike
Trigger when more than 5% of streams close with errors over a 5-minute window.
sum(increase(compozy_stream_errors_total{kind="workflow"}[5m]))
/
sum(increase(compozy_stream_events_total{kind="workflow",event_type="complete"}[5m])) > 0.05
Apply the same ratio for kind="agent" and kind="task".
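One edge case worth keeping in mind is a window with no complete events. The sketch below reproduces the ratio with plain IEEE 754 floats, which PromQL arithmetic also follows: 0/0 yields NaN, which never satisfies the comparison, while errors with zero completions yield +Inf, which does. The function names are hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// errorRatio mirrors the alert expression: errors divided by complete events.
// Division by zero does not panic for floats: 0/0 is NaN and x/0 is +Inf.
func errorRatio(errorsInWindow, completesInWindow float64) float64 {
	return errorsInWindow / completesInWindow
}

// fires reports whether the ratio exceeds the threshold.
// NaN compares false against every value, so an idle window never fires.
func fires(ratio, threshold float64) bool {
	return ratio > threshold
}

func main() {
	fmt.Println(fires(errorRatio(3, 100), 0.05)) // 3 errors per 100 complete events
	fmt.Println(fires(errorRatio(8, 100), 0.05)) // 8 errors per 100 complete events
	fmt.Println(math.IsNaN(errorRatio(0, 0)), fires(errorRatio(0, 0), 0.05))
	fmt.Println(math.IsInf(errorRatio(2, 0), 1), fires(errorRatio(2, 0), 0.05))
}
```

If firing on a window with errors and no completions is too noisy for a low-traffic deployment, a minimum-volume condition on the denominator is one possible guard.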
2. Connection Drops / Subscriptions Failing
Alert when subscribe_error, pubsub_error, or heartbeat_error reasons breach a threshold.
sum(increase(compozy_stream_errors_total{reason=~"(subscribe|pubsub|heartbeat)_error"}[1m])) > 3
3. Elevated Time-to-First-Event (TTFE)
Warn when the 95th percentile TTFE exceeds 1 second for workflows.
histogram_quantile(
0.95,
sum(rate(compozy_stream_time_to_first_event_seconds_bucket{kind="workflow"}[5m]))
by (le)
) > 1
4. Stalled Event Flow
Detect streams that emit heartbeats but no data events for more than 30 seconds.
sum(increase(compozy_stream_events_total{event_type="heartbeat",kind="agent"}[30s])) > 0
and
sum(increase(compozy_stream_events_total{event_type!="heartbeat",kind="agent"}[30s])) == 0
To focus on Redis-backed token streams, combine this alert with mode="text" from logs or span events, since the metrics table above lists no mode label.
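The condition behind this alert can be restated as a small predicate over per-event_type increases in one window. This is only a sketch of the logic, with a hypothetical `windowCounts` type. Like the query, it looks at aggregated counts, so it signals that no data flowed across the selected streams rather than pinpointing a single connection.

```go
package main

import "fmt"

// windowCounts holds the increase per event_type over one evaluation window.
type windowCounts map[string]float64

// stalled mirrors the alert: heartbeats were emitted but no other event type was.
func stalled(w windowCounts) bool {
	var data float64
	for eventType, n := range w {
		if eventType != "heartbeat" {
			data += n
		}
	}
	return w["heartbeat"] > 0 && data == 0
}

func main() {
	fmt.Println(stalled(windowCounts{"heartbeat": 2}))                  // heartbeats only
	fmt.Println(stalled(windowCounts{"heartbeat": 2, "llm_chunk": 40})) // data is flowing
	fmt.Println(stalled(windowCounts{}))                                // no connections
}
```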
Operational Checklist
- Dashboards: Visualize active_connections, connection_duration_seconds heatmaps, and TTFE percentiles by kind.
- Tracing: Correlate stream.* spans with Temporal query spans to trace end-to-end latency from durable snapshots to SSE emission.
- Logging: Search for reason=*_error to triage write failures (status writes, Redis chunk forwarding, heartbeat flush issues).
- Capacity: Track active_connections alongside compozy_stream_events_total to estimate throughput per execution.
Configuration Notes
The connection observability layer relies on the standard context hydration pattern:
- Use config.FromContext(ctx) and logger.FromContext(ctx) inside SSE handlers.
- Stream functions never accept injected loggers or configs; context propagation remains mandatory.
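The general shape of this pattern can be sketched with the standard library alone. The helpers below (`withLogger`, `loggerFromContext`, `streamHandler`, `demo`) are hypothetical stand-ins, not the project's logger.FromContext or config.FromContext, whose exact signatures are not shown here.

```go
package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
)

// loggerKey is an unexported key type, so other packages cannot collide with it.
type loggerKey struct{}

// withLogger stores a logger in the context (hypothetical helper).
func withLogger(ctx context.Context, l *slog.Logger) context.Context {
	return context.WithValue(ctx, loggerKey{}, l)
}

// loggerFromContext returns the stored logger, or the default when none is attached.
func loggerFromContext(ctx context.Context) *slog.Logger {
	if l, ok := ctx.Value(loggerKey{}).(*slog.Logger); ok && l != nil {
		return l
	}
	return slog.Default()
}

// streamHandler takes only a context and an ID: no logger or config parameters.
func streamHandler(ctx context.Context, execID string) {
	loggerFromContext(ctx).Info("stream connected", "exec_id", execID)
}

// demo attaches a logger, reads it back, and reports whether the same instance returned.
func demo() bool {
	l := slog.New(slog.NewTextHandler(os.Stderr, nil))
	ctx := withLogger(context.Background(), l)
	streamHandler(ctx, "exec-123")
	return loggerFromContext(ctx) == l
}

func main() {
	fmt.Println(demo())
}
```

Because the handler signature carries no logger, anything that wraps the request context (tracing, cancellation, request-scoped fields) reaches the stream code without extra parameters.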
Histogram buckets are tuned for sub-second TTFE and multi-minute sessions. Adjust the bucket arrays in engine/infra/monitoring/streaming_metrics.go if your workloads require longer horizons.
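Bucket boundaries matter because histogram_quantile estimates a percentile by interpolating linearly inside the bucket that contains the target rank, so the estimate is only as fine as the bucket it lands in. The sketch below is a simplified reimplementation of that interpolation for classic cumulative buckets with positive bounds and 0 < q <= 1. It is not the Prometheus source, and the sample counts in `ttfeExample` are invented for illustration.

```go
package main

import (
	"fmt"
	"math"
)

// bucket is one cumulative bucket: the count of observations <= upperBound.
type bucket struct {
	upperBound float64
	count      float64
}

// ttfeExample uses the TTFE bounds from the metrics table with made-up counts.
var ttfeExample = []bucket{
	{0.05, 10}, {0.1, 40}, {0.25, 70}, {0.5, 90}, {1, 94},
	{2, 98}, {5, 100}, {10, 100}, {math.Inf(1), 100},
}

// quantile approximates histogram_quantile for sorted buckets ending in +Inf.
func quantile(q float64, buckets []bucket) float64 {
	if len(buckets) < 2 {
		return math.NaN()
	}
	total := buckets[len(buckets)-1].count
	if total == 0 {
		return math.NaN()
	}
	rank := q * total
	// Find the first bucket whose cumulative count reaches the rank.
	i := 0
	for i < len(buckets)-1 && buckets[i].count < rank {
		i++
	}
	if i == len(buckets)-1 {
		// The rank falls in the +Inf bucket: report the highest finite bound.
		return buckets[len(buckets)-2].upperBound
	}
	lower, below := 0.0, 0.0
	if i > 0 {
		lower = buckets[i-1].upperBound
		below = buckets[i-1].count
	}
	inBucket := buckets[i].count - below
	return lower + (buckets[i].upperBound-lower)*(rank-below)/inBucket
}

func main() {
	fmt.Println(quantile(0.95, ttfeExample)) // interpolated inside the 1s to 2s bucket
	fmt.Println(quantile(0.50, ttfeExample)) // interpolated inside the 0.1s to 0.25s bucket
}
```

With these sample counts the 95th percentile rank falls in the 1s to 2s bucket, so the reported value is an interpolation between those bounds. Alert thresholds that sit on a bucket boundary (such as 1 second for TTFE) are therefore the least sensitive to interpolation error, and any added buckets are best placed around the thresholds you alert on.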