Phoenix Channels
Emissions ships an optional Phoenix Channels adapter that broadcasts events over WebSockets. When enabled, events committed through the pipeline are pushed to connected clients in real time.
Installation
Add :phoenix to your dependencies if not already present:
{:phoenix, "~> 1.7"}
The Phoenix adapter modules are compiled only when Phoenix is available. If your project does not include :phoenix, the modules simply won't exist.
Quick Start
Three steps to get events flowing to WebSocket clients:
1. Add the adapter to your config
config :emissions,
adapters: [
{Emissions.Phoenix.PubSubAdapter, pubsub: MyApp.PubSub}
]
2. Create a channel
defmodule MyAppWeb.EmissionsChannel do
use Emissions.Phoenix.Channel
end
3. Mount the channel in your socket
defmodule MyAppWeb.UserSocket do
use Phoenix.Socket
channel "emissions:*", MyAppWeb.EmissionsChannel
@impl true
def connect(_params, socket, _connect_info), do: {:ok, socket}
@impl true
def id(_socket), do: nil
end
That's it. Any events committed through the Emissions pipeline will be pushed to connected WebSocket clients.
How It Works
The PubSubAdapter receives event batches from the GenStage pipeline, serializes each event, and broadcasts it to Phoenix.PubSub topics. The channel subscribes to PubSub on join and pushes incoming events to connected clients.
Topic Hierarchy
Each event is broadcast to multiple PubSub topics, allowing clients to subscribe at different granularity levels:
| Level | Topic Example | Receives |
|---|---|---|
| Catch-all | "emissions" |
Every event |
| Topic group | "emissions:orders" |
Events mapped to the orders topic (requires TopicRouter) |
| Event name | "emissions:order_created" |
Only order_created events |
A client joining "emissions:orders" receives all order-related events. A client joining "emissions:order_created" receives only that specific event type. A client joining "emissions" receives everything.
TopicRouter Integration
When you configure a TopicRouter, events are also broadcast to their mapped topic group:
defmodule MyApp.Topics do
use Emissions.TopicRouter,
mappings: %{
orders: [:order_created, :order_updated, :order_shipped],
users: [:user_created, :user_updated]
}
end
config :emissions,
adapters: [
{Emissions.Phoenix.PubSubAdapter,
pubsub: MyApp.PubSub,
topic_router: MyApp.Topics}
]
An :order_created event would then be broadcast to "emissions:orders", "emissions:order_created", and "emissions".
Without a TopicRouter, events are only broadcast to the event-name and catch-all topics.
Custom Serialization
The default serializer converts events into maps with name, payload, metadata, and timestamp keys. The PubSubAdapter accepts any module implementing Emissions.Serializer (or the narrower Emissions.Phoenix.Serializer which constrains the return type to map()).
For non-Phoenix serialization (binary formats, custom encodings), see the Serialization guide.
Implement a custom serializer for a different wire format:
defmodule MyApp.EmissionsSerializer do
@behaviour Emissions.Phoenix.Serializer
@impl true
def serialize(event) do
%{
type: event.name,
data: event.payload,
occurred_at: event.timestamp
}
end
end
config :emissions,
adapters: [
{Emissions.Phoenix.PubSubAdapter,
pubsub: MyApp.PubSub,
serializer: MyApp.EmissionsSerializer}
]
Authorization
Override join/3 in your channel to add authorization:
defmodule MyAppWeb.EmissionsChannel do
use Emissions.Phoenix.Channel
def join(topic, params, socket) do
if authorized?(socket, topic) do
super(topic, params, socket)
else
{:error, %{reason: "unauthorized"}}
end
end
defp authorized?(socket, _topic) do
socket.assigns[:user_id] != nil
end
end
The generated join/3 is defoverridable, so super/3 delegates to the default implementation which subscribes to the PubSub topic.
Adapter Options
| Option | Default | Description |
|---|---|---|
:pubsub |
(required) | The Phoenix.PubSub server name |
:topic_prefix |
"emissions" |
Prefix for all topic strings |
:topic_router |
nil |
Module using Emissions.TopicRouter |
:serializer |
DefaultSerializer |
Module implementing Emissions.Serializer (or Emissions.Phoenix.Serializer) |
JavaScript Client
Connect using the phoenix npm package:
import { Socket } from "phoenix";
const socket = new Socket("/socket", { params: { token: userToken } });
socket.connect();
// Subscribe to all order events
const ordersChannel = socket.channel("emissions:orders", {});
ordersChannel.join();
ordersChannel.on("event", (payload) => {
console.log("Order event:", payload.name, payload.payload);
});
// Or subscribe to a specific event type
const createdChannel = socket.channel("emissions:order_created", {});
createdChannel.join();
createdChannel.on("event", (payload) => {
console.log("New order:", payload.payload);
});
Testing
Test your channel using Phoenix.ChannelTest:
defmodule MyAppWeb.EmissionsChannelTest do
use ExUnit.Case
import Phoenix.ChannelTest
@endpoint MyAppWeb.Endpoint
test "receives events pushed to the channel" do
{:ok, _, _socket} =
socket(MyAppWeb.UserSocket, nil, %{})
|> subscribe_and_join(MyAppWeb.EmissionsChannel, "emissions:orders")
Phoenix.PubSub.broadcast(
MyApp.PubSub,
"emissions:orders",
{:emissions_event, %{name: :order_created, payload: %{id: 1}}}
)
assert_push "event", %{name: :order_created}
end
end