Server-Sent Events

Real-time server-to-client communication with Server-Sent Events (SSE), with channel-based broadcasting, client auto-reconnection, pattern-based channel authorization, and full TypeScript support.

SSE provides a lightweight alternative to WebSockets for one-way communication from server to client. It is well suited to notifications, live updates, progress tracking, and real-time data streaming.
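
For orientation, an SSE connection is a long-lived HTTP response of type text/event-stream, and each event is a few plain-text fields followed by a blank line. The sketch below shows that framing. `encodeEvent` and `WireEvent` are hypothetical names used for illustration only; they are not the library's `formatSSE`, and JSON-encoding of object payloads is an assumption of this sketch.

```typescript
// Hypothetical helper that illustrates SSE framing; it is not part of @strav/signal.
interface WireEvent {
  event?: string                          // optional event type
  id?: string                             // optional event id
  data: string | Record<string, unknown>  // objects are JSON-encoded (an assumption of this sketch)
}

function encodeEvent({ event, id, data }: WireEvent): string {
  const lines: string[] = []
  if (event) lines.push(`event: ${event}`)
  if (id) lines.push(`id: ${id}`)

  const text = typeof data === 'string' ? data : JSON.stringify(data)
  // Each line of the payload gets its own `data:` field.
  for (const line of text.split('\n')) {
    lines.push(`data: ${line}`)
  }

  // A blank line marks the end of the event.
  return lines.join('\n') + '\n\n'
}

const wire = encodeEvent({ event: 'alert', data: { level: 'info' } })
```

Because the format is line-oriented text, a payload containing newlines has to be split across several `data:` fields, which is why the helper loops over lines rather than writing the payload in one piece.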

Quick Start

Server-side

import { sse } from '@strav/signal'
import { router, session } from '@strav/http'

// Bootstrap SSE endpoint
sse.boot(router, {
  middleware: [session()],
  path: '/_sse'
})

// Define channels
sse.channel('notifications')
sse.channel('users/:id', async (ctx, { id }) => {
  return ctx.get('user')?.id === id
})

// Broadcast events
sse.to('notifications').send('alert', {
  level: 'info',
  message: 'System maintenance in 5 minutes'
})

sse.to(`users/${userId}`).send('message', {
  text: 'You have a new message',
  from: 'admin'
})

Client-side

import { SSEClient } from '@strav/signal/sse'

const client = new SSEClient()

// Subscribe to channels
const notifications = client.subscribe('notifications')
notifications.on('alert', (data) => {
  console.log(`Alert: ${data.message}`)
})

const userChannel = client.subscribe(`users/${userId}`)
userChannel.on('message', (data) => {
  console.log(`New message: ${data.text}`)
})

// Handle connection events
client.on('connected', () => console.log('SSE connected'))
client.on('error', (err) => console.error('SSE error:', err))

Installation

SSE support is built into @strav/signal:

bun add @strav/signal

Server Setup

Using a service provider (recommended)

The recommended way is to configure SSE in your application bootstrap:

import { app, router, session } from '@strav/http'
import { sse } from '@strav/signal'

// Bootstrap SSE with middleware
sse.boot(router, {
  middleware: [session()],
  defaultHeartbeat: 30000,
  cors: ['https://yourdomain.com']
})

Configuration options

Option            Default   Description
path              '/_sse'   SSE endpoint path
middleware        []        Middleware to run on SSE connections
defaultHeartbeat  30000     Default heartbeat interval in ms
cors              '*'       CORS origins (string or array)
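
To make the `cors` option concrete, here is a minimal sketch of how a string-or-array setting could be turned into an Access-Control-Allow-Origin value. How @strav/signal applies the option internally is not shown in this document, so `allowedOrigin` and `CorsSetting` are hypothetical names and the logic is an assumption.

```typescript
// Hypothetical resolver for illustration; the library's internal handling may differ.
type CorsSetting = string | string[]

function allowedOrigin(setting: CorsSetting, requestOrigin: string | undefined): string | null {
  // A wildcard setting allows every origin.
  if (setting === '*') return '*'

  const allowed = Array.isArray(setting) ? setting : [setting]
  // Echo the request origin back only when it is explicitly listed.
  if (requestOrigin !== undefined && allowed.indexOf(requestOrigin) !== -1) {
    return requestOrigin
  }
  return null // no CORS header should be sent
}

const sameSite = allowedOrigin(['https://yourdomain.com'], 'https://yourdomain.com')
const foreign = allowedOrigin(['https://yourdomain.com'], 'https://other.example')
```

Browsers reject a wildcard origin on credentialed cross-origin requests, so an explicit origin list is the safer choice when the endpoint relies on session cookies.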

Channels

Define channels to control which topics clients can subscribe to. Channel patterns support parameters like route patterns.
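
As a rough illustration of route-style patterns, the sketch below matches a concrete channel name against a pattern and extracts its parameters. `matchChannel` is a hypothetical function written for this example; the library's own matching rules may differ.

```typescript
// Hypothetical matcher for illustration; the library's own matching rules may differ.
type ChannelParams = Record<string, string>

function matchChannel(pattern: string, channel: string): ChannelParams | null {
  const patternParts = pattern.split('/')
  const channelParts = channel.split('/')
  if (patternParts.length !== channelParts.length) return null

  const params: ChannelParams = {}
  const matched = patternParts.every((expected, i) => {
    const actual = channelParts[i] ?? ''
    if (expected.startsWith(':')) {
      // Parameter segment: capture any non-empty value under its name.
      params[expected.slice(1)] = actual
      return actual !== ''
    }
    // Literal segment: must match exactly.
    return expected === actual
  })
  return matched ? params : null
}

const userMatch = matchChannel('users/:id', 'users/42')            // { id: '42' }
const teamMiss = matchChannel('teams/:teamId/updates', 'teams/7')  // null: segment counts differ
```

In a matcher like this one, captured values are always strings, so an authorization callback that compares a parameter with a numeric user id would need to convert one side first.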

Public channel

Anyone can subscribe — no authorization required:

sse.channel('announcements')
sse.channel('public/news')

Authorized channel with parameters

Use authorization callbacks to control access:

// User-specific channel
sse.channel('users/:id', async (ctx, { id }) => {
  const user = ctx.get('user')
  return user?.id === id
})

// Team-specific channel
sse.channel('teams/:teamId/updates', async (ctx, { teamId }) => {
  const user = ctx.get('user')
  return user?.teams?.includes(teamId) ?? false
})

// Role-based channel
sse.channel('admin/logs', async (ctx) => {
  const user = ctx.get('user')
  return user?.role === 'admin'
})

Channel with custom configuration

sse.channel('live-metrics', {
  authorize: async (ctx) => {
    return ctx.get('user')?.subscription === 'premium'
  },
  heartbeat: 5000  // More frequent heartbeat for live data
})

Broadcasting Events

Basic broadcasting

// Send typed event with data
sse.to('notifications').send('alert', {
  level: 'warning',
  message: 'Server will restart in 2 minutes',
  timestamp: Date.now()
})

// Send data without event type
sse.to('metrics').data({
  cpu: 0.75,
  memory: 0.82,
  disk: 0.45
})

Excluding specific clients

// Broadcast to all except the sender
sse.to('chat/room1')
   .except(senderId)
   .send('message', {
     user: 'Alice',
     text: 'Hello everyone!'
   })

// Exclude multiple clients
sse.to('game/lobby')
   .except(player1Id, player2Id)
   .send('player_joined', { name: 'Bob' })

Channel statistics

// Get connection count
console.log(`Active connections: ${sse.connectionCount}`)

// Get subscribers for a channel
console.log(`Notification subscribers: ${sse.subscriberCount('notifications')}`)

// List all active channels
console.log('Active channels:', sse.activeChannels)

Client-Side Usage

Creating an SSE client

import { SSEClient } from '@strav/signal/sse'

const client = new SSEClient({
  url: '/_sse',                    // SSE endpoint URL
  reconnectDelay: 1000,            // Initial reconnect delay
  maxReconnectDelay: 30000,        // Max reconnect delay
  maxReconnectAttempts: 10,        // Max reconnect attempts
  reconnectMultiplier: 1.5         // Exponential backoff multiplier
})
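
To show how these options interact, the sketch below computes a reconnect schedule. The formula is an assumption (the initial delay multiplied by the multiplier once per failed attempt, capped at the maximum); the client's actual algorithm is not documented here and may, for example, add jitter. `backoffSchedule` and `BackoffOptions` are hypothetical names.

```typescript
// Assumed formula: delay = min(reconnectDelay * multiplier^(attempt - 1), maxReconnectDelay).
// This is a sketch of exponential backoff, not the library's confirmed behavior.
interface BackoffOptions {
  reconnectDelay: number
  maxReconnectDelay: number
  maxReconnectAttempts: number
  reconnectMultiplier: number
}

function backoffSchedule(options: BackoffOptions): number[] {
  const delays: number[] = []
  for (let attempt = 1; attempt <= options.maxReconnectAttempts; attempt++) {
    const raw = options.reconnectDelay * Math.pow(options.reconnectMultiplier, attempt - 1)
    // Round to whole milliseconds and never exceed the configured ceiling.
    delays.push(Math.min(Math.round(raw), options.maxReconnectDelay))
  }
  return delays
}

const schedule = backoffSchedule({
  reconnectDelay: 1000,
  maxReconnectDelay: 30000,
  maxReconnectAttempts: 10,
  reconnectMultiplier: 1.5
})
// schedule starts at 1000, 1500, 2250 and reaches the 30000 cap on the tenth attempt
```

Under these assumptions the ten attempts add up to roughly 105 seconds of waiting before the client gives up, which is a useful number to keep in mind when choosing `maxReconnectAttempts`.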

Channel subscriptions

// Subscribe to a channel
const subscription = client.subscribe('notifications')

// Listen for specific events
subscription.on('alert', (data) => {
  showAlert(data.level, data.message)
})

subscription.on('update', (data) => {
  updateUI(data)
})

// Handle subscription errors
subscription.on('error', (error) => {
  console.error('Channel error:', error)
})

// Unsubscribe when done
subscription.close()

Client connection events

// Connection lifecycle
client.on('connected', () => {
  console.log('SSE connected')
  showConnectionStatus('Connected')
})

client.on('disconnected', () => {
  console.log('SSE disconnected')
  showConnectionStatus('Disconnected')
})

client.on('reconnecting', ({ attempt, delay }) => {
  console.log(`Reconnecting... attempt ${attempt}, delay ${delay}ms`)
  showConnectionStatus(`Reconnecting (${attempt})`)
})

client.on('error', (error) => {
  console.error('Connection error:', error)
})

// Check connection state
if (client.connected) {
  console.log('Client is connected')
}

Programmatic connection management

// Manual connection control
client.disconnect()
client.connect()
client.reconnect()

// Close client and all subscriptions
client.close()

Stream Utilities

The SSE module provides utilities for creating custom SSE streams.

Progress streams

import { createProgressStream } from '@strav/signal/sse'

// Create progress stream for long operations
export async function processData(ctx: Context) {
  const stream = createProgressStream(async (update) => {
    for (let i = 0; i <= 100; i += 10) {
      update(i, `Processing... ${i}%`)
      await performWork() // Your work here
    }
  })

  return ctx.sse(stream)
}

Custom SSE streams

import { createSSEStream, formatSSE } from '@strav/signal/sse'

// Create stream from async generator
export async function liveMetrics(ctx: Context) {
  const stream = createSSEStream(async function* () {
    while (true) {
      yield {
        event: 'metrics',
        data: await getSystemMetrics()
      }
      await new Promise(resolve => setTimeout(resolve, 1000))
    }
  })

  return ctx.sse(stream)
}

// Manual stream creation
export async function customStream(ctx: Context) {
  const encoder = new TextEncoder()
  let interval: ReturnType<typeof setInterval> | undefined

  const stream = new ReadableStream({
    start(controller) {
      // Send initial event
      const welcome = formatSSE({
        event: 'welcome',
        data: { timestamp: Date.now() }
      })
      controller.enqueue(encoder.encode(welcome))

      // Set up interval for updates
      interval = setInterval(() => {
        const update = formatSSE({
          event: 'update',
          data: { value: Math.random() }
        })
        controller.enqueue(encoder.encode(update))
      }, 1000)
    },

    // Cleanup when the consumer cancels the stream;
    // a function returned from start() is never called
    cancel() {
      clearInterval(interval)
    }
  })

  return ctx.sse(stream)
}
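
Assuming `ctx.sse(stream)` sends the stream as a standard text/event-stream response, the text that reaches the consumer can be decoded with a few lines of parsing. The sketch below is a simplified, hypothetical parser (`parseEvents`) meant to show what the handlers above put on the wire. It handles only `\n` line endings and the `event` and `data` fields; in a browser, the built-in EventSource does this work.

```typescript
// Hypothetical, simplified parser for illustration; it ignores `id:` and `retry:` fields.
interface ParsedEvent {
  event: string
  data: string
}

function parseEvents(text: string): ParsedEvent[] {
  const events: ParsedEvent[] = []
  // Events are separated by a blank line.
  for (const block of text.split('\n\n')) {
    let event = 'message' // default type when no `event:` field is present
    const data: string[] = []
    for (const line of block.split('\n')) {
      if (line.startsWith(':')) continue // comment line, commonly used as a keep-alive
      const colon = line.indexOf(':')
      if (colon === -1) continue // simplification: skip lines without a field separator
      const field = line.slice(0, colon)
      // A single leading space after the colon is not part of the value.
      const value = line.slice(colon + 1).replace(/^ /, '')
      if (field === 'event') event = value
      if (field === 'data') data.push(value)
    }
    // Blocks without data (such as keep-alive comments) produce no event.
    if (data.length > 0) events.push({ event, data: data.join('\n') })
  }
  return events
}

const parsed = parseEvents('event: metrics\ndata: {"cpu":0.5}\n\n: ping\n\ndata: hi\n\n')
// parsed holds two events: a 'metrics' event and an untyped one reported as 'message'
```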

Error Handling

Server-side error handling

// Authorization errors are handled automatically
sse.channel('admin-only', async (ctx) => {
  const user = ctx.get('user')
  if (!user?.isAdmin) {
    throw new Error('Insufficient permissions')
  }
  return true
})

// Broadcasting errors
try {
  sse.to('invalid-channel').send('test', {})
} catch (error) {
  console.error('Broadcast failed:', error)
}

Client-side error handling

// Handle client errors
client.on('error', (error) => {
  console.error('SSE error:', error)

  // Show user-friendly error message
  showNotification('Connection error', 'error')
})

// Handle subscription errors
subscription.on('error', (error) => {
  if (error.message.includes('Unauthorized')) {
    redirectToLogin()
  } else {
    console.error('Subscription error:', error)
  }
})

// Handle connection state changes
client.on('disconnected', () => {
  // Show offline indicator
  showOfflineIndicator(true)
})

client.on('connected', () => {
  // Hide offline indicator
  showOfflineIndicator(false)
})

TypeScript Support

Full TypeScript support with typed events and data:

Server-side types

import type {
  SSEEvent,
  SSEChannelConfig,
  SSEAuthorizeCallback
} from '@strav/signal/sse'

// Define event types
interface AlertEvent {
  level: 'info' | 'warning' | 'error'
  message: string
  timestamp: number
}

interface MetricsEvent {
  cpu: number
  memory: number
  disk: number
}

// Typed broadcasting
sse.to('alerts').send('alert', {
  level: 'warning',
  message: 'High CPU usage',
  timestamp: Date.now()
} satisfies AlertEvent)

// Typed authorization
const adminAuth: SSEAuthorizeCallback = async (ctx, params) => {
  const user = ctx.get('user')
  return user?.role === 'admin'
}

sse.channel('admin/logs', adminAuth)

Client-side types

import type {
  SSEClient,
  SSESubscription,
  SSEEventListener
} from '@strav/signal/sse'

// Typed event listeners
const alertListener: SSEEventListener<AlertEvent> = (data) => {
  console.log(`${data.level}: ${data.message}`)
}

const subscription = client.subscribe('alerts')
subscription.on('alert', alertListener)

// Typed subscription
interface TypedSubscription extends SSESubscription {
  onAlert(listener: SSEEventListener<AlertEvent>): () => void
  onMetrics(listener: SSEEventListener<MetricsEvent>): () => void
}
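
One way to get event names and payloads checked together is to describe a channel with an event map and wrap the subscription in a small generic helper. The sketch below is an illustration under assumptions: `Subscribable` is the minimal shape the wrapper needs (an `on(event, listener)` method, as used throughout this document), and `typed` and `AlertChannelEvents` are hypothetical names. An in-memory stand-in replaces the real subscription so the example runs on its own.

```typescript
// Minimal shape this sketch assumes a subscription exposes.
// `any` is deliberate here: the wrapper below narrows it per event name.
interface Subscribable {
  on(event: string, listener: (data: any) => void): void
}

// Map each event name on the channel to its payload type.
interface AlertChannelEvents {
  alert: { level: 'info' | 'warning' | 'error'; message: string }
  cleared: { count: number }
}

// Hypothetical wrapper: restricts `on` to known event names and payloads.
function typed<Events>(subscription: Subscribable) {
  return {
    on<K extends keyof Events & string>(event: K, listener: (data: Events[K]) => void): void {
      subscription.on(event, listener)
    }
  }
}

// In-memory stand-in so the sketch runs without a server.
const listeners = new Map<string, Array<(data: any) => void>>()
const fake: Subscribable = {
  on(event, listener) {
    listeners.set(event, [...(listeners.get(event) ?? []), listener])
  }
}

const alerts = typed<AlertChannelEvents>(fake)
const seen: string[] = []
alerts.on('alert', (data) => {
  seen.push(`${data.level}: ${data.message}`) // `data` is inferred from the event map
})

// Simulate the server delivering an event.
listeners.get('alert')?.forEach((fn) => fn({ level: 'warning', message: 'High CPU usage' }))
```

With this shape, a misspelled event name or a listener that reads a field the payload does not have fails at compile time rather than at runtime.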

Real-World Examples

Live chat system

// Server: Chat channels
sse.channel('chat/:roomId', async (ctx, { roomId }) => {
  const user = ctx.get('user')
  if (!user) return false  // Reject unauthenticated clients
  const room = await Room.find(roomId)
  return room?.hasUser(user.id) ?? false
})

// Server: New message handler
async function sendChatMessage(roomId: string, message: ChatMessage) {
  sse.to(`chat/${roomId}`)
     .except(message.senderId)
     .send('message', {
       id: message.id,
       text: message.text,
       sender: message.sender.name,
       timestamp: message.createdAt
     })
}

// Client: Chat UI
const chat = client.subscribe(`chat/${roomId}`)
chat.on('message', (message) => {
  appendMessage(message)
  playNotificationSound()
})

chat.on('typing', ({ user }) => {
  showTypingIndicator(user)
})

Real-time dashboard

// Server: Metrics broadcasting
setInterval(() => {
  const metrics = collectSystemMetrics()

  sse.to('dashboard/metrics').data({
    timestamp: Date.now(),
    cpu: metrics.cpu,
    memory: metrics.memory,
    activeUsers: metrics.activeUsers,
    requestsPerSecond: metrics.rps
  })
}, 5000)

// Client: Dashboard updates (untyped .data() events arrive as 'message')
const dashboard = client.subscribe('dashboard/metrics')
dashboard.on('message', (metrics) => {
  updateChart('cpu', metrics.cpu)
  updateChart('memory', metrics.memory)
  updateCounter('users', metrics.activeUsers)
  updateCounter('rps', metrics.requestsPerSecond)
})

File upload progress

// Server: Upload progress endpoint
export async function uploadWithProgress(ctx: Context) {
  const uploadId = ctx.params.uploadId

  const stream = createProgressStream(async (update) => {
    const file = ctx.request.file
    let uploaded = 0
    const total = file.size

    // Simulate chunked upload
    while (uploaded < total) {
      const chunk = await file.read(1024)
      uploaded += chunk.length

      const percent = Math.round((uploaded / total) * 100)
      update(percent, `Uploaded ${uploaded}/${total} bytes`)

      await uploadChunk(uploadId, chunk)
    }
  })

  return ctx.sse(stream)
}

// Client: Progress tracking
const progress = client.subscribe(`upload/${uploadId}`)
progress.on('progress', ({ percent, message }) => {
  updateProgressBar(percent)
  updateStatusText(message)
})

progress.on('complete', () => {
  showSuccessMessage('Upload complete!')
  redirectToFileList()
})

progress.on('error', ({ message }) => {
  showErrorMessage(`Upload failed: ${message}`)
})

Best Practices

Channel naming

  • Use hierarchical names: notifications/users/123
  • Include resource type: projects/456/updates
  • Use parameters for dynamic channels: teams/:id/chat
  • Separate public and private: public/announcements, users/:id/private

Performance considerations

  • Limit channel subscriptions per client (typically < 10)
  • Use appropriate heartbeat intervals (30s for most cases)
  • Clean up subscriptions when components unmount
  • Consider message size and frequency for mobile clients
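
The cleanup advice above can be sketched as a small helper that collects subscriptions and closes them together, for example from a component's unmount hook. `SubscriptionBag` is a hypothetical class; the only thing it assumes about a subscription is the `close()` method shown earlier in this document. Stand-in subscriptions are used so the example runs without a server.

```typescript
// `Closable` mirrors the `close()` method that subscriptions expose in this document.
interface Closable {
  close(): void
}

// Hypothetical helper: collects subscriptions so they can be released together.
class SubscriptionBag {
  private items: Closable[] = []

  add<T extends Closable>(item: T): T {
    this.items.push(item)
    return item
  }

  closeAll(): void {
    // Drain the list so a second call is a harmless no-op.
    for (const item of this.items.splice(0)) {
      item.close()
    }
  }
}

// Stand-in subscriptions that record when they are closed.
const closedNames: string[] = []
const fakeSubscription = (name: string): Closable => ({
  close: () => {
    closedNames.push(name)
  }
})

const bag = new SubscriptionBag()
bag.add(fakeSubscription('notifications'))
bag.add(fakeSubscription('users/42'))

bag.closeAll() // e.g. called when the component unmounts
bag.closeAll() // nothing left to close
```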

Error recovery

  • Implement client-side connection state UI
  • Handle authorization errors gracefully
  • Provide manual reconnection options
  • Log connection issues for debugging

Security

  • Always implement channel authorization for sensitive data
  • Validate user permissions in authorization callbacks
  • Use middleware for session/authentication checks
  • Sanitize data before broadcasting
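
As one way to apply the last point, the sketch below builds the broadcast payload from an explicit allow-list of fields, so that internal columns never reach clients by accident. `pickFields` and the `UserRecord` shape are hypothetical and exist only for this example.

```typescript
// Hypothetical helper: copy only the listed fields into a new object.
function pickFields<T extends object, K extends keyof T>(source: T, keys: readonly K[]): Pick<T, K> {
  const result = {} as Pick<T, K>
  for (const key of keys) {
    result[key] = source[key]
  }
  return result
}

// Example record with fields that must not leave the server.
interface UserRecord {
  id: string
  name: string
  email: string
  passwordHash: string
}

const record: UserRecord = {
  id: '42',
  name: 'Alice',
  email: 'alice@example.com',
  passwordHash: 'not-for-clients'
}

// Only `id` and `name` end up in the payload.
const publicUser = pickFields(record, ['id', 'name'])

// The payload would then be broadcast as usual, for example:
// sse.to('chat/room1').send('user_joined', publicUser)
```

An allow-list fails safe: a field added to the record later stays private until it is explicitly listed.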