Server-Sent Events
Real-time server-to-client communication with Server-Sent Events (SSE), including channel-based broadcasting, automatic client reconnection, pattern-based channel authorization, and full TypeScript support.
SSE is a lightweight alternative to WebSockets for one-way communication from server to client. It suits notifications, live updates, progress tracking, and real-time data streaming.
Quick Start
Server-side
import { sse } from '@strav/signal'
import { router, session } from '@strav/http'
// Bootstrap SSE endpoint
sse.boot(router, {
middleware: [session()],
path: '/_sse'
})
// Define channels
sse.channel('notifications')
sse.channel('users/:id', async (ctx, { id }) => {
return ctx.get('user')?.id === id
})
// Broadcast events
sse.to('notifications').send('alert', {
level: 'info',
message: 'System maintenance in 5 minutes'
})
sse.to(`users/${userId}`).send('message', {
text: 'You have a new message',
from: 'admin'
})
Client-side
import { SSEClient } from '@strav/signal/sse'
const client = new SSEClient()
// Subscribe to channels
const notifications = client.subscribe('notifications')
notifications.on('alert', (data) => {
console.log(`Alert: ${data.message}`)
})
const userChannel = client.subscribe(`users/${userId}`)
userChannel.on('message', (data) => {
console.log(`New message: ${data.text}`)
})
// Handle connection events
client.on('connected', () => console.log('SSE connected'))
client.on('error', (err) => console.error('SSE error:', err))
Installation
SSE support is built into @strav/signal:
bun add @strav/signal
Server Setup
Using a service provider (recommended)
The recommended way is to configure SSE in your application bootstrap:
import { app, router, session } from '@strav/http'
import { sse } from '@strav/signal'
// Bootstrap SSE with middleware
sse.boot(router, {
middleware: [session()],
defaultHeartbeat: 30000,
cors: ['https://yourdomain.com']
})
Configuration options
| Option | Default | Description |
|---|---|---|
| path | '/_sse' | SSE endpoint path |
| middleware | [] | Middleware to run on SSE connections |
| defaultHeartbeat | 30000 | Default heartbeat interval in ms |
| cors | '*' | CORS origins (string or array) |
Channels
Define channels to control which topics clients can subscribe to. Channel patterns support parameters like route patterns.
Public channel
Anyone can subscribe — no authorization required:
sse.channel('announcements')
sse.channel('public/news')
Authorized channel with parameters
Use authorization callbacks to control access:
// User-specific channel
sse.channel('users/:id', async (ctx, { id }) => {
const user = ctx.get('user')
return user?.id === id
})
// Team-specific channel
sse.channel('teams/:teamId/updates', async (ctx, { teamId }) => {
const user = ctx.get('user')
// Deny access when there is no user or no team list
return user?.teams?.includes(teamId) ?? false
})
// Role-based channel
sse.channel('admin/logs', async (ctx) => {
const user = ctx.get('user')
return user?.role === 'admin'
})
Channel with custom configuration
sse.channel('live-metrics', {
authorize: async (ctx) => {
return ctx.get('user')?.subscription === 'premium'
},
heartbeat: 5000 // More frequent heartbeat for live data
})
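To make the parameterized patterns above more concrete, here is a minimal sketch of how a pattern such as users/:id could be matched against a concrete channel name. The matchChannel function and the ChannelParams type are hypothetical names used for illustration only; the library's actual matching rules may differ.

```typescript
// Hypothetical helper, not part of @strav/signal: matches a concrete channel
// name against a pattern and returns the extracted parameters, or null.
type ChannelParams = Record<string, string>

function matchChannel(pattern: string, channel: string): ChannelParams | null {
  const patternParts = pattern.split('/')
  const channelParts = channel.split('/')
  // Patterns and channel names must have the same number of segments.
  if (patternParts.length !== channelParts.length) return null

  const params: ChannelParams = {}
  for (let i = 0; i < patternParts.length; i++) {
    const expected = patternParts[i]
    const actual = channelParts[i]
    if (expected.startsWith(':')) {
      // A parameter segment captures any non-empty value.
      if (actual === '') return null
      params[expected.slice(1)] = actual
    } else if (expected !== actual) {
      // A literal segment must match exactly.
      return null
    }
  }
  return params
}
```

In this sketch every extracted parameter is a string. If the library behaves the same way, a check such as user?.id === id only succeeds when the stored user id is also a string.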
Broadcasting Events
Basic broadcasting
// Send typed event with data
sse.to('notifications').send('alert', {
level: 'warning',
message: 'Server will restart in 2 minutes',
timestamp: Date.now()
})
// Send data without event type
sse.to('metrics').data({
cpu: 0.75,
memory: 0.82,
disk: 0.45
})
Excluding specific clients
// Broadcast to all except the sender
sse.to('chat/room1')
.except(senderId)
.send('message', {
user: 'Alice',
text: 'Hello everyone!'
})
// Exclude multiple clients
sse.to('game/lobby')
.except(player1Id, player2Id)
.send('player_joined', { name: 'Bob' })
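The exclusion behavior shown above can be pictured with a small in-memory model. This is only a sketch under the assumption that each subscriber is keyed by a client id; the broadcast function and the Send type are hypothetical and do not describe the library's internals.

```typescript
// Hypothetical model of one channel's subscribers, keyed by client id.
type Send = (event: string, data: unknown) => void

// Delivers an event to every subscriber except the listed client ids
// and returns how many subscribers received it.
function broadcast(
  subscribers: Map<string, Send>,
  event: string,
  data: unknown,
  except: string[] = []
): number {
  const excluded = new Set(except)
  let delivered = 0
  subscribers.forEach((send, clientId) => {
    if (excluded.has(clientId)) return
    send(event, data)
    delivered++
  })
  return delivered
}
```

Building a Set from the excluded ids keeps each membership check constant-time, which matters when a channel has many subscribers.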
Channel statistics
// Get connection count
console.log(`Active connections: ${sse.connectionCount}`)
// Get subscribers for a channel
console.log(`Notification subscribers: ${sse.subscriberCount('notifications')}`)
// List all active channels
console.log('Active channels:', sse.activeChannels)
Client-Side Usage
Creating an SSE client
import { SSEClient } from '@strav/signal/sse'
const client = new SSEClient({
url: '/_sse', // SSE endpoint URL
reconnectDelay: 1000, // Initial reconnect delay
maxReconnectDelay: 30000, // Max reconnect delay
maxReconnectAttempts: 10, // Max reconnect attempts
reconnectMultiplier: 1.5 // Exponential backoff multiplier
})
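The reconnect options above suggest an exponential backoff schedule. The sketch below shows one plausible way these three values could combine; backoffDelay and BackoffOptions are hypothetical names, and the client's real schedule (including any jitter or rounding) is an assumption here.

```typescript
interface BackoffOptions {
  reconnectDelay: number      // initial delay in ms
  maxReconnectDelay: number   // upper bound in ms
  reconnectMultiplier: number // growth factor per attempt
}

// Hypothetical calculation: attempt is zero-based, so 0 is the first retry.
function backoffDelay(attempt: number, opts: BackoffOptions): number {
  const raw = opts.reconnectDelay * Math.pow(opts.reconnectMultiplier, attempt)
  // Never wait longer than the configured maximum.
  return Math.min(Math.round(raw), opts.maxReconnectDelay)
}

const opts: BackoffOptions = {
  reconnectDelay: 1000,
  maxReconnectDelay: 30000,
  reconnectMultiplier: 1.5
}
const schedule = [0, 1, 2, 3].map((attempt) => backoffDelay(attempt, opts))
// schedule is [1000, 1500, 2250, 3375]
```

Under these assumptions the delay reaches the 30000 ms cap on the tenth retry (attempt 9), which lines up with the maxReconnectAttempts value of 10 in the example configuration.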
Channel subscriptions
// Subscribe to a channel
const subscription = client.subscribe('notifications')
// Listen for specific events
subscription.on('alert', (data) => {
showAlert(data.level, data.message)
})
subscription.on('update', (data) => {
updateUI(data)
})
// Handle subscription errors
subscription.on('error', (error) => {
console.error('Channel error:', error)
})
// Unsubscribe when done
subscription.close()
Client connection events
// Connection lifecycle
client.on('connected', () => {
console.log('SSE connected')
showConnectionStatus('Connected')
})
client.on('disconnected', () => {
console.log('SSE disconnected')
showConnectionStatus('Disconnected')
})
client.on('reconnecting', ({ attempt, delay }) => {
console.log(`Reconnecting... attempt ${attempt}, delay ${delay}ms`)
showConnectionStatus(`Reconnecting (${attempt})`)
})
client.on('error', (error) => {
console.error('Connection error:', error)
})
// Check connection state
if (client.connected) {
console.log('Client is connected')
}
Programmatic connection management
// Manual connection control
client.disconnect()
client.connect()
client.reconnect()
// Close client and all subscriptions
client.close()
Stream Utilities
The SSE module provides utilities for creating custom SSE streams.
Progress streams
import { createProgressStream } from '@strav/signal/sse'
// Create progress stream for long operations
export async function processData(ctx: Context) {
const stream = createProgressStream(async (update) => {
for (let i = 0; i <= 100; i += 10) {
update(i, `Processing... ${i}%`)
await performWork() // Your work here
}
})
return ctx.sse(stream)
}
Custom SSE streams
import { createSSEStream, formatSSE } from '@strav/signal/sse'
// Create stream from async generator
export async function liveMetrics(ctx: Context) {
const stream = createSSEStream(async function* () {
while (true) {
yield {
event: 'metrics',
data: await getSystemMetrics()
}
await new Promise(resolve => setTimeout(resolve, 1000))
}
})
return ctx.sse(stream)
}
// Manual stream creation
export async function customStream(ctx: Context) {
const encoder = new TextEncoder()
let interval: ReturnType<typeof setInterval> | undefined
const stream = new ReadableStream({
start(controller) {
// Send initial event
const welcome = formatSSE({
event: 'welcome',
data: { timestamp: Date.now() }
})
controller.enqueue(encoder.encode(welcome))
// Set up interval for updates
interval = setInterval(() => {
const update = formatSSE({
event: 'update',
data: { value: Math.random() }
})
controller.enqueue(encoder.encode(update))
}, 1000)
},
// Cleanup when the consumer cancels the stream.
// A function returned from start() is not treated as a cleanup hook.
cancel() {
clearInterval(interval)
}
})
return ctx.sse(stream)
}
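For readers curious about what travels over the wire, the following sketch formats an event according to the text/event-stream format defined in the HTML Standard. The toWireFormat function and WireEvent type are hypothetical and deliberately named differently from formatSSE; this is not a description of the library's own implementation.

```typescript
interface WireEvent {
  event?: string
  data: unknown
  id?: string
}

// Hypothetical formatter for the text/event-stream wire format.
function toWireFormat({ event, data, id }: WireEvent): string {
  // Strings are sent as-is; everything else is serialized as JSON.
  const payload = typeof data === 'string' ? data : JSON.stringify(data ?? null)
  const lines: string[] = []
  if (id !== undefined) lines.push(`id: ${id}`)
  if (event !== undefined) lines.push(`event: ${event}`)
  // A multi-line payload needs one "data:" field per line.
  for (const line of payload.split('\n')) lines.push(`data: ${line}`)
  // A blank line terminates the event.
  return lines.join('\n') + '\n\n'
}

// Lines starting with a colon are comments that clients ignore,
// so they are commonly used as keep-alive heartbeats.
const heartbeatFrame = ': heartbeat\n\n'
```

An event without an event field is dispatched to clients as the default message event, which is why untyped data is typically handled with a 'message' listener.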
Error Handling
Server-side error handling
// Authorization errors are handled automatically
sse.channel('admin-only', async (ctx) => {
const user = ctx.get('user')
if (!user?.isAdmin) {
throw new Error('Insufficient permissions')
}
return true
})
// Broadcasting errors
try {
sse.to('invalid-channel').send('test', {})
} catch (error) {
console.error('Broadcast failed:', error)
}
Client-side error handling
// Handle client errors
client.on('error', (error) => {
console.error('SSE error:', error)
// Show user-friendly error message
showNotification('Connection error', 'error')
})
// Handle subscription errors
subscription.on('error', (error) => {
if (error.message.includes('Unauthorized')) {
redirectToLogin()
} else {
console.error('Subscription error:', error)
}
})
// Handle connection state changes
client.on('disconnected', () => {
// Show offline indicator
showOfflineIndicator(true)
})
client.on('connected', () => {
// Hide offline indicator
showOfflineIndicator(false)
})
TypeScript Support
Full TypeScript support with typed events and data:
Server-side types
import type {
SSEEvent,
SSEChannelConfig,
SSEAuthorizeCallback
} from '@strav/signal/sse'
// Define event types
interface AlertEvent {
level: 'info' | 'warning' | 'error'
message: string
timestamp: number
}
interface MetricsEvent {
cpu: number
memory: number
disk: number
}
// Typed broadcasting
sse.to('alerts').send('alert', {
level: 'warning',
message: 'High CPU usage',
timestamp: Date.now()
} satisfies AlertEvent) // Requires TypeScript 4.9+; unlike `as`, this reports missing fields
// Typed authorization
const adminAuth: SSEAuthorizeCallback = async (ctx, params) => {
const user = ctx.get('user')
return user?.role === 'admin'
}
sse.channel('admin/logs', adminAuth)
Client-side types
import type {
SSEClient,
SSESubscription,
SSEEventListener
} from '@strav/signal/sse'
// Typed event listeners
const alertListener: SSEEventListener<AlertEvent> = (data) => {
console.log(`${data.level}: ${data.message}`)
}
const subscription = client.subscribe('alerts')
subscription.on('alert', alertListener)
// Typed subscription
interface TypedSubscription extends SSESubscription {
onAlert(listener: SSEEventListener<AlertEvent>): () => void
onMetrics(listener: SSEEventListener<MetricsEvent>): () => void
}
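One way to get the typed subscription described above without writing a method per event is an event map. The sketch below is an assumption-laden illustration: SubscriptionLike is a structural stand-in that assumes only an on(event, listener) method, and typed and DashboardEvents are hypothetical names.

```typescript
// Structural stand-in for a subscription; only `on` is assumed to exist.
interface SubscriptionLike {
  on(event: string, listener: (data: any) => void): unknown
}

interface AlertEvent {
  level: 'info' | 'warning' | 'error'
  message: string
  timestamp: number
}

interface MetricsEvent {
  cpu: number
  memory: number
  disk: number
}

// Maps each event name on a channel to its payload type.
interface DashboardEvents {
  alert: AlertEvent
  metrics: MetricsEvent
}

// Hypothetical wrapper that narrows `on` to the events in the map.
function typed<Events>(subscription: SubscriptionLike) {
  return {
    on<K extends keyof Events & string>(
      event: K,
      listener: (data: Events[K]) => void
    ): void {
      subscription.on(event, listener)
    }
  }
}
```

With this wrapper, typed<DashboardEvents>(subscription).on('alert', (data) => ...) infers data as AlertEvent, and an unknown event name becomes a compile-time error. The payloads are still not validated at runtime.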
Real-World Examples
Live chat system
// Server: Chat channels
sse.channel('chat/:roomId', async (ctx, { roomId }) => {
const user = ctx.get('user')
// Reject unauthenticated requests before touching user.id
if (!user) return false
const room = await Room.find(roomId)
return room?.hasUser(user.id) ?? false
})
// Server: New message handler
async function sendChatMessage(roomId: string, message: ChatMessage) {
sse.to(`chat/${roomId}`)
.except(message.senderId)
.send('message', {
id: message.id,
text: message.text,
sender: message.sender.name,
timestamp: message.createdAt
})
}
// Client: Chat UI
const chat = client.subscribe(`chat/${roomId}`)
chat.on('message', (message) => {
appendMessage(message)
playNotificationSound()
})
chat.on('typing', ({ user }) => {
showTypingIndicator(user)
})
Real-time dashboard
// Server: Metrics broadcasting
setInterval(() => {
const metrics = collectSystemMetrics()
sse.to('dashboard/metrics').data({
timestamp: Date.now(),
cpu: metrics.cpu,
memory: metrics.memory,
activeUsers: metrics.activeUsers,
requestsPerSecond: metrics.rps
})
}, 5000)
// Client: Dashboard updates
const dashboard = client.subscribe('dashboard/metrics')
dashboard.on('message', (metrics) => {
updateChart('cpu', metrics.cpu)
updateChart('memory', metrics.memory)
updateCounter('users', metrics.activeUsers)
updateCounter('rps', metrics.requestsPerSecond)
})
File upload progress
// Server: Upload progress endpoint
export async function uploadWithProgress(ctx: Context) {
const uploadId = ctx.params.uploadId
const stream = createProgressStream(async (update) => {
const file = ctx.request.file
let uploaded = 0
const total = file.size
// Simulate chunked upload
while (uploaded < total) {
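const chunk = await file.read(1024)
// Stop if the source ends early, to avoid looping forever
if (chunk.length === 0) break
// Report progress only after the chunk has actually been uploaded
await uploadChunk(uploadId, chunk)
uploaded += chunk.length
const percent = Math.round((uploaded / total) * 100)
update(percent, `Uploaded ${uploaded}/${total} bytes`)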
}
})
return ctx.sse(stream)
}
// Client: Progress tracking
const progress = client.subscribe(`upload/${uploadId}`)
progress.on('progress', ({ percent, message }) => {
updateProgressBar(percent)
updateStatusText(message)
})
progress.on('complete', () => {
showSuccessMessage('Upload complete!')
redirectToFileList()
})
progress.on('error', ({ message }) => {
showErrorMessage(`Upload failed: ${message}`)
})
Best Practices
Channel naming
- Use hierarchical names: notifications/users/123
- Include resource type: projects/456/updates
- Use parameters for dynamic channels: teams/:id/chat
- Separate public and private: public/announcements, users/:id/private
Performance considerations
- Limit channel subscriptions per client (typically < 10)
- Use appropriate heartbeat intervals (30s for most cases)
- Clean up subscriptions when components unmount
- Consider message size and frequency for mobile clients
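The cleanup advice above can be sketched as a small helper that tracks subscriptions and closes them together. SubscriptionBag and Closable are hypothetical names; the only assumption about the library is that a subscription exposes close(), as shown in the channel subscription example.

```typescript
interface Closable {
  close(): void
}

// Hypothetical helper that tracks subscriptions so a component
// can release all of them in one call when it unmounts.
class SubscriptionBag {
  private items: Closable[] = []

  // Registers a subscription and returns it for chaining.
  add<T extends Closable>(item: T): T {
    this.items.push(item)
    return item
  }

  // Closes every tracked subscription exactly once.
  closeAll(): void {
    const pending = this.items
    this.items = []
    for (const item of pending) {
      try {
        item.close()
      } catch {
        // One failing close() should not prevent the others from running.
      }
    }
  }
}
```

A component would call bag.add(client.subscribe('notifications')) while mounting and bag.closeAll() in its unmount hook.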
Error recovery
- Implement client-side connection state UI
- Handle authorization errors gracefully
- Provide manual reconnection options
- Log connection issues for debugging
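As a sketch of the connection state UI mentioned above, the lifecycle events can be folded into a single status value with a pure function. ConnectionStatus, LifecycleEvent, and nextStatus are hypothetical names; the event names mirror the client events documented earlier, and the mapping itself is an assumption.

```typescript
type ConnectionStatus = 'connecting' | 'connected' | 'reconnecting' | 'offline'

type LifecycleEvent =
  | { type: 'connected' }
  | { type: 'disconnected' }
  | { type: 'reconnecting'; attempt: number }
  | { type: 'error' }

// Pure reducer: derives the status to display from the previous
// status and the latest lifecycle event.
function nextStatus(current: ConnectionStatus, event: LifecycleEvent): ConnectionStatus {
  switch (event.type) {
    case 'connected':
      return 'connected'
    case 'reconnecting':
      return 'reconnecting'
    case 'disconnected':
      return 'offline'
    case 'error':
      // An error alone does not change the status; a later
      // disconnected or reconnecting event decides that.
      return current
  }
}
```

Keeping this logic in a pure function makes it easy to unit test and to reuse across UI frameworks.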
Security
- Always implement channel authorization for sensitive data
- Validate user permissions in authorization callbacks
- Use middleware for session/authentication checks
- Sanitize data before broadcasting
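One simple way to sanitize data before broadcasting is to copy only an explicit allowlist of fields into the payload. The pick helper and the sample record below are hypothetical and shown only as a sketch; they are not part of the library.

```typescript
// Hypothetical helper: copies only the explicitly allowed keys.
function pick<T extends object, K extends keyof T>(
  source: T,
  keys: readonly K[]
): Pick<T, K> {
  const result = {} as Pick<T, K>
  for (const key of keys) {
    if (key in source) result[key] = source[key]
  }
  return result
}

// Example record with fields that must never reach clients.
const userRecord = {
  id: 'u1',
  name: 'Alice',
  email: 'alice@example.com',
  passwordHash: 'secret-hash'
}

// Only id and name are included in the broadcast payload.
const publicProfile = pick(userRecord, ['id', 'name'])
```

An allowlist is safer than deleting known sensitive fields, because fields added to the record later stay private by default.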