GenStage Pipeline
Emissions uses a GenStage pipeline to fan out committed events to all configured adapters with independent backpressure per consumer.
Architecture
Producer
Emissions.Pipeline.Producer is a named GenStage producer that uses GenStage.BroadcastDispatcher. This dispatcher sends every event to every subscribed consumer, enabling true fan-out.
When a buffer commits, it calls Producer.notify/1, which pushes the committed events into the producer via GenStage.cast/2. The producer holds them in an internal queue and drains that queue in response to demand from consumers.
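The notify-then-drain flow described above can be sketched roughly as follows. This is a minimal illustration, not the actual Emissions.Pipeline.Producer source: the module name PipelineSketch.Producer, the {:notify, events} message shape, and the {queue, pending_demand} state layout are all assumptions made for the example.

```elixir
# Assumes the :gen_stage dependency is available.
defmodule PipelineSketch.Producer do
  @moduledoc "Hypothetical stand-in for Emissions.Pipeline.Producer."
  use GenStage

  def start_link(opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, Keyword.put_new(opts, :name, __MODULE__))
  end

  # Called after a buffer commits; hands the events to the producer process.
  def notify(events) when is_list(events) do
    GenStage.cast(__MODULE__, {:notify, events})
  end

  @impl true
  def init(:ok) do
    # State: {queued events, demand that has not been satisfied yet}.
    {:producer, {:queue.new(), 0}, dispatcher: GenStage.BroadcastDispatcher}
  end

  @impl true
  def handle_cast({:notify, events}, {queue, pending}) do
    queue = Enum.reduce(events, queue, &:queue.in/2)
    dispatch(queue, pending, [])
  end

  @impl true
  def handle_demand(incoming, {queue, pending}) when incoming > 0 do
    dispatch(queue, pending + incoming, [])
  end

  # Emit queued events while demand remains; keep the rest buffered.
  defp dispatch(queue, 0, acc), do: {:noreply, Enum.reverse(acc), {queue, 0}}

  defp dispatch(queue, pending, acc) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> dispatch(rest, pending - 1, [event | acc])
      {:empty, queue} -> {:noreply, Enum.reverse(acc), {queue, pending}}
    end
  end
end
```

In this sketch, events that arrive before any demand simply stay in the queue, and demand that arrives before any events is remembered in the second element of the state, so neither side is lost.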
Adapter Consumers
Each configured adapter is wrapped in an Emissions.Pipeline.AdapterConsumer — a GenStage consumer that subscribes to the producer. Each consumer:
- Calls the adapter's init/1 on startup
- Subscribes to the producer with configurable max_demand
- Optionally filters events via interested?/2
- Delegates to handle_events/2 for delivery
- Reports success/failure via telemetry
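The consumer responsibilities listed above might look something like the sketch below. It is an illustration under stated assumptions rather than the real AdapterConsumer: the option keys (:adapter, :producer, :adapter_opts), the argument and return shapes of the adapter callbacks, and the telemetry event name are all hypothetical, since this page only gives the callback names and arities.

```elixir
# Assumes the :gen_stage and :telemetry dependencies are available.
defmodule PipelineSketch.AdapterConsumer do
  @moduledoc "Hypothetical stand-in for Emissions.Pipeline.AdapterConsumer."
  use GenStage

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    adapter = Keyword.fetch!(opts, :adapter)
    producer = Keyword.fetch!(opts, :producer)
    max_demand = Keyword.get(opts, :max_demand, 50)

    # Assumed callback shape: init(opts) :: {:ok, adapter_state}
    {:ok, adapter_state} = adapter.init(Keyword.get(opts, :adapter_opts, []))

    state = %{adapter: adapter, adapter_state: adapter_state}
    {:consumer, state, subscribe_to: [{producer, max_demand: max_demand}]}
  end

  @impl true
  def handle_events(events, _from, state) do
    %{adapter: adapter, adapter_state: adapter_state} = state
    wanted = filter(events, adapter, adapter_state)

    # Assumed callback shape:
    # handle_events(events, adapter_state) :: :ok | {:error, reason}
    outcome =
      case adapter.handle_events(wanted, adapter_state) do
        :ok -> :success
        {:error, _reason} -> :failure
      end

    # Hypothetical telemetry event name and payload.
    :telemetry.execute(
      [:pipeline_sketch, :adapter, outcome],
      %{count: length(wanted)},
      %{adapter: adapter}
    )

    # A consumer emits no events of its own.
    {:noreply, [], state}
  end

  # interested?/2 is treated as optional: without it, every event is delivered.
  defp filter(events, adapter, adapter_state) do
    if function_exported?(adapter, :interested?, 2) do
      Enum.filter(events, &adapter.interested?(&1, adapter_state))
    else
      events
    end
  end
end
```

Passing max_demand through subscribe_to is standard GenStage usage; how the real consumer wires it from configuration is not shown here.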
Backpressure
GenStage's demand-driven flow means each consumer independently controls how fast it receives events:
- A fast WebSocket adapter can process events immediately
- A slow Kafka adapter under load naturally applies backpressure
- The slow adapter does not block the fast one — each manages its own demand
- If all consumers are slow, the producer buffers events internally
Supervision
Top-level supervisor (rest_for_one)
- Emissions.BufferPool (DynamicSupervisor)
  - Buffer 1 (dynamic)
  - Buffer 2 (dynamic)
  - Buffer N (dynamic)
- Pipeline.Producer (GenStage)
- Pipeline.AdapterSupervisor (one_for_one)
  - AdapterConsumer: Adapter 1
  - AdapterConsumer: Adapter 2
  - AdapterConsumer: HandlerDispatcher
- Emissions.TaskSupervisor (Task.Supervisor)
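The top-level supervisor uses the rest_for_one strategy: when a child crashes, that child and every child started after it are restarted. If the producer crashes, the AdapterSupervisor and all of its adapter consumers are therefore restarted as well, since they subscribe to the producer. The AdapterSupervisor itself uses one_for_one, so a single adapter crash doesn't affect the others.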
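To make the restart ordering concrete, here is a small stdlib-only sketch of a tree with the same shape. It is not the Emissions supervisor: all SupervisionSketch names are hypothetical, an Agent stands in for the GenStage producer, and an empty Supervisor stands in for the AdapterSupervisor, so the example runs without extra dependencies.

```elixir
defmodule SupervisionSketch do
  @moduledoc "Hypothetical supervisor mirroring the tree above with stand-in children."
  use Supervisor

  def start_link(opts \\ []), do: Supervisor.start_link(__MODULE__, :ok, opts)

  @impl true
  def init(:ok) do
    children = [
      # Buffers are started on demand under a DynamicSupervisor.
      {DynamicSupervisor, name: SupervisionSketch.BufferPool, strategy: :one_for_one},
      # Stand-in for the GenStage producer (an Agent keeps the sketch stdlib-only).
      %{
        id: :producer,
        start: {Agent, :start_link, [fn -> :producer end, [name: SupervisionSketch.Producer]]}
      },
      # Started after the producer, so rest_for_one restarts it when the producer dies.
      %{
        id: :adapter_supervisor,
        type: :supervisor,
        start:
          {Supervisor, :start_link,
           [[], [strategy: :one_for_one, name: SupervisionSketch.AdapterSupervisor]]}
      },
      {Task.Supervisor, name: SupervisionSketch.TaskSupervisor}
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end
end
```

Killing the process registered as SupervisionSketch.Producer in this sketch restarts the adapter supervisor and the task supervisor, which are listed after it, while the buffer pool, listed before it, keeps running.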
Configuration
Tune the pipeline via the :pipeline config key:
config :emissions,
  pipeline: [
    max_demand: 50 # events per consumer per batch (default: 50)
  ]
Higher max_demand means larger batches to adapters (better throughput, higher latency). Lower values mean smaller batches (lower latency, more overhead).
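As an illustration of how such a setting could be consumed, the sketch below reads max_demand from the :pipeline keyword list and falls back to the documented default of 50. The ConfigSketch module and its max_demand/1 function are hypothetical helpers for this example, not part of Emissions.

```elixir
defmodule ConfigSketch do
  @moduledoc "Hypothetical helper showing one way to read the :pipeline settings."

  # Default taken from the configuration comment above.
  @default_max_demand 50

  # Returns the configured max_demand for the given application,
  # or the default when the key is not set.
  def max_demand(app \\ :emissions) do
    app
    |> Application.get_env(:pipeline, [])
    |> Keyword.get(:max_demand, @default_max_demand)
  end
end
```

A consumer could then pass this value along when subscribing, for example as subscribe_to: [{producer, max_demand: ConfigSketch.max_demand()}].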
Handler Dispatcher
The Emissions.HandlerDispatcher is a special adapter that dispatches events to internal handlers. It's automatically added to the pipeline when handlers are configured. It implements the same Emissions.Adapter behaviour, so it participates in the GenStage pipeline like any other adapter.
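The idea of an adapter that forwards events to in-process handlers can be sketched as below. This is only a guess at the general shape: the real Emissions.Adapter callback signatures are not given on this page, so the argument and return types, the :handlers option, and the representation of handlers as one-argument functions are all assumptions. The sketch omits the @behaviour declaration so that it compiles on its own.

```elixir
defmodule HandlerDispatcherSketch do
  @moduledoc "Hypothetical adapter that fans events out to internal handlers."

  # Assumed shape: init(opts) :: {:ok, state}
  def init(opts), do: {:ok, %{handlers: Keyword.get(opts, :handlers, [])}}

  # Assumed shape: interested?(event, state) :: boolean
  # With no handlers registered there is nothing to deliver to.
  def interested?(_event, %{handlers: handlers}), do: handlers != []

  # Assumed shape: handle_events(events, state) :: :ok
  # Every handler sees every event, in order.
  def handle_events(events, %{handlers: handlers}) do
    for event <- events, handler <- handlers, do: handler.(event)
    :ok
  end
end
```

Because it exposes the same three callbacks as any other adapter, a module like this could be wrapped in an adapter consumer without special treatment.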