Publish-Subscribe in Elixir

Publish-Subscribe is a messaging pattern that works as follows: a group of consumers subscribe to events of a given topic and are notified whenever an event of that topic arrives.

When an event is published to a topic channel, the channel delivers a copy of the message to each of its output channels. The advantage is that consumers are decoupled from producers: neither party needs any knowledge of the other to communicate.

In other words, pub-sub is a pattern used to communicate messages between different system components without the components knowing anything about each other’s identity.
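Before reaching for a library, the idea can be sketched with plain processes. The example below is a minimal illustration, not the implementation used later in this post: TinyTopic is a hypothetical module name, and the topic is simply a process that remembers its subscribers and forwards a copy of each published message to every one of them.

```elixir
defmodule TinyTopic do
  # Hypothetical module, for illustration only.
  # A topic is a process that holds a list of subscriber pids.

  def start do
    spawn(fn -> loop([]) end)
  end

  # Subscribes the given process (the caller by default) to the topic.
  def subscribe(topic, pid \\ self()) do
    send(topic, {:subscribe, pid})
    :ok
  end

  # The publisher only knows the topic, never the subscribers.
  def publish(topic, message) do
    send(topic, {:publish, message})
    :ok
  end

  defp loop(subscribers) do
    receive do
      {:subscribe, pid} ->
        loop([pid | subscribers])

      {:publish, message} ->
        # Deliver a copy of the message to every subscriber.
        Enum.each(subscribers, &send(&1, message))
        loop(subscribers)
    end
  end
end

topic = TinyTopic.start()
TinyTopic.subscribe(topic)
TinyTopic.publish(topic, {:greeting, "hey"})

receive do
  {:greeting, text} -> IO.puts("subscriber got: #{text}")
after
  1_000 -> IO.puts("no message received")
end
```

The sketch leaves out everything a real system needs, such as removing subscribers that have died. That bookkeeping is what a process registry handles for us.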

Let’s look at an implementation of Publish-Subscribe in Elixir. The full source code is provided below.

Example

What would our working pub-sub system look like?

iex> Pubsub.subscribe(:test)
{:ok, #PID<0.141.0>}
iex> Pubsub.subscribe(:test)
{:ok, #PID<0.143.0>}
iex> Pubsub.broadcast(:test, "hey")
"hey"
"Got \"hey\" in process #PID<0.141.0>"
"Got \"hey\" in process #PID<0.143.0>"
iex> Pubsub.subscribers(:test)
[#PID<0.141.0>, #PID<0.143.0>]

Pubsub.subscribe spawns a new process that listens to a given topic.

Pubsub.broadcast sends a message to all subscribers of that topic.

Finally, Pubsub.subscribers lists all the processes that are subscribed to a given topic.

Note that you are not limited to string messages. A message can be any term, including tuples and maps.
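Because a message can be any term, a subscriber can pattern match on its shape. The sketch below uses a hypothetical ShapeSubscriber module and plain send/2 in place of a broadcast, so that it runs without any dependencies. The message shapes ({:user_joined, name} and a map with an :event key) are made up for illustration.

```elixir
defmodule ShapeSubscriber do
  # Hypothetical subscriber, for illustration only.
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, [])

  # Returns what the subscriber has seen so far, oldest first.
  def seen(pid), do: GenServer.call(pid, :seen)

  @impl true
  def init(_args), do: {:ok, []}

  @impl true
  def handle_call(:seen, _from, state), do: {:reply, Enum.reverse(state), state}

  # One clause per message shape; the last clause catches everything else.
  @impl true
  def handle_info({:user_joined, name}, state), do: {:noreply, [{:tuple, name} | state]}
  def handle_info(%{event: event}, state), do: {:noreply, [{:map, event} | state]}
  def handle_info(other, state), do: {:noreply, [{:other, other} | state]}
end

{:ok, pid} = ShapeSubscriber.start_link()

# Stand-ins for broadcast messages of different shapes.
send(pid, {:user_joined, "ann"})
send(pid, %{event: :logout, user: "ann"})
send(pid, "hey")

IO.inspect(ShapeSubscriber.seen(pid))
```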

Implementation

gproc is a process registry library written in Erlang, but it’s really a shared dictionary at its core. We will use the following gproc functions to build our pub-sub system:

  • :gproc.reg - Registers a Key for the current process.
  • :gproc.send - Sends a message to the process, or processes, corresponding to Key.
  • :gproc.lookup_pids - Returns a list of pids with the published key Key.
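The three functions can be tried out directly from a single process. The following is a minimal sketch that assumes gproc is already listed as a dependency of the project; the key {:p, :l, :mytopic} and the {:event, 1} message are arbitrary choices for the example.

```elixir
# Assumes gproc is listed in the project's deps (for example in mix.exs).
{:ok, _apps} = Application.ensure_all_started(:gproc)

key = {:p, :l, :mytopic}

# Register the current process under the property key.
true = :gproc.reg(key)

# The current process now shows up as a holder of the key.
pids = :gproc.lookup_pids(key)
IO.inspect(self() in pids)

# Send to every process registered under the key, including this one.
:gproc.send(key, {:event, 1})

receive do
  {:event, n} -> IO.puts("received event #{n}")
after
  1_000 -> IO.puts("nothing received")
end
```

Here the process that registers, sends, and receives is the same one, which keeps the example short. In the pub-sub system each subscriber is a separate process.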

As you can see, our pub-sub system maps very well onto the gproc API.

Note that many gproc functions take a Key. The types below briefly describe what a Key consists of:

# names | props | counters | aggr_counters | resources | resource_counters
@type type :: :n | :p | :c | :a | :r | :rc

# local | global
@type scope :: :l | :g

@type name :: term

@type key :: {type, scope, name}

An example Key is {:p, :l, :mytopic}, which is a local property with the name :mytopic. Note that name can be any term including tuples and maps.
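Since a name can be any term, topics do not have to be atoms. The sketch below uses a hypothetical helper, TopicKey.property/2, which is not part of gproc, to build property keys from different kinds of names. The room and tenant names are invented for the example. Building a key with the :g scope is only a matter of constructing a tuple; actually registering global keys requires gproc's distributed mode to be set up, which this post does not cover.

```elixir
defmodule TopicKey do
  # Hypothetical helper, not part of gproc.
  @type scope :: :l | :g

  # Builds a gproc property key for the given topic name.
  @spec property(term, scope) :: {:p, scope, term}
  def property(name, scope \\ :l) when scope in [:l, :g] do
    {:p, scope, name}
  end
end

# An atom, a tuple, and a map all work as topic names.
IO.inspect(TopicKey.property(:mytopic))
IO.inspect(TopicKey.property({:room, 42}))
IO.inspect(TopicKey.property(%{tenant: "acme", feed: :orders}, :g))
```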

Consult the gproc documentation for what each type and scope means. The gproc API is described on the project wiki at https://github.com/uwiger/gproc/wiki/The-gproc-api.

Code

The full source code is available on GitHub.

defmodule Pubsub do
    use GenServer

    # https://github.com/uwiger/gproc/wiki/The-gproc-api
    @type type :: :n | :p | :c | :a | :r | :rc
    @type scope :: :l | :g
    @type name :: term
    @type key :: {type, scope, name}

    # Client API
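    # Spawns a subscriber process for the topic and returns {:ok, pid}.
    # init/1 below registers that process with gproc.
    def subscribe(topic) do
        GenServer.start_link(__MODULE__, topic: topic)
    end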

    def subscribers(topic) do
        :gproc.lookup_pids({:p, :l, topic})
    end

    def broadcast(topic, message) do
        :gproc.send({:p, :l, topic}, message)
    end

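    # Server API

    # Runs inside the GenServer process when it starts.
    def init(args) do
        # Register this process as a local property under the topic.
        :gproc.reg({:p, :l, args[:topic]})
        {:ok, []}
    end

    # Messages sent to the topic arrive here as plain process messages.
    def handle_info(message, state) do
        IO.inspect "Got #{inspect message} in process #{inspect self()}"
        {:noreply, state}
    end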
end

In Closing

So how can an application send messages only to the applications that are interested in receiving them, without knowing the identities of the receivers?

We can use a publish-subscribe system!

We’ve also seen how we can write a simple publish-subscribe system in Elixir, with gproc doing most of the heavy lifting.

Appendix

Phoenix Channels use Phoenix.PubSub for pub-sub broadcasts, but Phoenix.PubSub also provides an API for direct usage. Definitely keep an eye out for an official release.

Chris McCord mentions Phoenix.PubSub in his ElixirConf EU talk.