Shannon provides real-time event streaming through Server-Sent Events (SSE) and WebSocket protocols. Use streaming to monitor task execution, display progress, and receive results as they’re generated.
Authentication: Streaming endpoints require the same headers as other APIs.
Browsers cannot send custom headers with EventSource.
Development: set GATEWAY_SKIP_AUTH=1.
Production: proxy SSE via your backend and inject X-API-Key or Bearer headers.
For SSE endpoints, the api_key query parameter is supported as a fallback (e.g., ?api_key=sk_...). For all other endpoints, use headers exclusively.
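As an example of that header-first, query-fallback policy, a small helper can append `api_key` only when header auth is unavailable (the helper name is ours; the endpoint path matches the SSE examples below):

```python
from urllib.parse import urlencode

def sse_url(base, workflow_id, api_key=None):
    """Build an SSE URL. Prefer headers for auth; pass api_key only
    as an SSE fallback (e.g. browser EventSource, which cannot send
    custom headers)."""
    params = {"workflow_id": workflow_id}
    if api_key:
        params["api_key"] = api_key  # SSE-only fallback
    return f"{base}/api/v1/stream/sse?{urlencode(params)}"
```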
Streaming Limits:
Timeout: Streams automatically close after 5 minutes of inactivity
Buffer Size: Maximum 1MB buffered data per connection
Usage Metadata: Token counts and costs are available for all LLM providers (OpenAI, Anthropic, Google, Groq, xAI)
```python
import json
import time

import httpx

def submit_and_stream(query: str, api_key: str):
    """Submit task and stream events."""
    # 1. Submit task
    response = httpx.post(
        "http://localhost:8080/api/v1/tasks/stream",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={
            "query": query,
            "session_id": f"session-{int(time.time())}"
        }
    )
    data = response.json()
    workflow_id = data["workflow_id"]
    stream_url = data["stream_url"]
    print(f"Task submitted: {workflow_id}")

    # 2. Stream events
    with httpx.stream(
        "GET",
        f"http://localhost:8080{stream_url}",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=None
    ) as stream_response:
        for line in stream_response.iter_lines():
            if line.startswith("data:"):
                event = json.loads(line[5:])
                print(f"[{event['type']}] {event.get('message', '')}")
                if event['type'] == 'WORKFLOW_COMPLETED':
                    return event

# Usage
result = submit_and_stream("What is the capital of France?", "sk_test_123456")
print("Final result:", result.get('result'))
```
Why use this endpoint? The unified endpoint ensures you start streaming immediately after submission, preventing any missed events that could occur if you submit and then connect separately.
Resume from specific event ID. Accepts a Redis stream ID (e.g., 1700000000000-0) or a numeric sequence (e.g., 42). When numeric, replay includes events with seq > last_event_id.
Event ID format: The id field uses Redis stream IDs (e.g., 1719000000000-0), NOT simple integers. You must use these exact IDs when reconnecting with last_event_id. See Reconnection below.
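When iterating raw SSE lines, track the `id:` field of each frame and hold on to the most recent value; a sketch (the helper name is ours):

```python
def extract_last_event_id(lines, last_id=None):
    """Track the most recent SSE id: field. IDs are Redis stream IDs
    (e.g. '1719000000000-0') and must be passed back verbatim as
    last_event_id when reconnecting."""
    for line in lines:
        if line.startswith("id:"):
            last_id = line[3:].strip()
    return last_id
```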
The gateway authenticates WebSocket connections via headers only (X-API-Key or Authorization). Browsers cannot set custom headers during the WebSocket handshake. For browser usage:
Run locally with GATEWAY_SKIP_AUTH=1, or
Use a reverse proxy that injects the header before forwarding to the gateway.
Header-based example for server environments: Node's ws library accepts a headers option in its constructor, e.g. `new WebSocket(url, { headers: { "X-API-Key": apiKey } })`.
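In Python server environments, the third-party websocket-client package (an assumption here, not a Shannon dependency) can send the same handshake header; a sketch with illustrative helper names:

```python
def ws_auth_headers(api_key):
    """Handshake headers for a server-side WebSocket client.
    Browsers cannot set these; use a proxy or GATEWAY_SKIP_AUTH=1 there."""
    return [f"X-API-Key: {api_key}"]

def open_authenticated_ws(url, api_key):
    """Open an authenticated WebSocket connection (server-side only).
    Requires: pip install websocket-client."""
    import websocket  # third-party; assumed available
    return websocket.create_connection(url, header=ws_auth_headers(api_key))
```

After connecting, call `.recv()` in a loop to read JSON events (the WebSocket URL path is not shown here; use the one from your gateway configuration).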
Shannon also provides an OpenAI-compatible streaming endpoint at /v1/chat/completions that translates Shannon events into the standard OpenAI chat.completion.chunk format. This allows you to use OpenAI SDKs directly with Shannon.

For complete documentation on the OpenAI-compatible API, including request/response schemas, available models, Shannon-specific extensions (shannon_events), and SDK usage examples, see the OpenAI-Compatible API Reference.
For most integrations, listen to thread.message.delta (streaming text) and thread.message.completed (final result with usage metadata) rather than LLM_PARTIAL/LLM_OUTPUT.
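A minimal fold over those two event types might look like this; note that the field paths used here ('event', 'data', 'delta', 'usage') are illustrative assumptions, not a documented schema:

```python
def reduce_stream(events):
    """Fold OpenAI-compatible events into (final_text, usage).
    Delta events carry text fragments; the completed event carries
    usage metadata. Field paths are illustrative."""
    parts, usage = [], None
    for ev in events:
        if ev["event"] == "thread.message.delta":
            parts.append(ev["data"]["delta"])
        elif ev["event"] == "thread.message.completed":
            usage = ev["data"].get("usage")
    return "".join(parts), usage
```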
Explicit end-of-stream signal (no more events will be emitted for this workflow)
Over SSE, the STREAM_END lifecycle event is delivered as an SSE event named done with data: [DONE] (plain text, not JSON). Over WebSocket, it appears as a normal JSON event with "type": "STREAM_END".
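A client can normalize the two shapes so downstream code only sees JSON events; a sketch (the helper name is ours):

```python
import json

def classify_sse_frame(event_name, data):
    """Normalize one SSE frame: the 'done' event carries the plain-text
    sentinel [DONE] rather than JSON, so map it to the WebSocket shape.
    Every other frame is a JSON event object."""
    if event_name == "done" and data.strip() == "[DONE]":
        return {"type": "STREAM_END"}
    return json.loads(data)
```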
```python
import json

import httpx

def stream_filtered_events(task_id: str, api_key: str, event_types: list):
    """Stream only specific event types."""
    types_param = ",".join(event_types)
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}&types={types_param}"
    with httpx.stream("GET", url, headers={"X-API-Key": api_key}) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                yield json.loads(line[5:])

# Only receive agent thinking and tool events
for event in stream_filtered_events(
    "task_abc123",
    "sk_test_123456",
    ["AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"]
):
    print(f"{event['type']}: {event.get('message', event.get('tool'))}")
```
Deep Research tasks take 2-10 minutes. Use context.force_research to trigger Deep Research through the Task API:
```bash
curl -X POST http://localhost:8080/api/v1/tasks/stream \
  -H "Content-Type: application/json" \
  -H "X-API-Key: sk_test_123456" \
  -d '{
    "query": "What are the latest developments in quantum computing?",
    "session_id": "session-123",
    "context": {"force_research": true}
  }'
```
Control research depth with context.research_strategy:
| Strategy | Depth | Typical Duration | Use Case |
|---|---|---|---|
| quick | Low | 30s-2min | Simple factual queries |
| standard (default) | Medium | 2-5min | General research |
| deep | High | 5-10min | Complex multi-faceted topics |
| academic | Highest | 5-10min | Citation-heavy academic research |
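These strategies plug into the same Task API payload as force_research; a minimal sketch of building the request body (the helper name is ours; the payload shape follows the curl example above):

```python
def research_request(query, strategy="standard", session_id="session-123"):
    """Build a Task API payload that forces Deep Research with a
    chosen strategy."""
    assert strategy in ("quick", "standard", "deep", "academic")
    return {
        "query": query,
        "session_id": session_id,
        "context": {"force_research": True, "research_strategy": strategy},
    }
```

POST the returned dict as JSON to /api/v1/tasks/stream exactly as in the curl example.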
Chat API vs Task API for Deep Research: The Chat API (/v1/chat/completions with model: "shannon-deep-research") can also trigger Deep Research, but its streaming format does not include SSE event IDs — so reconnection is impossible. If your platform has connection time limits (e.g., Vercel 5-min limit) or you need to survive page refreshes, use the Task API.
SSE connections may drop due to network issues, proxy timeouts, or platform limits (e.g., Vercel hobby plan: 5-minute connection limit). Shannon supports resuming from where you left off.

How it works:
Track the id field of each received SSE event
When disconnected, reconnect with last_event_id parameter
The server replays all events after that ID (buffered ~256 events, 24h TTL)
```bash
# Reconnect from where you left off
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task_abc123&last_event_id=1719000002000-0" \
  -H "X-API-Key: sk_test_123456"
```
For platforms with connection time limits, proactively disconnect before the limit:
1. Connect to SSE
2. Start a 4-minute timer (safety margin before the 5-minute limit)
3. When the timer fires:
   a. Record the last received event ID
   b. Close the connection
   c. Immediately reconnect with last_event_id=<last_id>
4. Repeat until you receive a WORKFLOW_COMPLETED or done event
```python
import json
import time

import httpx

def stream_with_reconnect(task_id: str, api_key: str, max_retries: int = 5):
    """Stream with automatic reconnection using last_event_id."""
    last_id = None
    for attempt in range(max_retries):
        try:
            url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"
            if last_id:
                url += f"&last_event_id={last_id}"
            with httpx.stream("GET", url, headers={"X-API-Key": api_key}, timeout=None) as response:
                for line in response.iter_lines():
                    if line.startswith("id:"):
                        last_id = line[3:].strip()  # Track last event ID
                    elif line.startswith("data:"):
                        event = json.loads(line[5:])
                        yield event
                        if event.get('type') in ('WORKFLOW_COMPLETED', 'STREAM_END'):
                            return
        except Exception:
            if attempt < max_retries - 1:
                wait = min(2 ** attempt, 10)
                print(f"Connection lost, reconnecting in {wait}s (last_id={last_id})...")
                time.sleep(wait)
            else:
                raise
```
```python
def stream_with_resume(task_id: str, api_key: str, last_event_id: str = None):
    """Resume streaming from specific event."""
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"
    if last_event_id:
        url += f"&last_event_id={last_event_id}"
    # Stream events...
```
Note: last_event_id accepts either a Redis stream ID (e.g., 1700000000000-0) or a numeric sequence (e.g., 42). When numeric, replay includes events with seq > last_event_id.