Streaming Telemetry

Observability signals, metrics, and alerting guidance for the /stream SSE endpoints.

Overview

The streaming endpoints (/api/v0/executions/{workflows|agents|tasks}/{exec_id}/stream) now emit first-class telemetry. Each connection produces:

  • OpenTelemetry metrics for active connections, connection duration, time-to-first-event, per-event counts, and error reasons.
  • Structured logs on connect, disconnect, cancellations, and write failures with execution identifiers, mode (structured/text), durations, and the final close reason.
  • Tracing spans (stream.<kind>) that record a span event for every SSE emission and heartbeat. Spans inherit the incoming request context and can be correlated with Temporal queries, Redis publishers, and downstream services.

All metrics are prefixed with compozy_stream_ and tagged with kind (workflow, agent, task). Agent/Task streams also attach mode (structured, text) to log payloads and span events.

Metrics Summary

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| compozy_stream_active_connections | UpDownCounter | kind | Current active SSE connections. Incremented on connect, decremented on close. |
| compozy_stream_connection_duration_seconds | Histogram | kind | Wall-clock connection lifetime. Buckets: 0.5s, 1s, 2s, 5s, 10s, 30s, 60s, 120s, 300s, 600s. |
| compozy_stream_time_to_first_event_seconds | Histogram | kind | Latency from connection acceptance to the first data event. Buckets: 50ms, 100ms, 250ms, 500ms, 1s, 2s, 5s, 10s. |
| compozy_stream_events_total | Counter | kind, event_type | Total SSE frames emitted (status deltas, llm_chunk, complete, error, heartbeat). |
| compozy_stream_errors_total | Counter | kind, reason | Terminal errors grouped by reason (e.g. status_error, chunk_error, heartbeat_error, subscribe_error). |

Logging Fields

Every connect/disconnect log includes:

  • exec_id, mode, last_event_id
  • duration (on disconnect), events (number of frames counted by the telemetry emitter), reason (completed, terminal_status, context_canceled, or <phase>_error)
  • status (final execution status when available)

Tracing Attributes

Traces inherit request context and set:

  • stream.kind, stream.exec_id, stream.duration_seconds, stream.events
  • stream.reason, stream.status, stream.last_event_id
  • stream.time_to_first_event_seconds, set only when at least one data event was emitted

Heartbeat and data emissions are recorded as span events with sequence numbers to simplify flame-graph inspection.

Alerting

Use the following PromQL fragments as starting points for alerting rules.

1. Error-Rate Spike

Trigger when terminal errors exceed 5% of successfully completed streams (error closures divided by complete events) over a 5-minute window.

sum(increase(compozy_stream_errors_total{kind="workflow"}[5m]))
  /
sum(increase(compozy_stream_events_total{kind="workflow",event_type="complete"}[5m])) > 0.05

Apply the same ratio for kind="agent" and kind="task".

2. Connection Drops / Subscriptions Failing

Alert when more than three subscribe_error, pubsub_error, or heartbeat_error closures occur within one minute, summed across all stream kinds.

sum(increase(compozy_stream_errors_total{reason=~"(subscribe|pubsub|heartbeat)_error"}[1m])) > 3

3. Elevated Time-to-First-Event (TTFE)

Warn when the 95th percentile TTFE exceeds 1 second for workflows.

histogram_quantile(
  0.95,
  sum(rate(compozy_stream_time_to_first_event_seconds_bucket{kind="workflow"}[5m]))
  by (le)
) > 1

4. Stalled Event Flow

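Detect windows in which agent streams keep emitting heartbeats but no data events for 30 seconds. Because the query sums across all agent connections, it flags a fleet-wide stall rather than a single stuck stream. increase() needs at least two samples inside the range, so keep the 30s window at least twice the scrape interval.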

sum(increase(compozy_stream_events_total{event_type="heartbeat",kind="agent"}[30s])) > 0
and
sum(increase(compozy_stream_events_total{event_type!="heartbeat",kind="agent"}[30s])) == 0

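The stream metrics carry no mode label (mode is attached only to logs and span events), so to focus on Redis-backed text token streams, filter logs or spans by mode="text" for the time range the alert covers.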

Operational Checklist

  • Dashboards: Visualize active_connections, connection_duration_seconds heatmaps, and TTFE percentiles by kind.
  • Tracing: Correlate stream.* spans with Temporal query spans to trace end-to-end latency from durable snapshots to SSE emission.
  • Logging: Search for reason=*_error to triage write failures (status writes, Redis chunk forwarding, heartbeat flush issues).
  • Capacity: Track active_connections alongside compozy_stream_events_total to estimate throughput per connection (event rate divided by active connections).

Configuration Notes

The connection observability layer relies on the standard context hydration pattern:

  • Use config.FromContext(ctx) and logger.FromContext(ctx) inside SSE handlers.
  • Stream functions never accept injected loggers or configs; context propagation remains mandatory.

Histogram buckets are tuned for sub-second TTFE and multi-minute sessions. Adjust the bucket arrays in engine/infra/monitoring/streaming_metrics.go if your workloads require longer horizons.
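If you extend the arrays, keep the boundaries strictly increasing, which explicit-bucket histograms generally require. The sketch below copies the connection-duration boundaries from the metrics table into a hypothetical durationBuckets slice and appends longer horizons with a validation check; the names are illustrative and do not reflect the actual contents of streaming_metrics.go.

```go
package main

import "fmt"

// durationBuckets copies the connection-duration boundaries (seconds) from the
// metrics table; the variable name is hypothetical.
var durationBuckets = []float64{0.5, 1, 2, 5, 10, 30, 60, 120, 300, 600}

// extendBuckets returns a new slice with extra boundaries appended and rejects
// any boundary that is not strictly greater than the one before it.
func extendBuckets(base []float64, extra ...float64) ([]float64, error) {
	out := make([]float64, 0, len(base)+len(extra))
	out = append(out, base...)
	out = append(out, extra...)
	for i := 1; i < len(out); i++ {
		if out[i] <= out[i-1] {
			return nil, fmt.Errorf("bucket %v must be greater than %v", out[i], out[i-1])
		}
	}
	return out, nil
}

func main() {
	// Add 20-minute, 30-minute, and 1-hour horizons for long-lived sessions.
	longer, err := extendBuckets(durationBuckets, 1200, 1800, 3600)
	if err != nil {
		panic(err)
	}
	fmt.Println(longer)
}
```

Building a fresh slice leaves the original boundaries untouched, so the default layout stays available for workloads that do not need the longer horizons.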