
Licensed under FSL-1.1-MIT


WebSockets & Streaming

@putnami/application includes WebSocket and Server-Sent Events (SSE) support built on Bun's native WebSocket server. Use it to build real-time applications with typed, validated streaming endpoints.

Streaming with Stream()

Define real-time endpoints with the endpoint() builder and the Stream() combinator. Wrapping a schema in Stream() marks that slot as a stream of messages. The streaming mode follows from which of the two slots, body and returns, are streams:

body      | returns   | Mode          | Transport
----------|-----------|---------------|----------------------
T         | T         | Unary         | REST (standard HTTP)
T         | Stream(T) | Server-stream | SSE or WebSocket
Stream(T) | T         | Client-stream | WebSocket
Stream(T) | Stream(T) | Bidirectional | WebSocket
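
The rule in the table can be read as a small decision function. The sketch below is illustrative only: `deriveMode` and the `StreamingMode` type are hypothetical names introduced here, not part of @putnami/application, and the framework's own derivation may differ in detail.

```typescript
// Hypothetical helper, not part of @putnami/application.
// It mirrors the table: the mode depends only on which slots are streams.
type StreamingMode = 'unary' | 'server-stream' | 'client-stream' | 'bidirectional';

function deriveMode(bodyIsStream: boolean, returnsIsStream: boolean): StreamingMode {
  if (bodyIsStream && returnsIsStream) return 'bidirectional';
  if (bodyIsStream) return 'client-stream';
  if (returnsIsStream) return 'server-stream';
  return 'unary';
}
```

Only the server-stream row has a choice of transport; every mode with a streamed body needs a WebSocket.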

Server-stream

Push events to the client. Supports both SSE and WebSocket via protocol negotiation.

// src/api/events/ws.ts
import { endpoint, Stream } from '@putnami/application';

export default endpoint()
  .returns(Stream({ event: String, payload: String }))
  .handle(async (ctx) => {
    ctx.send({ event: 'welcome', payload: 'hello' });
    // handler runs until it returns, then the connection closes
  });

Client-stream

Consume a stream of messages from the client, then return a single final result:

// src/api/upload/ws.ts
import { endpoint, Stream } from '@putnami/application';

export default endpoint()
  .body(Stream({ type: String, data: String }))
  .returns({ result: String })
  .handle(async (ctx) => {
    let count = 0;
    for await (const msg of ctx.messages()) {
      count++;
    }
    return { result: `processed ${count} messages` };
  });

Bidirectional

Send and receive concurrently:

// src/api/chat/[roomId]/ws.ts
import { endpoint, Stream, Uuid } from '@putnami/application';

export default endpoint()
  .params({ roomId: Uuid })
  .body(Stream({ type: String, data: String }))
  .returns(Stream({ event: String, payload: String }))
  .handle(async (ctx) => {
    ctx.send({ event: 'welcome', payload: ctx.params.roomId });
    for await (const msg of ctx.messages()) {
      ctx.send({ event: 'echo', payload: msg.data });
    }
  });

Handler context

The handler context adapts based on the streaming mode:

  • ctx.messages() — available when body is Stream(), returns AsyncIterable<T>. The loop ends when the client disconnects.
  • ctx.send(data) — available when returns is Stream(), pushes a validated message to the client.
  • ctx.params, ctx.queryParams() — validated on connection open, same as HTTP endpoints.

Schema validation applies to all stream messages automatically. Invalid messages are rejected before reaching the handler.
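
To make "rejected before reaching the handler" concrete, here is a minimal sketch of a validate-then-dispatch step. It assumes JSON text frames and a flat shape of primitive fields; `Shape`, `matchesShape`, and `dispatch` are hypothetical names for illustration and do not reflect how Putnami implements validation internally.

```typescript
// Hypothetical sketch of the "validate, then dispatch" idea.
type FieldType = 'string' | 'number' | 'boolean';
type Shape = Record<string, FieldType>;

// True when value is an object whose listed fields have the expected primitive types.
function matchesShape(value: unknown, shape: Shape): boolean {
  if (typeof value !== 'object' || value === null) return false;
  const record = value as Record<string, unknown>;
  return Object.entries(shape).every(([key, type]) => typeof record[key] === type);
}

// Parses a raw text frame and calls the handler only for valid messages.
// Returns true when the message was delivered, false when it was rejected.
function dispatch(
  raw: string,
  shape: Shape,
  handler: (msg: Record<string, unknown>) => void,
): boolean {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return false; // not valid JSON
  }
  if (!matchesShape(parsed, shape)) return false;
  handler(parsed as Record<string, unknown>);
  return true;
}
```

With the shape `{ type: 'string', data: 'string' }`, the frame `{"type":"chat","data":"hi"}` is delivered, while `{"type":"chat"}` and non-JSON text never reach the handler.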

Protocol negotiation (server-stream)

Server-stream endpoints support two transports:

WebSocket — the client sends Upgrade: websocket:

const ws = new WebSocket('ws://localhost:3000/api/events');
ws.onmessage = (e) => console.log(JSON.parse(e.data));

SSE — the client sends Accept: text/event-stream:

const source = new EventSource('http://localhost:3000/api/events');
source.onmessage = (e) => console.log(JSON.parse(e.data));

Client-stream and bidirectional endpoints are WebSocket-only.
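
The header check described above can be sketched as a pure function. This is an illustration under stated assumptions: `negotiateTransport` is a hypothetical name, header names are assumed to be lower-cased already, and preferring WebSocket when both headers are present is a choice made for this sketch rather than documented Putnami behavior.

```typescript
// Hypothetical sketch of transport selection for a server-stream endpoint.
type Transport = 'websocket' | 'sse' | 'none';

// headers: lower-cased header names mapped to their values.
function negotiateTransport(headers: Record<string, string | undefined>): Transport {
  const upgrade = (headers['upgrade'] ?? '').toLowerCase();
  if (upgrade === 'websocket') return 'websocket';

  const accept = (headers['accept'] ?? '').toLowerCase();
  if (accept.includes('text/event-stream')) return 'sse';

  return 'none'; // neither transport was requested
}
```

A browser's `new WebSocket(...)` sends the Upgrade header and `new EventSource(...)` sends the Accept header, so client code normally does not set either by hand.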

Client-side connection

Basic client

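// The chat endpoint declares params as { roomId: Uuid }, so the last path
// segment must be a valid UUID. The value below is only an example.
const ws = new WebSocket('ws://localhost:3000/api/chat/123e4567-e89b-12d3-a456-426614174000');

ws.onopen = () => {
  console.log('Connected');
  // Messages must match the endpoint's body schema: { type, data }
  ws.send(JSON.stringify({ type: 'hello', data: 'Hi there' }));
};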

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Received:', data);
};

ws.onclose = (event) => {
  console.log('Disconnected:', event.code, event.reason);
};

ws.onerror = (error) => {
  console.error('Error:', error);
};

SSE client (server-stream)

const source = new EventSource('http://localhost:3000/api/events');

source.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Event:', data);
};

source.onerror = () => {
  console.log('Connection lost — EventSource will auto-reconnect');
};
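
For orientation, this is roughly what travels over the wire for an SSE message. The SSE format itself is standard (a `data:` line followed by a blank line), but the assumption that the server emits one unnamed, JSON-encoded event per ctx.send() call is inferred from the onmessage and JSON.parse usage above. `encodeSseEvent` and `decodeSseEvent` are hypothetical helpers.

```typescript
// Encodes one unnamed SSE event. EventSource delivers unnamed events to onmessage.
function encodeSseEvent(data: unknown): string {
  return `data: ${JSON.stringify(data)}\n\n`;
}

// Decodes the JSON payload of a single frame produced by encodeSseEvent.
function decodeSseEvent(frame: string): unknown {
  const dataLines = frame
    .split('\n')
    .filter((line) => line.startsWith('data:'))
    // Drop the "data:" prefix and the single optional space after it.
    .map((line) => line.slice(5).replace(/^ /, ''));
  return JSON.parse(dataLines.join('\n'));
}
```

EventSource does this decoding for you; event.data in the handler above is the text after `data:`.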

React hook

import { useEffect, useState, useCallback, useRef } from 'react';

function useWebSocket(url: string) {
  const [isConnected, setIsConnected] = useState(false);
  const [lastMessage, setLastMessage] = useState<unknown>(null);
  const wsRef = useRef<WebSocket | null>(null);

  useEffect(() => {
    const ws = new WebSocket(url);
    wsRef.current = ws;

    ws.onopen = () => setIsConnected(true);
    ws.onclose = () => setIsConnected(false);
    ws.onmessage = (event) => {
      setLastMessage(JSON.parse(event.data));
    };

    return () => {
      ws.close();
    };
  }, [url]);

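  const send = useCallback((data: unknown) => {
    const ws = wsRef.current;
    // send() throws while the socket is still connecting, so only send when open
    if (ws?.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify(data));
    }
  }, []);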

  return { isConnected, lastMessage, send };
}

// Usage
function ChatComponent() {
  const { isConnected, lastMessage, send } = useWebSocket('ws://localhost:3000/ws/chat');

  return (
    <div>
      <p>Status: {isConnected ? 'Connected' : 'Disconnected'}</p>
      <button onClick={() => send({ type: 'ping' })}>
        Send Ping
      </button>
    </div>
  );
}

Reconnection logic

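function createReconnectingWebSocket(url: string, options = { maxRetries: 5 }) {
  let ws: WebSocket | null = null;
  let retries = 0;
  let closedByCaller = false; // set by close() so a deliberate close does not reconnect
  let timer: ReturnType<typeof setTimeout> | undefined;

  const connect = () => {
    ws = new WebSocket(url);

    ws.onopen = () => {
      retries = 0; // reset the backoff after a successful connection
      console.log('Connected');
    };

    ws.onclose = () => {
      if (closedByCaller || retries >= options.maxRetries) return;
      retries++;
      // Exponential backoff: 2s, 4s, 8s, ... capped at 30s
      const delay = Math.min(1000 * Math.pow(2, retries), 30000);
      console.log(`Reconnecting in ${delay}ms...`);
      timer = setTimeout(connect, delay);
    };
  };

  connect();

  return {
    send: (data: unknown) => {
      // Drop the message unless the socket is open; send() throws while connecting
      if (ws?.readyState === WebSocket.OPEN) ws.send(JSON.stringify(data));
    },
    close: () => {
      closedByCaller = true;
      clearTimeout(timer); // cancel any pending reconnect attempt
      ws?.close();
    },
  };
}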
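
The delay formula in the reconnection helper can be pulled out and inspected on its own. The sketch below uses the same constants (1000 ms base, 30000 ms cap) and counts attempts from 1, as the helper does; `backoffDelay` is a hypothetical name.

```typescript
// Exponential backoff with a cap: base * 2^attempt, never more than maxMs.
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(baseMs * Math.pow(2, attempt), maxMs);
}

// Delays for the five attempts allowed by the default maxRetries of 5.
const schedule = [1, 2, 3, 4, 5].map((attempt) => backoffDelay(attempt));
// → [2000, 4000, 8000, 16000, 30000]
```

The fifth attempt would be 32000 ms uncapped, so the cap takes effect there. Adding random jitter to each delay is a common refinement when many clients may reconnect at once.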

Best practices

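  1. Use Stream() for typed endpoints: schema validation rejects malformed messages before they reach your handler.
  2. Prefer SSE for server push: it runs over plain HTTP, usually passes through proxies without extra configuration, and EventSource reconnects automatically.
  3. Handle connection errors: attach onerror and onclose handlers on the client and reconnect with backoff.
  4. Authenticate connections: verify the user's identity before accepting a connection.
  5. Limit message size: cap the size of incoming messages so oversized payloads cannot be used for abuse.
  6. Clean up resources: the for await loop ends automatically on disconnect, so release anything tied to the connection after the loop.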
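
As one way to act on the message-size advice from the sending side, a client can measure the serialized payload before calling send(). This is a sketch only: the 16 KiB limit and the names `byteLength` and `serializeWithinLimit` are assumptions for illustration, and it does not replace a server-side limit.

```typescript
// Hypothetical limit; pick a value that suits your payloads.
const MAX_MESSAGE_BYTES = 16 * 1024;

// Size of the text once encoded as UTF-8, which is what goes over the wire.
function byteLength(text: string): number {
  return new TextEncoder().encode(text).length;
}

// Returns the JSON text when it fits within the limit, otherwise null.
function serializeWithinLimit(message: unknown, maxBytes = MAX_MESSAGE_BYTES): string | null {
  const text = JSON.stringify(message);
  return byteLength(text) <= maxBytes ? text : null;
}
```

A caller would send the returned text when it is not null, and otherwise split the payload or report an error to the user.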

Related guides

  • API
  • HTTP & Middleware
