Events
@putnami/events provides typed, transport-agnostic event messaging for building event-driven applications.
Overview
The module has three core concepts:
- Topics — typed contracts (name + schema) shared between publishers and subscribers
- Handlers — subscribe to topics with configurable retries, distribution, and filtering
- Publishers — schema-validated publish functions bound to a topic
Quick Example
// shared/topics.ts
import { topic, Uuid, Email } from '@putnami/events';
export const UserCreated = topic('user.created', {
id: Uuid,
email: Email,
name: String,
});

// events/user-created.on.ts
import { handler } from '@putnami/events';
import { UserCreated } from '../shared/topics';
export default handler(UserCreated)
.options({ distribution: 'broadcast' })
.handle(async (msg) => {
console.log(`Welcome ${msg.payload.name}!`);
});

// publish.ts
import { getPublisher } from '@putnami/events';
import { UserCreated } from '../shared/topics';
const publish = getPublisher(UserCreated);
await publish({ id: crypto.randomUUID(), email: 'jane@example.com', name: 'Jane' });

// app.ts
import { application } from '@putnami/application';
import { events } from '@putnami/events';
const app = application()
.use(events()); // Auto-discovers events/*.on.ts handlers
await app.start();

Topics
Define typed topics using the same schema system as the rest of the framework:
import { topic, Uuid, Int, Optional, ArrayOf } from '@putnami/events';
export const OrderPlaced = topic('order.placed', {
orderId: Uuid,
userId: Uuid,
items: ArrayOf({ productId: Uuid, quantity: Int }),
discount: Optional(Number),
total: Number,
});

Topics are pure data: import them in any service for full type safety on both the publisher and subscriber side.
Handlers
Handler DI (.inject())
Event handlers support the same typed DI pattern as HTTP handlers. Declare dependencies with .inject() and receive them as the first argument in .handle().
import { handler } from '@putnami/events';
import { UserCreated } from './topics';
import { UserRepository, WelcomeMailer } from './services';
export default handler(UserCreated)
.inject({ users: UserRepository, mailer: WelcomeMailer })
.handle(async ({ users, mailer }, msg) => {
const user = await users.findById(msg.payload.id);
if (user) {
await mailer.send(user.email);
}
});

When running via Application + events(), each handler invocation gets its own DI scope automatically.
Options
| Option | Default | Description |
|---|---|---|
| distribution | 'competing' | 'competing' round-robins across instances; 'broadcast' delivers to all |
| maxRetries | 10 | Retry attempts before DLQ |
| maxBackoff | 60000 | Max retry delay in ms |
| timeout | 30000 | Handler execution timeout in ms |
| dlq | true | Send to dead-letter queue after max retries. When false, failed messages are logged and discarded. |
| ack | 'auto' | 'auto': return = ack, throw = nack. 'manual': call msg.ack() |
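The two distribution modes can be pictured with a small routing model. This is only an illustrative sketch: createRouter and the instance names are hypothetical, and the real broker may select instances differently.

```typescript
type Distribution = 'competing' | 'broadcast';

// Hypothetical model of message routing; NOT the broker's actual implementation.
// Returns a function that picks which handler instances receive the next message.
function createRouter(instances: string[]) {
  let next = 0; // round-robin cursor, used only in 'competing' mode
  return (distribution: Distribution): string[] => {
    if (instances.length === 0) return [];
    if (distribution === 'broadcast') return [...instances]; // every instance gets a copy
    const target = instances[next];
    next = (next + 1) % instances.length;
    return [target]; // exactly one instance handles the message
  };
}

const route = createRouter(['worker-a', 'worker-b']);
console.log(route('competing')); // [ 'worker-a' ]
console.log(route('competing')); // [ 'worker-b' ]
console.log(route('broadcast')); // [ 'worker-a', 'worker-b' ]
```

Under this model, 'competing' suits work that must happen once per message (sending an email), while 'broadcast' suits per-instance side effects such as invalidating a local cache.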
Attribute Filtering
handler(OrderPlaced)
.filter({ attributes: { region: 'eu' } })
.handle(async (msg) => { ... });

Acknowledgement
By default, returning from a handler acknowledges the message. Throwing triggers retry with exponential backoff (1s, 2s, 4s, ... capped at maxBackoff).
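To make the retry schedule concrete, the delays can be computed as below. This is a minimal sketch under assumptions: retryDelayMs is a hypothetical helper, the 1-second base is inferred from the "1s, 2s, 4s" sequence, and the broker may add jitter or other details not described here.

```typescript
// Hypothetical helper, not part of @putnami/events.
// Delay before retry number `attempt` (1-based): 1s, 2s, 4s, ... capped at maxBackoff.
function retryDelayMs(attempt: number, maxBackoff = 60_000, baseMs = 1_000): number {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError('attempt must be a positive integer');
  }
  return Math.min(baseMs * 2 ** (attempt - 1), maxBackoff);
}

// Schedule for the default maxRetries of 10 and maxBackoff of 60000 ms.
const schedule = Array.from({ length: 10 }, (_, i) => retryDelayMs(i + 1));
console.log(schedule);
// [1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000, 60000, 60000]
```

With the defaults, the cap is reached on the seventh retry, so a message that keeps failing would spend roughly five minutes in retries before reaching the DLQ, not counting handler execution time.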
For advanced cases, use manual ack:
handler(UserCreated)
.options({ ack: 'manual' })
.handle(async (msg) => {
await longProcess(msg.payload);
msg.ack();
});

Publishing
const publish = getPublisher(UserCreated);
await publish(
{ id: '...', email: 'jane@example.com', name: 'Jane' },
{ attributes: { region: 'eu' }, traceId: 'custom-trace' },
);

Payloads are validated against the topic schema before publishing. Invalid payloads throw immediately.
Context Propagation
When publishing from inside an HTTP handler or any runInContext scope, the publisher automatically captures:
- traceId: inherited from the current async context, preserving end-to-end trace continuity across the HTTP request → event boundary. An explicit traceId in options takes precedence.
- Auth claims: if the context contains a user (set by the OAuth middleware), the claims sub, email, azp, and client_id are auto-captured as auth.* message attributes.
// Inside an authenticated HTTP handler — no extra options needed
app.post('/users', async (ctx) => {
const user = await createUser(ctx);
const publish = getPublisher(UserCreated);
await publish({ id: user.id, email: user.email, name: user.name });
// envelope.traceId === request traceId
// envelope.attributes['auth.sub'] === ctx.user.sub
});

Handlers can read these on the other side:
handler(UserCreated).handle(async (msg) => {
const who = msg.attributes['auth.sub'];
const traceId = msg.traceId; // same as HTTP request
});

Explicit attributes in PublishOptions take precedence over auto-captured auth claims.
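The precedence rule can be sketched as a plain object merge. buildAttributes and the Claims type are hypothetical names used for illustration; the publisher's actual internals are not shown in this document.

```typescript
type Attributes = Record<string, string>;

// The four claims this document lists as auto-captured.
interface Claims {
  sub?: string;
  email?: string;
  azp?: string;
  client_id?: string;
}

// Hypothetical helper: combines auto-captured auth claims with explicit attributes.
function buildAttributes(user: Claims | undefined, explicit: Attributes = {}): Attributes {
  const captured: Attributes = {};
  for (const key of ['sub', 'email', 'azp', 'client_id'] as const) {
    const value = user?.[key];
    if (value !== undefined) captured[`auth.${key}`] = value; // prefix with auth.
  }
  // Spread order matters: explicit attributes overwrite captured ones on conflict.
  return { ...captured, ...explicit };
}

const attrs = buildAttributes(
  { sub: 'user-1', email: 'jane@example.com' },
  { region: 'eu', 'auth.sub': 'service-account' },
);
console.log(attrs['auth.sub']); // 'service-account' (explicit value wins)
console.log(attrs['auth.email']); // 'jane@example.com' (auto-captured)
```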
Handler Auto-Discovery
The plugin automatically discovers handler files with the .on.ts suffix in a sibling events/ directory:
src/
├── main.ts                      ← events() called here
└── events/
    ├── user-created.on.ts       → export default handler(UserCreated).handle(...)
    ├── order-placed.on.ts       → export default handler(OrderPlaced).handle(...)
    └── payment/
        └── failed.on.ts         → export default handler(PaymentFailed).handle(...)

- Suffix: *.on.ts identifies a file as an event handler
- Exports: default and named exports are scanned for HandlerDefinition objects
- Nesting: subdirectories are supported for organization (path has no semantic meaning)
- Build-time: discovery happens during the generate phase, producing a static loader
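The suffix and nesting rules above amount to a simple path test. The predicate below is a hypothetical sketch (isHandlerFile is not a documented API, and paths are assumed to be relative to the directory containing main.ts); the real scanner may apply further checks.

```typescript
// Hypothetical predicate mirroring the documented convention:
// a file qualifies if it lives anywhere under the scan folder and ends with .on.ts.
function isHandlerFile(relativePath: string, scanFolder = 'events'): boolean {
  const normalized = relativePath.replace(/\\/g, '/'); // tolerate Windows separators
  return normalized.startsWith(`${scanFolder}/`) && normalized.endsWith('.on.ts');
}

console.log(isHandlerFile('events/user-created.on.ts')); // true
console.log(isHandlerFile('events/payment/failed.on.ts')); // true (nesting allowed)
console.log(isHandlerFile('events/helpers.ts')); // false (no .on.ts suffix)
console.log(isHandlerFile('main.ts')); // false (outside events/)
```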
Multiple handlers can be exported from a single file:
// events/order.on.ts
export const onPlaced = handler(OrderPlaced).handle(async (msg) => { ... });
export const onShipped = handler(OrderShipped).handle(async (msg) => { ... });

Plugin Setup
import { events } from '@putnami/events';
// Auto-discovery (default) — scans events/*.on.ts
const app = application()
.use(events());
// Explicit handlers (no scanning)
const app = application()
.use(events({ autoScan: false, handlers: [handler1, handler2] }));
// Custom scan directory
const app = application()
.use(events({ scanFolder: 'subscribers' }));

In local development (no EVENTS_ENDPOINT env var), the plugin starts an in-memory broker with a local HTTP server on port 4222. Multiple services can share the same local server for cross-service event exchange. The endpoint config option takes precedence over the env var.
Observability
Async Context
Every handler runs inside runInContext(). The event context is available via useContext() anywhere in the call chain:
import { useContext } from '@putnami/runtime';
import type { EventContext } from '@putnami/events';
handler(UserCreated).handle(async (msg) => {
const ctx = useContext<EventContext>();
// ctx.traceId, ctx.eventTopic, ctx.eventMessageId, ctx.eventAttempt
});

useLogger() automatically prefixes log lines with the trace ID, and DI services resolved inside the handler (including .inject() dependencies) share the same async context.
Telemetry Metrics
When the telemetry plugin is active, these metrics are emitted automatically:
| Metric | Type | Description |
|---|---|---|
| events.handle.{topic}.success | Counter | Successful handler invocations |
| events.handle.{topic}.failure | Counter | Failed handler invocations |
| events.handle.{topic}.duration | Histogram | Handler execution time (ms) |
| events.handle.{topic}.retry | Counter | Retry attempts |
| events.handle.{topic}.dlq | Counter | Messages sent to dead-letter queue |
| events.publish.{topic} | Counter | Successfully published messages |
| events.publish.{topic}.error | Counter | Publish validation errors |
Structured Logging
The broker logs the full message lifecycle with trace ID prefixes:
[trace-id] Received message abc-123 on 'user.created' (attempt 1)
[trace-id] Handled message abc-123 on 'user.created' in 12ms

Design Principles
- Idempotent handlers: The in-memory broker deliberately redelivers ~2% of messages in dev mode (NODE_ENV !== 'production'). Design handlers to tolerate duplicates.
- Conservative defaults: 10 retries with exponential backoff and DLQ enabled by default.
- Transport agnostic: The API is decoupled from the transport. Switch between in-memory (dev) and cloud (production) without code changes.
- Convention over configuration: Place *.on.ts files in events/ and they are discovered automatically. Same pattern as api() route scanning.
- Context per message: Each handler invocation gets its own runInContext with trace ID, topic, and message metadata.
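One way to tolerate the duplicate deliveries mentioned in the first principle is to wrap the handler body in a deduplication guard. The sketch below is an assumption-laden illustration: idempotent is a hypothetical helper, the id field on the message is assumed (this document only confirms that a message ID exists via ctx.eventMessageId), and an in-memory Set works only within a single process.

```typescript
// Hypothetical wrapper: runs `handle` at most once per message id within this process.
function idempotent<M extends { id: string }>(
  handle: (msg: M) => Promise<void>,
): (msg: M) => Promise<void> {
  const seen = new Set<string>(); // assumption: ids are stable across redeliveries
  return async (msg) => {
    if (seen.has(msg.id)) return; // duplicate: return normally so the message is acked
    await handle(msg);
    seen.add(msg.id); // record only after success so a failed attempt can be retried
  };
}

// Usage with a plain function standing in for a handler body.
const sendWelcome = idempotent(async (msg: { id: string; name: string }) => {
  console.log(`Welcome ${msg.name}!`);
});
```

In production the seen-set would typically live in a durable store shared by all instances, and two concurrent deliveries of the same message could still both pass the check, so operations with side effects are safest when they are idempotent at the data layer as well.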
See the package documentation for the full API reference.