Putnami
DocsGitHub

Licensed under FSL-1.1-MIT

Getting Started
Concepts
How To
Build A Web App
Build An Api Service
Share Code Between Projects
Configure Your App
Add Persistence
Add Authentication
Add Background Jobs
Principles
Tooling & Workspace
Workspace Overview
Cli
Jobs & Commands
SDK
Error Handling
Extensions
Typescript
Go
Python
Docker
Ci
Frameworks
Typescript
OverviewWebReact RoutingForms And ActionsStatic FilesApiErrors And ResponsesConfigurationLoggingHttp And MiddlewareDependency InjectionPlugins And LifecycleSessionsAuthPersistenceEventsStorageCachingWebsocketsTestingHealth ChecksTelemetryProto GrpcSmart Client
Go
OverviewHttpDependency InjectionPlugins And LifecycleConfigurationSecurityPersistenceErrorsEventsStorageCachingLoggingTelemetryGrpcService ClientsValidationOpenapiTesting
Platform
  1. DocsSeparator
  2. FrameworksSeparator
  3. GoSeparator
  4. Events

Events

go.putnami.dev/events provides a typed, transport-agnostic event system for building event-driven services.

Overview

The package has three core concepts:

  • Topics — typed contracts shared between publishers and subscribers
  • Handlers — subscribe to topics with retries, distribution modes, and filtering
  • Publishers — schema-validated publish functions bound to a topic

Quick example

import (
    "context"
    "fmt"

    "go.putnami.dev/events"
)

// Define a typed topic
var UserCreated = events.NewTopic[UserPayload]("user.created")

type UserPayload struct {
    ID    string `json:"id"`
    Email string `json:"email"`
    Name  string `json:"name"`
}

// Subscribe
handler := events.Handle(UserCreated, func(ctx context.Context, msg *events.Message[UserPayload]) error {
    fmt.Printf("Welcome %s!\n", msg.Payload.Name)
    return nil
}, events.WithDistribution(events.Broadcast))

// Publish
broker := events.NewMemoryBroker(events.MemoryBrokerConfig{})
broker.Subscribe(handler)
broker.Start(ctx)

publisher := events.NewPublisher(UserCreated, broker)
publisher.Publish(ctx, UserPayload{ID: "123", Email: "jane@example.com", Name: "Jane"})

Topics

Topics are pure data contracts that carry no transport logic:

import "go.putnami.dev/events"

var OrderPlaced = events.NewTopic[OrderPayload]("order.placed")
var PaymentFailed = events.NewTopic[PaymentPayload]("payment.failed")

type OrderPayload struct {
    OrderID string  `json:"orderId"`
    UserID  string  `json:"userId"`
    Total   float64 `json:"total"`
}

type PaymentPayload struct {
    OrderID string `json:"orderId"`
    Reason  string `json:"reason"`
}

Topics can be imported from any package for full type safety on both publisher and subscriber sides.

Handlers

Basic handler

handler := events.Handle(UserCreated, func(ctx context.Context, msg *events.Message[UserPayload]) error {
    user := msg.Payload
    fmt.Printf("User created: %s (%s)\n", user.Name, user.Email)
    return nil // ack
})

Handler options

handler := events.Handle(OrderPlaced,
    func(ctx context.Context, msg *events.Message[OrderPayload]) error {
        return processOrder(ctx, msg.Payload)
    },
    events.WithDistribution(events.Competing),
    events.WithMaxRetries(5),
    events.WithMaxBackoff(30 * time.Second),
    events.WithTimeout(10 * time.Second),
    events.WithDLQ(true),
)
Option Default Description
WithDistribution(mode) Competing Competing round-robins across instances; Broadcast delivers to all
WithMaxRetries(n) 10 Retry attempts before DLQ
WithMaxBackoff(d) 60s Maximum retry delay
WithTimeout(d) 30s Handler execution timeout
WithDLQ(bool) true Send to dead-letter queue after max retries
WithAckMode(mode) AutoAck AutoAck: return = ack, error = nack. ManualAck: call msg.Ack()
WithFilter(attrs) — Filter by message attributes

Distribution modes

Competing — round-robins messages across instances. Each message is processed by exactly one handler. Use for work distribution:

events.WithDistribution(events.Competing)

Broadcast — delivers every message to all subscribed handlers. Use for notifications, cache invalidation, or auditing:

events.WithDistribution(events.Broadcast)

Attribute filtering

Subscribe to a subset of messages:

handler := events.Handle(OrderPlaced,
    processEUOrders,
    events.WithFilter(map[string]string{"region": "eu"}),
)

Retry and acknowledgement

By default, returning nil acknowledges the message. Returning an error triggers retry with exponential backoff (1s, 2s, 4s, ... capped at maxBackoff):

handler := events.Handle(OrderPlaced, func(ctx context.Context, msg *events.Message[OrderPayload]) error {
    if err := processOrder(ctx, msg.Payload); err != nil {
        return err // triggers retry
    }
    return nil // message acknowledged
})

For manual acknowledgement:

handler := events.Handle(UserCreated,
    func(ctx context.Context, msg *events.Message[UserPayload]) error {
        go asyncProcess(msg.Payload)
        msg.Ack()
        return nil
    },
    events.WithAckMode(events.ManualAck),
)

Publishing

Basic publish

publisher := events.NewPublisher(UserCreated, broker)

err := publisher.Publish(ctx, UserPayload{
    ID:    "user-123",
    Email: "jane@example.com",
    Name:  "Jane",
})

Publish with options

err := publisher.Publish(ctx, payload,
    events.WithAttributes(map[string]string{
        "region":  "eu",
        "source":  "api",
    }),
    events.WithTraceID("trace-abc-123"),
)

Message

Handlers receive a typed Message[T]:

type Message[T any] struct {
    ID         string
    Topic      string
    Payload    T
    Timestamp  time.Time
    Attributes map[string]string
    Attempt    int
    TraceID    string
}

Memory broker

The in-memory broker handles local development and single-process services:

broker := events.NewMemoryBroker(events.MemoryBrokerConfig{})

// Subscribe handlers
broker.Subscribe(handler1)
broker.Subscribe(handler2)

// Start processing
broker.Start(ctx)

// Publish
publisher := events.NewPublisher(UserCreated, broker)
publisher.Publish(ctx, payload)

// Graceful shutdown
broker.Stop(ctx)

The broker supports:

  • Competing and broadcast distribution
  • Exponential backoff retry
  • Dead-letter queues
  • Attribute-based filtering

Transport interface

The event system is transport-agnostic. Implement the Transport interface for custom backends:

type Transport interface {
    Publish(ctx context.Context, env Envelope) error
    Subscribe(def *HandlerDefinition) error
    Start(ctx context.Context) error
    Stop(ctx context.Context) error
}

Switch between in-memory (development) and cloud (production) without code changes.

Design principles

  • Idempotent handlers — design handlers to tolerate duplicate delivery
  • Conservative defaults — 10 retries with exponential backoff and DLQ enabled
  • Transport agnostic — switch transports without changing application code
  • Type safety — generics ensure compile-time type checking for payloads

Related guides

  • Plugins & Lifecycle — event broker lifecycle
  • Telemetry — event metrics
  • TypeScript events — TypeScript equivalent

On this page

  • Events
  • Overview
  • Quick example
  • Topics
  • Handlers
  • Basic handler
  • Handler options
  • Distribution modes
  • Attribute filtering
  • Retry and acknowledgement
  • Publishing
  • Basic publish
  • Publish with options
  • Message
  • Memory broker
  • Transport interface
  • Design principles
  • Related guides