NodeStreamEvent envelope
NodeStreamEvent.v1 is the platform-native streaming envelope. Every event the workers emit for a job — agent execution today, image generation and voice synthesis tomorrow — flows over the same Redis Stream wrapped in this envelope. The envelope is what GET /jobs/:id/stream returns; the OpenAI translators (Chat Completions, Responses) consume it internally and transform the deep_agent payloads into the OpenAI wire formats.
This page documents the envelope and the aggregator endpoint. If you only care about agent runs and want the standard OpenAI wire format, use Chat Completions or Responses. If you want the full multi-modal stream — the agent's events alongside any image / audio / future node-type events from the same job — read on.
Envelope shape
interface NodeStreamEvent {
seq: number // monotonic per job; server-assigned
ts: number // unix ms, worker-captured
node_id: string // workflow node id, e.g. "deepAgent-4"
node_type: string // "deep_agent" today; "fal_image", "voice_synth", ... as they ship
payload_version: string // "AgentEvent.v1" for deep_agent
payload: unknown // shape per (node_type, payload_version)
}
| Field | Description |
|---|---|
seq | Monotonic per job. Server-assigned at ingestion. Use as the SSE id. |
ts | Worker-captured timestamp (Unix ms). |
node_id | Workflow node identifier. "MAIN" for ad-hoc agent runs not bound to a workflow node. |
node_type | Producer's snake_case identifier. The dispatch table in the backend's validator routes payload validation per (node_type, payload_version). |
payload_version | Spec version of the inner payload. For node_type=deep_agent, this is AgentEvent.v1. |
payload | The inner body. Shape varies per (node_type, payload_version). For (deep_agent, AgentEvent.v1), this carries the deep-agent execution event types (agent.start, text.delta, tool.call.start, subagent.start, etc.). |
The deep_agent payload (a.k.a. AgentEvent.v1) is the same internal shape that powers both OpenAI translators. It carries ts, agent, run_id, parent_run_ids, type, payload. Treat it as opaque unless you specifically want to render agent execution structure — the OpenAI translators are the recommended way to consume agent-only streams.
Aggregator endpoint
GET /jobs/:job_id/stream
Returns a text/event-stream of envelopes. Each SSE event carries one envelope verbatim:
event: snapshot
id: 0
data: {"result":{"stream":{}}}
event: append
id: 17
data: {"seq":17,"ts":1745701201234,"node_id":"deepAgent-4","node_type":"deep_agent","payload_version":"AgentEvent.v1","payload":{"ts":1745701201234,"agent":{"kind":"main","id":"MAIN","name":"LangGraph"},"run_id":"...","parent_run_ids":[],"type":"agent.start","payload":{}}}
event: append
id: 22
data: {"seq":22,"ts":1745701201240,"node_id":"deepAgent-4","node_type":"deep_agent","payload_version":"AgentEvent.v1","payload":{...}}
:keep-alive
event: complete
id: terminal
data: {"status":"completed"}
The frame conventions:
event: | When | data: payload |
|---|---|---|
snapshot | Once, immediately after the headers. | {"result":{"stream":{}}} — an empty placeholder so client-side aggregators can initialise their per-node bucket map deterministically. |
append | One per envelope produced by the workers, as they flow. | The full NodeStreamEvent envelope. The SSE id: equals the envelope's seq. |
complete | Once, when the underlying job reaches completed or failed. | {"status": "completed"} or {"status": "failed"}. |
:keep-alive | Every 15 seconds during quiet periods. | (SSE comment, no data.) |
Client aggregation pattern
The aggregator endpoint is named for its intended consumption pattern: the client maintains a per-node_id bucket dictionary and walks the stream, appending each envelope to its node's bucket. Pseudo-code:
const stream: Record<string, NodeStreamEvent[]> = {}
for await (const event of sse(/jobs/:id/stream)) {
if (event.event === "snapshot") continue
if (event.event === "complete") break
const envelope = JSON.parse(event.data)
const bucket = stream[envelope.node_id] ?? (stream[envelope.node_id] = [])
bucket.push(envelope)
}
A debug UI rendering each node's events grouped by node_id then has a deterministic structure to walk.
Resume
Resume is supported via the standard SSE Last-Event-ID request header. Set it to the last seq you successfully processed; the server replays envelopes whose seq > Last-Event-ID, then live-tails. Absent header replays from the beginning of the available Redis Stream window (capped at ~4096 entries by the trim policy).
curl -N \
-H "Authorization: Bearer $TOKEN" \
-H "Last-Event-ID: 17" \
https://api.alien.club/jobs/42/stream
Authorization
Same authorization as GET /jobs/:job_id: the job's creator and admins may stream; everyone else gets 403.
When to use this endpoint vs the OpenAI translators
| Use case | Endpoint |
|---|---|
| Standard agent integration via the OpenAI SDK | Chat Completions or Responses |
| Renderer for a chat UI | Responses — native resume, reasoning summaries |
| Full multi-modal visibility into a workflow run | GET /jobs/:id/stream (this endpoint) |
| Debug UI / internal tooling rendering every node's events | GET /jobs/:id/stream (this endpoint) |
The OpenAI surfaces are agent-specific by design: they only see deep_agent events. The aggregator endpoint sees everything.
Versioning
The envelope is NodeStreamEvent.v1. Major-version bumps signal breaking changes to the envelope itself — we will introduce a parallel route segment when one happens. The payload spec has its own version (AgentEvent.v1 for deep_agent today); bumping a payload spec is independent of the envelope version.
New node_type values arrive with their payload spec at the same time. Consumers that don't care about a new node type filter it out by node_type and ignore those envelopes.