Putnami
DocsGitHub

Licensed under FSL-1.1-MIT

Getting Started
Concepts
How To
Build A Web App
Build An Api Service
Share Code Between Projects
Configure Your App
Add Persistence
Add Authentication
Add Background Jobs
Develop With Ai
Structure Business Logic With Di
Upgrade Putnami
Principles
Tooling & Workspace
Workspace
Cli
Jobs & Caching
Extensions
Templates
Error Handling
Frameworks
Typescript
ExtensionOverviewWebReact RoutingForms And ActionsStatic FilesApiErrors And ResponsesConfigurationLoggingHttp And MiddlewareDependency InjectionPlugins And LifecycleSessionsAuthPersistenceEventsStorageCachingWebsocketsTestingHealth ChecksTelemetryProto GrpcSmart ClientSchema
Go
ExtensionOverviewHttpDependency InjectionPlugins And LifecycleConfigurationSecurityPersistenceErrorsEventsStorageCachingLoggingTelemetryGrpcService ClientsValidationOpenapiTesting
Python
Extension
Platform
Ci
  1. DocsSeparator
  2. FrameworksSeparator
  3. GoSeparator
  4. Events

Events

go.putnami.dev/events provides a typed, transport-agnostic event system for building event-driven services.

Overview

The package has three core concepts:

  • Topics — typed contracts shared between publishers and subscribers
  • Handlers — subscribe to topics with retries, distribution modes, and filtering
  • Publishers — schema-validated publish functions bound to a topic

Quick example

import (
    "context"
    "fmt"

    "go.putnami.dev/events"
)

// Define a typed topic
var UserCreated = events.NewTopic[UserPayload]("user.created")

type UserPayload struct {
    ID    string `json:"id"`
    Email string `json:"email"`
    Name  string `json:"name"`
}

// Subscribe
handler := events.Handle(UserCreated, func(ctx context.Context, msg *events.Message[UserPayload]) error {
    fmt.Printf("Welcome %s!\n", msg.Payload.Name)
    return nil
}, events.WithDistribution(events.Broadcast))

// Publish
broker := events.NewMemoryBroker(events.MemoryBrokerConfig{})
broker.Subscribe(handler)
broker.Start(ctx)

publisher := events.NewPublisher(UserCreated, broker)
publisher.Publish(ctx, UserPayload{ID: "123", Email: "jane@example.com", Name: "Jane"})

Topics

Topics are pure data contracts that carry no transport logic:

import "go.putnami.dev/events"

var OrderPlaced = events.NewTopic[OrderPayload]("order.placed")
var PaymentFailed = events.NewTopic[PaymentPayload]("payment.failed")

type OrderPayload struct {
    OrderID string  `json:"orderId"`
    UserID  string  `json:"userId"`
    Total   float64 `json:"total"`
}

type PaymentPayload struct {
    OrderID string `json:"orderId"`
    Reason  string `json:"reason"`
}

Topics can be imported from any package for full type safety on both publisher and subscriber sides.

Handlers

Basic handler

handler := events.Handle(UserCreated, func(ctx context.Context, msg *events.Message[UserPayload]) error {
    user := msg.Payload
    fmt.Printf("User created: %s (%s)\n", user.Name, user.Email)
    return nil // ack
})

Handler options

handler := events.Handle(OrderPlaced,
    func(ctx context.Context, msg *events.Message[OrderPayload]) error {
        return processOrder(ctx, msg.Payload)
    },
    events.WithDistribution(events.Competing),
    events.WithMaxRetries(5),
    events.WithMaxBackoff(30 * time.Second),
    events.WithTimeout(10 * time.Second),
    events.WithDLQ(true),
)
Option Default Description
WithDistribution(mode) Competing Competing round-robins across instances; Broadcast delivers to all
WithMaxRetries(n) 10 Retry attempts before DLQ
WithMaxBackoff(d) 60s Maximum retry delay
WithTimeout(d) 30s Handler execution timeout
WithDLQ(bool) true Send to dead-letter queue after max retries
WithAckMode(mode) AutoAck AutoAck: return = ack, error = nack. ManualAck: call msg.Ack()
WithFilter(attrs) — Filter by message attributes

