Emissions

Event emission library for Elixir with pluggable adapters, per-request buffering, and a GenStage-powered delivery pipeline.

Battle-tested with millions of events in production at Stord and Hiive.

{:emissions, "~> 0.1.0"}

Why Emissions?

Everything you need for structured event delivery in Elixir

Per-Request Buffering

Collect events during a request or transaction and commit them atomically. If the request fails, the buffered events are discarded, so nothing is delivered for work that did not complete.

Pluggable Adapters

Built-in Phoenix Channels adapter for real-time WebSocket delivery, plus a simple behaviour for custom adapters.

GenStage Pipeline

Fan-out to multiple adapters with independent backpressure per consumer. Built on GenStage.

Event Handlers

Subscribe to events with supervised task execution and cascading support for internal workflows.

Topic Routing

Compile-time event-to-topic mapping with multi-tenant scoping. Isolate WebSocket topics per tenant with a single config option.

Telemetry & Testing

Built-in telemetry events for monitoring, plus test helpers for capturing and asserting on emitted events.

Quick Start

Get up and running in minutes

1. Configure your adapters and handlers

# config/config.exs
config :emissions,
  adapters: [
    {Emissions.Phoenix.PubSubAdapter,
     pubsub: MyApp.PubSub,
     scope: :org_id},            # multi-tenant topic scoping
    {MyApp.KafkaAdapter, topic: "my-events"}
  ],
  handlers: [
    MyApp.SearchIndexHandler,
    MyApp.NotificationHandler
  ]

2. Add the Plug to your Phoenix pipeline

# router.ex — auto-manages buffer lifecycle per request
pipeline :api do
  plug :accepts, ["json"]
  plug Emissions.Plug
end

The plug starts a buffer for mutating requests, commits it on 2xx responses, and terminates it (discarding its events) on errors. Emit events in your controllers as usual; no manual start or commit is needed.
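
The lifecycle rule described above can be sketched as two pure functions. This is only an illustration of the decision logic, not the code of Emissions.Plug: the module name BufferPolicySketch, the list of HTTP methods treated as mutating, and the returned atoms are all assumptions made for this example.

```elixir
defmodule BufferPolicySketch do
  @moduledoc """
  Hypothetical stand-in for the decisions a buffering plug has to make.
  Not part of Emissions; names and the method list are assumptions.
  """

  # Assumption: "mutating" means the usual write-style HTTP verbs.
  @mutating ~w(POST PUT PATCH DELETE)

  @doc "Decide whether a request should get a buffer at all."
  def on_request(method) when method in @mutating, do: :start
  def on_request(_method), do: :skip

  @doc "Decide what to do with an open buffer once the response status is known."
  def on_response(status) when status in 200..299, do: :commit
  def on_response(_status), do: :terminate
end

BufferPolicySketch.on_request("POST")
#=> :start
BufferPolicySketch.on_response(422)
#=> :terminate
```

In a real plug, the response-side decision would typically run inside a callback registered with Plug.Conn.register_before_send/2, since that is the point where the final status is known.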

3. Emit events in your application code

def create_order(conn, params) do
  case Orders.insert(params) do
    {:ok, order} ->
      Emissions.emit(:order_created, order, %{source: "api"})
      json(conn, %{id: order.id})

    {:error, changeset} ->
      conn
      |> put_status(:unprocessable_entity)
      |> json(%{errors: format_errors(changeset)})
  end
end

4. Route events to topics (optional)

defmodule MyApp.Topics do
  use Emissions.TopicRouter,
    mappings: %{
      orders: [:order_created, :order_updated, :order_shipped],
      inventory: [:item_received, :item_adjusted, :item_moved],
      users: [:user_created, :user_updated]
    }
end

MyApp.Topics.topic_for_event(:order_created)
#=> :orders
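
To show what "compile-time" mapping can mean in practice, here is a minimal sketch of a macro that turns a mappings option into one function clause per event. It is not the source of Emissions.TopicRouter; the modules TopicRouterSketch and SketchTopics are hypothetical, and returning nil for unmapped events is an assumption.

```elixir
defmodule TopicRouterSketch do
  @moduledoc """
  Hypothetical illustration of compile-time event-to-topic mapping.
  Not the implementation of Emissions.TopicRouter.
  """

  defmacro __using__(opts) do
    quote bind_quoted: [opts: opts] do
      mappings = Keyword.fetch!(opts, :mappings)

      # One function clause per event, generated while the module compiles,
      # so lookups at runtime are plain pattern matches.
      for {topic, events} <- mappings, event <- events do
        def topic_for_event(unquote(event)), do: unquote(topic)
      end

      # Assumption: events without a mapping resolve to nil.
      def topic_for_event(_unknown), do: nil
    end
  end
end

defmodule SketchTopics do
  use TopicRouterSketch,
    mappings: %{
      orders: [:order_created, :order_updated],
      users: [:user_created]
    }
end

SketchTopics.topic_for_event(:order_updated)
#=> :orders
```

Generating clauses up front means a typo in an event name shows up as a nil lookup rather than a runtime map miss buried in adapter code; a stricter variant could raise instead.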

Architecture

A GenStage pipeline with fan-out to multiple adapters

Your Application: emit() / commit()
  -> Buffer: GenServer, per-request
  -> Pipeline Producer: GenStage + BroadcastDispatcher
  -> fan-out to consumers:
       Kafka Adapter
       Phoenix Channels
       Handler Dispatcher

Events flow through a per-request buffer, get committed to a GenStage producer, and fan out to all registered adapter consumers. Each consumer manages its own backpressure independently. The Handler Dispatcher is a special adapter that routes events to your internal handler modules, executing each in a supervised task.
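
The fan-out stage can be approximated with plain GenStage. The sketch below is a standalone script under stated assumptions: the PipelineSketch modules, the publish/1 function, and the :kafka and :channels labels are hypothetical and do not reflect Emissions internals. It only uses documented GenStage features: a producer configured with GenStage.BroadcastDispatcher and consumers that subscribe with their own max_demand.

```elixir
# Assumption: run as a standalone script; GenStage is fetched with Mix.install.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule PipelineSketch.Producer do
  use GenStage

  def start_link(_opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # Stand-in for what happens on commit: hand a batch of events to the pipeline.
  def publish(events), do: GenStage.cast(__MODULE__, {:publish, events})

  @impl true
  def init(:ok) do
    # BroadcastDispatcher sends every event to every subscribed consumer.
    {:producer, :ok, dispatcher: GenStage.BroadcastDispatcher}
  end

  @impl true
  def handle_cast({:publish, events}, state) do
    # Returning events from a callback dispatches them; GenStage buffers
    # whatever exceeds the consumers' outstanding demand.
    {:noreply, events, state}
  end

  @impl true
  def handle_demand(_demand, state), do: {:noreply, [], state}
end

defmodule PipelineSketch.Consumer do
  use GenStage

  # `label` names the pretend adapter; `reply_to` receives what it consumed.
  def start_link(label, reply_to) do
    GenStage.start_link(__MODULE__, {label, reply_to})
  end

  @impl true
  def init(state) do
    {:consumer, state, subscribe_to: [{PipelineSketch.Producer, max_demand: 10}]}
  end

  @impl true
  def handle_events(events, _from, {label, reply_to} = state) do
    send(reply_to, {label, events})
    {:noreply, [], state}
  end
end

{:ok, _} = PipelineSketch.Producer.start_link()
{:ok, _} = PipelineSketch.Consumer.start_link(:kafka, self())
{:ok, _} = PipelineSketch.Consumer.start_link(:channels, self())

PipelineSketch.Producer.publish([%{name: :order_created, id: 1}])
```

Each consumer receives the full batch, so the calling process ends up with one message tagged :kafka and one tagged :channels. A real adapter would deliver the events to its backend inside handle_events/3 instead of sending them back to the caller.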

Buffer Lifecycle

Transactional event buffering with clean semantics

start(): Create a per-request buffer
emit(): Append events to the buffer
commit(): Send to pipeline & close buffer
flush(): Send to pipeline & keep open
terminate(): Discard events & clean up
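
The five operations above can be modelled with a small GenServer. This is a sketch of the semantics only, not the Emissions buffer: the BufferSketch module, its function arities, and the `sink` pid that stands in for the pipeline are assumptions made for illustration.

```elixir
defmodule BufferSketch do
  @moduledoc "Hypothetical per-request buffer; not the Emissions implementation."
  use GenServer

  # `sink` is any pid that should receive committed batches (stands in for the pipeline).
  def start(sink), do: GenServer.start(__MODULE__, sink)
  def emit(buf, event), do: GenServer.cast(buf, {:emit, event})
  def flush(buf), do: GenServer.call(buf, :flush)
  def commit(buf), do: GenServer.call(buf, :commit)
  # Stops the process without sending anything, so buffered events are dropped.
  def terminate(buf), do: GenServer.stop(buf)

  @impl true
  def init(sink), do: {:ok, %{sink: sink, events: []}}

  @impl true
  def handle_cast({:emit, event}, state) do
    # Events are prepended and reversed on send to keep emit cheap.
    {:noreply, %{state | events: [event | state.events]}}
  end

  @impl true
  def handle_call(:flush, _from, state) do
    send_batch(state)
    {:reply, :ok, %{state | events: []}}
  end

  def handle_call(:commit, _from, state) do
    send_batch(state)
    # Reply first, then shut the buffer down.
    {:stop, :normal, :ok, %{state | events: []}}
  end

  defp send_batch(%{events: []}), do: :ok
  defp send_batch(%{sink: sink, events: events}), do: send(sink, {:events, Enum.reverse(events)})
end

{:ok, buf} = BufferSketch.start(self())
BufferSketch.emit(buf, :order_created)
BufferSketch.emit(buf, :order_updated)
:ok = BufferSketch.commit(buf)
# The caller's mailbox now holds {:events, [:order_created, :order_updated]}.
```

The difference between commit and flush is only whether the process keeps running afterwards; terminate never touches the sink, which is what makes a failed request leave no trace downstream.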
