Licensed under FSL-1.1-MIT

Events

@putnami/events provides typed, transport-agnostic event messaging for building event-driven applications.

Overview

The module has three core concepts:

  • Topics — typed contracts (name + schema) shared between publishers and subscribers
  • Handlers — subscribe to topics with configurable retries, distribution, and filtering
  • Publishers — schema-validated publish functions bound to a topic

Quick Example

// shared/topics.ts
import { topic, Uuid, Email } from '@putnami/events';

export const UserCreated = topic('user.created', {
  id: Uuid,
  email: Email,
  name: String,
});

// events/user-created.on.ts
import { handler } from '@putnami/events';
import { UserCreated } from '../shared/topics';

export default handler(UserCreated)
  .options({ distribution: 'broadcast' })
  .handle(async (msg) => {
    console.log(`Welcome ${msg.payload.name}!`);
  });

// publish.ts
import { getPublisher } from '@putnami/events';
import { UserCreated } from '../shared/topics';

const publish = getPublisher(UserCreated);
await publish({ id: crypto.randomUUID(), email: 'jane@example.com', name: 'Jane' });

// app.ts
import { application } from '@putnami/application';
import { events } from '@putnami/events';

const app = application()
  .use(events()); // Auto-discovers events/*.on.ts handlers

await app.start();

Topics

Define typed topics using the same schema system as the rest of the framework:

import { topic, Uuid, Int, Optional, ArrayOf } from '@putnami/events';

export const OrderPlaced = topic('order.placed', {
  orderId: Uuid,
  userId: Uuid,
  items: ArrayOf({ productId: Uuid, quantity: Int }),
  discount: Optional(Number),
  total: Number,
});

Topics are pure data — import them in any service for full type safety on both the publisher and subscriber side.

Handlers

Handler DI (.inject())

Event handlers support the same typed DI pattern as HTTP handlers. Declare dependencies with .inject() and receive them as the first argument in .handle().

import { handler } from '@putnami/events';
import { UserCreated } from './topics';
import { UserRepository, WelcomeMailer } from './services';

export default handler(UserCreated)
  .inject({ users: UserRepository, mailer: WelcomeMailer })
  .handle(async ({ users, mailer }, msg) => {
    const user = await users.findById(msg.payload.id);
    if (user) {
      await mailer.send(user.email);
    }
  });

When running via Application + events(), each handler invocation gets its own DI scope automatically.

Options

| Option | Default | Description |
| --- | --- | --- |
| distribution | 'competing' | 'competing' round-robins across instances; 'broadcast' delivers to all |
| maxRetries | 10 | Retry attempts before DLQ |
| maxBackoff | 60000 | Max retry delay in ms |
| timeout | 30000 | Handler execution timeout in ms |
| dlq | true | Send to dead-letter queue after max retries. When false, failed messages are logged and discarded. |
| ack | 'auto' | 'auto': return = ack, throw = nack. 'manual': call msg.ack() |
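
To make the two values of the distribution option concrete, here is a toy model. It is not the broker's implementation: `route` and the instance names are hypothetical, and the sketch assumes only what the option description states ('competing' round-robins across instances, 'broadcast' delivers to all).

```typescript
type Distribution = 'competing' | 'broadcast';

// Toy router: for each message, returns the list of instances that receive it.
function route(
  distribution: Distribution,
  instances: string[],
  messageCount: number,
): string[][] {
  if (instances.length === 0) {
    throw new Error('at least one instance is required');
  }
  const deliveries: string[][] = [];
  for (let i = 0; i < messageCount; i++) {
    if (distribution === 'broadcast') {
      // Every instance sees every message.
      deliveries.push([...instances]);
    } else {
      // Exactly one instance per message, chosen in rotation.
      deliveries.push([instances[i % instances.length] as string]);
    }
  }
  return deliveries;
}

const competing = route('competing', ['a', 'b'], 3); // [['a'], ['b'], ['a']]
const broadcast = route('broadcast', ['a', 'b'], 2); // [['a', 'b'], ['a', 'b']]
```

In general, competing delivery suits work that should happen once per event, while broadcast suits per-instance side effects such as refreshing a local cache.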

Attribute Filtering

handler(OrderPlaced)
  .filter({ attributes: { region: 'eu' } })
  .handle(async (msg) => { ... });
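
The exact matching rules are not spelled out on this page. A minimal sketch, assuming a filter matches when every listed attribute is present on the message with an identical string value (`matchesAttributes` is a hypothetical helper, not a library export):

```typescript
type Attributes = Record<string, string>;

// Assumption: every filter entry must match exactly; extra message attributes are ignored.
function matchesAttributes(filter: Attributes, messageAttributes: Attributes): boolean {
  return Object.entries(filter).every(
    ([key, expected]) => messageAttributes[key] === expected,
  );
}

const euOnly: Attributes = { region: 'eu' };
const euOrder = matchesAttributes(euOnly, { region: 'eu', tier: 'gold' }); // true
const usOrder = matchesAttributes(euOnly, { region: 'us' }); // false
const untagged = matchesAttributes(euOnly, {}); // false
```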

Acknowledgement

By default, returning from a handler acknowledges the message. Throwing triggers a retry with exponential backoff (1s, 2s, 4s, ... capped at maxBackoff).
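
The retry schedule can be sketched as a small function. This is an illustration only: the 1000 ms base delay and the doubling factor are read off the "1s, 2s, 4s" sequence, `retryDelayMs` is a hypothetical name rather than part of @putnami/events, and whether the real broker adds jitter is not stated here.

```typescript
// Hypothetical helper, not part of @putnami/events.
// Assumes a 1000 ms base delay that doubles on each retry, capped at maxBackoff.
function retryDelayMs(attempt: number, maxBackoff = 60_000): number {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError('attempt must be a positive integer');
  }
  const uncapped = 1000 * 2 ** (attempt - 1);
  return Math.min(uncapped, maxBackoff);
}

// Delays for the default maxRetries of 10.
const schedule = Array.from({ length: 10 }, (_, i) => retryDelayMs(i + 1));
// [1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000, 60000, 60000]
```

Under these assumptions the cap is reached on the seventh retry, and exhausting all 10 default retries adds up to 303 seconds of backoff, not counting handler execution time.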

For advanced cases, use manual ack:

handler(UserCreated)
  .options({ ack: 'manual' })
  .handle(async (msg) => {
    await longProcess(msg.payload);
    msg.ack();
  });

Publishing

const publish = getPublisher(UserCreated);

await publish(
  { id: '...', email: 'jane@example.com', name: 'Jane' },
  { attributes: { region: 'eu' }, traceId: 'custom-trace' },
);

Payloads are validated against the topic schema before publishing. Invalid payloads throw immediately.

Context Propagation

When publishing from inside an HTTP handler or any runInContext scope, the publisher automatically captures:

  • traceId: inherited from the current async context, preserving end-to-end trace continuity across the HTTP request → event boundary. An explicit traceId in options takes precedence.
  • Auth claims: if the context contains a user (set by the OAuth middleware), the claims sub, email, azp, and client_id are auto-captured as auth.* message attributes.

// Inside an authenticated HTTP handler: no extra options needed
app.post('/users', async (ctx) => {
  const user = await createUser(ctx);
  const publish = getPublisher(UserCreated);
  await publish({ id: user.id, email: user.email, name: user.name });
  // envelope.traceId === request traceId
  // envelope.attributes['auth.sub'] === ctx.user.sub
});

Handlers can read these on the other side:

handler(UserCreated).handle(async (msg) => {
  const who = msg.attributes['auth.sub'];
  const traceId = msg.traceId; // same as HTTP request
});

Explicit attributes in PublishOptions take precedence over auto-captured auth claims.
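
One way to picture this precedence is an object spread in which explicit attributes are applied last. The sketch below is illustrative: `buildAttributes` and the `Claims` type are hypothetical, and only the four claim names and the auth. prefix come from the text above.

```typescript
interface Claims {
  sub?: string;
  email?: string;
  azp?: string;
  client_id?: string;
}

const CAPTURED_CLAIMS = ['sub', 'email', 'azp', 'client_id'] as const;

// Hypothetical helper: auto-captured auth.* attributes first, explicit attributes last.
function buildAttributes(
  claims: Claims | undefined,
  explicit: Record<string, string> = {},
): Record<string, string> {
  const captured: Record<string, string> = {};
  for (const name of CAPTURED_CLAIMS) {
    const value = claims?.[name];
    if (value !== undefined) {
      captured[`auth.${name}`] = value;
    }
  }
  // Later spreads win, so explicit keys override captured ones.
  return { ...captured, ...explicit };
}

const attributes = buildAttributes(
  { sub: 'user-1', email: 'jane@example.com' },
  { region: 'eu', 'auth.sub': 'service-account' },
);
// attributes['auth.sub'] === 'service-account' (explicit wins)
// attributes['auth.email'] === 'jane@example.com' (auto-captured)
```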

Handler Auto-Discovery

The plugin automatically discovers handler files with the .on.ts suffix in an events/ directory located next to the file that calls events():

src/
├── main.ts                        ← events() called here
└── events/
    ├── user-created.on.ts         → export default handler(UserCreated).handle(...)
    ├── order-placed.on.ts         → export default handler(OrderPlaced).handle(...)
    └── payment/
        └── failed.on.ts           → export default handler(PaymentFailed).handle(...)

  • Suffix: *.on.ts identifies a file as an event handler
  • Exports: default and named exports are scanned for HandlerDefinition objects
  • Nesting: subdirectories are supported for organization (path has no semantic meaning)
  • Build-time: discovery happens during the generate phase, producing a static loader

Multiple handlers can be exported from a single file:

// events/order.on.ts
export const onPlaced = handler(OrderPlaced).handle(async (msg) => { ... });
export const onShipped = handler(OrderShipped).handle(async (msg) => { ... });

Plugin Setup

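import { application } from '@putnami/application';
import { events } from '@putnami/events';

// The three configurations below are alternatives; use one per application.

// Auto-discovery (default): scans events/*.on.ts
const autoApp = application()
  .use(events());

// Explicit handlers (no scanning); handler1 and handler2 are handler definitions imported from elsewhere
const explicitApp = application()
  .use(events({ autoScan: false, handlers: [handler1, handler2] }));

// Custom scan directory
const customApp = application()
  .use(events({ scanFolder: 'subscribers' }));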

In local development (no EVENTS_ENDPOINT env var), the plugin starts an in-memory broker with a local HTTP server on port 4222. Multiple services can share the same local server for cross-service event exchange. The endpoint config option takes precedence over the env var.
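
The endpoint precedence described above can be summarized in a few lines. This is a sketch of the decision only, not the plugin's code: `resolveBroker` and its return shape are hypothetical, and it assumes that a configured endpoint means connecting to that endpoint instead of starting the local broker. `EVENTS_ENDPOINT`, the endpoint option, and port 4222 come from the text.

```typescript
type BrokerTarget =
  | { mode: 'remote'; endpoint: string }
  | { mode: 'local'; port: number };

// Hypothetical helper mirroring the documented precedence.
function resolveBroker(
  config: { endpoint?: string },
  env: Record<string, string | undefined>,
): BrokerTarget {
  // The config option wins over the environment variable.
  const endpoint = config.endpoint ?? env['EVENTS_ENDPOINT'];
  if (endpoint) {
    return { mode: 'remote', endpoint };
  }
  // Neither is set: fall back to the in-memory broker on the local port.
  return { mode: 'local', port: 4222 };
}

const fromConfig = resolveBroker(
  { endpoint: 'https://a.example' },
  { EVENTS_ENDPOINT: 'https://b.example' },
);
const fromEnv = resolveBroker({}, { EVENTS_ENDPOINT: 'https://b.example' });
const local = resolveBroker({}, {});
```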

Observability

Async Context

Every handler runs inside runInContext(). The event context is available via useContext() anywhere in the call chain:

import { useContext } from '@putnami/runtime';
import type { EventContext } from '@putnami/events';

handler(UserCreated).handle(async (msg) => {
  const ctx = useContext<EventContext>();
  // ctx.traceId, ctx.eventTopic, ctx.eventMessageId, ctx.eventAttempt
});

useLogger() automatically prefixes log lines with the trace ID, and DI services resolved inside the handler (including .inject() dependencies) share the same async context.
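
How runInContext is implemented is not covered on this page. As an illustration of the general mechanism, Node's AsyncLocalStorage provides the same kind of ambient, per-invocation context. In the sketch below, `runInContextSketch`, `useContextSketch`, and `EventContextSketch` are hypothetical stand-ins, not the @putnami/runtime API.

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

interface EventContextSketch {
  traceId: string;
  eventTopic: string;
  eventAttempt: number;
}

const storage = new AsyncLocalStorage<EventContextSketch>();

// Runs fn with ctx as the ambient context for everything it calls.
function runInContextSketch<T>(ctx: EventContextSketch, fn: () => T): T {
  return storage.run(ctx, fn);
}

// Reads the ambient context, failing loudly when there is none.
function useContextSketch(): EventContextSketch {
  const ctx = storage.getStore();
  if (!ctx) {
    throw new Error('useContextSketch() called outside of a context');
  }
  return ctx;
}

// A helper deep in the call chain reads the context without receiving it as an argument.
function describeCurrentEvent(): string {
  const { traceId, eventTopic, eventAttempt } = useContextSketch();
  return `[${traceId}] ${eventTopic} (attempt ${eventAttempt})`;
}

const line = runInContextSketch(
  { traceId: 'trace-1', eventTopic: 'user.created', eventAttempt: 1 },
  () => describeCurrentEvent(),
);
// line === "[trace-1] user.created (attempt 1)"
```

AsyncLocalStorage also carries the store across await boundaries, which is what lets loggers and injected services see the same trace ID without explicit plumbing.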

Telemetry Metrics

When the telemetry plugin is active, these metrics are emitted automatically:

| Metric | Type | Description |
| --- | --- | --- |
| events.handle.{topic}.success | Counter | Successful handler invocations |
| events.handle.{topic}.failure | Counter | Failed handler invocations |
| events.handle.{topic}.duration | Histogram | Handler execution time (ms) |
| events.handle.{topic}.retry | Counter | Retry attempts |
| events.handle.{topic}.dlq | Counter | Messages sent to dead-letter queue |
| events.publish.{topic} | Counter | Successfully published messages |
| events.publish.{topic}.error | Counter | Publish validation errors |

Structured Logging

The broker logs the full message lifecycle with trace ID prefixes:

[trace-id] Received message abc-123 on 'user.created' (attempt 1)
[trace-id] Handled message abc-123 on 'user.created' in 12ms

Design Principles

  • Idempotent handlers — The in-memory broker deliberately redelivers ~2% of messages in dev mode (NODE_ENV !== 'production'). Design handlers to tolerate duplicates.
  • Conservative defaults — 10 retries with exponential backoff and DLQ enabled by default.
  • Transport agnostic — The API is decoupled from the transport. Switch between in-memory (dev) and cloud (production) without code changes.
  • Convention over configuration — Place *.on.ts files in events/ and they are discovered automatically. Same pattern as api() route scanning.
  • Context per message — Each handler invocation gets its own runInContext with trace ID, topic, and message metadata.
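
Since duplicates are expected, a handler needs some way to recognize a message it has already processed. A minimal sketch, assuming each delivery carries a stable message ID (the event context exposes one as eventMessageId). `Delivery`, `makeIdempotent`, and the in-memory Set are hypothetical; a real service would typically record processed IDs in a shared store so that deduplication survives restarts and works across instances.

```typescript
interface Delivery<T> {
  messageId: string;
  payload: T;
}

// Wraps a handler so that a redelivered message ID is skipped.
// Resolves to true when the payload was processed, false when it was a duplicate.
function makeIdempotent<T>(
  handle: (payload: T) => Promise<void>,
  seen: Set<string> = new Set(),
): (delivery: Delivery<T>) => Promise<boolean> {
  return async (delivery) => {
    if (seen.has(delivery.messageId)) {
      return false; // duplicate: nothing to do
    }
    // Mark before handling so a concurrent duplicate is skipped as well.
    seen.add(delivery.messageId);
    try {
      await handle(delivery.payload);
      return true;
    } catch (err) {
      // Forget the ID on failure so that a retry can process the message.
      seen.delete(delivery.messageId);
      throw err;
    }
  };
}

// Usage: the second delivery of 'm1' is ignored.
const welcomed: string[] = [];
const onUserCreated = makeIdempotent<{ name: string }>(async (payload) => {
  welcomed.push(payload.name);
});
const firstDelivery = onUserCreated({ messageId: 'm1', payload: { name: 'Jane' } });
const redelivery = firstDelivery.then(() =>
  onUserCreated({ messageId: 'm1', payload: { name: 'Jane' } }),
);
```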

See the package documentation for the full API reference.