Distribution modes

Competing — round-robins messages across instances. Each message is processed by exactly one handler. Use for work distribution:

events.WithDistribution(events.Competing)

Broadcast — delivers every message to all subscribed handlers. Use for notifications, cache invalidation, or auditing:

events.WithDistribution(events.Broadcast)

Attribute filtering

Subscribe to a subset of messages:

handler := events.Handle(OrderPlaced,
    processEUOrders,
    events.WithFilter(map[string]string{"region": "eu"}),
)

Retry and acknowledgement

By default, returning nil acknowledges the message. Returning an error triggers retry with exponential backoff (1s, 2s, 4s, ... capped at maxBackoff):

handler := events.Handle(OrderPlaced, func(ctx context.Context, msg *events.Message[OrderPayload]) error {
    if err := processOrder(ctx, msg.Payload); err != nil {
        return err // triggers retry
    }
    return nil // message acknowledged
})

For manual acknowledgement:

handler := events.Handle(UserCreated,
    func(ctx context.Context, msg *events.Message[UserPayload]) error {
        go asyncProcess(msg.Payload)
        msg.Ack()
        return nil
    },
    events.WithAckMode(events.ManualAck),
)

Publishing

Basic publish

publisher := events.NewPublisher(UserCreated, broker)

err := publisher.Publish(ctx, UserPayload{
    ID:    "user-123",
    Email: "jane@example.com",
    Name:  "Jane",
})

Publish with options

err := publisher.Publish(ctx, payload,
    events.WithAttributes(map[string]string{
        "region":  "eu",
        "source":  "api",
    }),
    events.WithTraceID("trace-abc-123"),
)

Message

Handlers receive a typed Message[T]:

type Message[T any] struct {
    ID         string
    Topic      string
    Payload    T
    Timestamp  time.Time
    Attributes map[string]string
    Attempt    int
    TraceID    string
}

Memory broker

The in-memory broker handles local development and single-process services:

broker := events.NewMemoryBroker(events.MemoryBrokerConfig{})

// Subscribe handlers
broker.Subscribe(handler1)
broker.Subscribe(handler2)

// Start processing
broker.Start(ctx)

// Publish
publisher := events.NewPublisher(UserCreated, broker)
publisher.Publish(ctx, payload)

// Graceful shutdown
broker.Stop(ctx)

The broker supports:

  • Competing and broadcast distribution
  • Exponential backoff retry
  • Dead-letter queues
  • Attribute-based filtering

Transport interface

The event system is transport-agnostic. Implement the Transport interface for custom backends:

type Transport interface {
    Publish(ctx context.Context, env Envelope) error
    Subscribe(def *HandlerDefinition) error
    Start(ctx context.Context) error
    Stop(ctx context.Context) error
}

Switch between in-memory (development) and cloud (production) without code changes.

Design principles

  • Idempotent handlers — design handlers to tolerate duplicate delivery
  • Conservative defaults — 10 retries with exponential backoff and DLQ enabled
  • Transport agnostic — switch transports without changing application code
  • Type safety — generics ensure compile-time type checking for payloads

Related guides

  • Plugins & Lifecycle — event broker lifecycle
  • Telemetry — event metrics
  • TypeScript events — TypeScript equivalent

On this page

  • Events
  • Overview
  • Quick example
  • Topics
  • Handlers
  • Basic handler
  • Handler options
  • Distribution modes
  • Attribute filtering
  • Retry and acknowledgement
  • Publishing
  • Basic publish
  • Publish with options
  • Message
  • Memory broker
  • Transport interface
  • Design principles
  • Related guides