Skip to content
/ mx Public

OTP Message Broker (pub/sub, workers queue, priority/deferring delivery)


Notifications You must be signed in to change notification settings


Folders and files

Last commit message
Last commit date

Latest commit


Repository files navigation

OTP Message Broker


Universal OTP message broker features:

  • create channels (pub/sub)
  • pools (workers queue)
  • mixing it (pool of channels, channel of pools... etc.)
  • send messages with specify priority of delivering messages (range: 1..10)
  • pool has 3 balance methods: rr(round robin), hash (by erlang:phash(Message, lenth(Pool))), random
  • defer message delivering in case of
    • exceed the queue limit (10000) and receiver has the 'true' in 'defer' option.
    • client has 'offline' state and the 'defer' option is set to 'true'
  • 'async' Client option allows you control the delivery process. Default value of this option is 'true'.


For dev purposes simple run:

git clone && cd mx && make compile && echo "MX is installed in $(pwd)"

For production use:

% rebar.config -- add mx in deps section
{deps, [
       {mx, {git, "git://", {branch, "master"}}}
%% and add mx to the relx section
{relx, [{release, {your_rel, "0.0.1"},

You can specifying a queue limits in sys.config:

    {mx, [
       {queue_length_limit, 100000},
       {queue_low_threshold, 0.6},   % 60%
       {queue_high_threshold, 0.8}   % 80%


make run

Distributed mode

Run the first node:

make demo_run node_name='[email protected]'

(mxnode01@> application:start(mx).

and the second one:

make demo_run node_name='[email protected]'

(mxnode02@> application:load(mx).
%% Set via environment value of **'master'** to run it in slave mode.
(mxnode02@> application:set_env(mx, master, '[email protected]').
(mxnode02@> application:start(mx).

Call mx:nodes() to get the list of mx cluster nodes.

(mxnode01@> mx:nodes().
['[email protected]','[email protected]']

Mnesia custom directory (optional)

Create dir, for example, /usr/local/var/lib/mx/mnesia/ with correct (Read/Write) permissions.

Set option mnesia_base_dir with this directory in sys.config:

    {mx, [
       {mnesia_base_dir,      "/usr/local/var/lib/mx/mnesia/"}

Or set the value of configuration parameter mnesia_base_dir for mx:

make demo_run node_name='[email protected]'

(mxnode01@> application:load(mx).
(mxnode01@> application:set_env(mx, mnesia_base_dir, "/usr/local/var/lib/mx/mnesia/").
(mxnode01@> application:start(mx).

So, mnesia data will be located:

/usr/local/var/lib/mx/mnesia/[email protected]  %% node name


% Client has higest priority by default (priority = 1)
{clientkey, Client1Key} = mx:register(client, "Client1"),
{clientkey, Client2Key} = mx:register(client, "Client2", [{priority, 8}]),
{clientkey, Client3Key} = mx:register(client, "Client3", [{async, false}, {defer, true}]),
{clientkey, Client4Key} = mx:register(client, "Client4"),

% register channel with default priority (5)
{channelkey, Channel1Key} = mx:register(channel, "Channel1", Client4Key),
ok = mx:subscribe(Client1Key, Channel1Key),
% just for example try to subscribe one more time
{already_subscribed, Client1Key} = mx:subscribe(Client1Key, Channel1Key),

ok = mx:subscribe(Client2Key, Channel1Key),
ok = mx:subscribe(Client3Key, Channel1Key),

mx:send(Channel1Key, "Hello, Channel1!").

% register pool with default balance method is 'rr' - round robin
%             default priority (5)
{poolkey, Pool1Key} = mx:register(pool, "Pool1", Client4Key),
mx:join(Client1Key, Pool1Key),
mx:join(Client2Key, Pool1Key),
mx:join(Client3Key, Pool1Key),

% create lowest priority channel and pool by Client2
{channelkey, LPCh1} = mx:register(channel, "LP Channel1", Client2Key, [{priority, 10}]),
{poolkey, LPPl1}    = mx:register(pool, "LP Pool1", Client2Key, [{priority, 10}]),

% create highest priority channel and pool by Client3
{channelkey, HPCh1} = mx:register(channel, "HP Channel1", Client2Key, [{priority, 1}]),
{poolkey, HPPl1}    = mx:register(pool, "HP Pool1", Client2Key, [{priority, 1}]),

% high priority pool with 'hash' balance
{poolkey, HP_Hash_Pl}    = mx:register(pool, "HP Pool (hash)", Client2Key, [{priority, 1}, {balance, hash}]),

% pool with random balance
{poolkey, Rand_Pl}    = mx:register(pool, "Pool (random)", Client2Key, [{balance, random}]),


local usage

  • Create client/channel/pool

    mx:register(client, Name)
    mx:register(client, Name, Opts)
     Name - list or binary
     Opts - proplists

    returns: {clientkey, Key} | {duplicate, Key}

     Key - binary
    mx:register(channel, Name, ClientKey)
    mx:register(channel, Name, ClientKey, Opts)
     Name - list or binary
     Opts - proplists
     ClientKey - binary

    returns: {channelkey, Key} | {duplicate, Key}

     Key - binary
    mx:register(pool, Name, ClientKey)**
    mx:register(pool, Name, ClientKey, Opts)**
     Name - list or binary
     Opts - proplists
     ClientKey - binary

    returns: {poolkey, Key} | {duplicate, Key}

     Key - binary
  • Delete client/channel/pool

  • Set online/offline state

    mx:online(ClientKey, Pid)
  • Work with channel/pool

    mx:subscribe(Key, Channel)
    mx:unsubscribe(Key, Channel)
     Key - binary (ClientKey, ChannelKey, PoolKey)
     Channel - channel name or channel key
    mx:join(Key, Pool)
    mx:leave(Key, Pool)
     Key - binary (ClientKey, ChannelKey, PoolKey)
     Pool - pool name or pool key
  • Set options for client/channel/pool

    mx:set(Key, Opts)
     Key - binary (ClientKey, ChannelKey, PoolKey)
     Opts - proplists
  • Sending message

    mx:send(ClientKey, Message)
    mx:send(ChannelKey, Message)
    mx:send(PoolKey, Message)
  • Owning Pool/Channel

    mx:own(Key, ClientKey)
     Key - binary (ChannelKey, PoolKey)

    orphan Pool/Channel will unregister automaticaly

    mx:abandon(Key, ClientKey)
     Key - binary (ChannelKey, PoolKey)
  • Clear deferred messages

     Key - binary (ClientKey, ChannelKey, PoolKey)
     Key = all - truncate the 'deferred' table
  • Clear all MX tables

  • Info

    show MX cluster nodes


    show full information about the client

     Key - binary (ClientKey, ChannelKey, PoolKey)

    show the only Name property from the information list about the client

    mx:info(Key, Name)
     Key - binary (ClientKey, ChannelKey, PoolKey)
     Name - field name

    show the list of Clients are subscribed/joined to.

     Key - binary (ChannelKey, PoolKey)

remote usage

gen_server:call(MX, Message)

where the Message is one of the listed values below:

  • {register_client, Client}
  • {register_client, Client, Opts}
  • {register_channel, ChannelName, ClientKey}
  • {register_channel, ChannelName, ClientKey, Opts}
  • {register_pool, PoolName, ClientKey}
  • {register_pool, PoolName, ClientKey, Opts}
  • {unregister, Key}
  • {online, ClientKey, Pid}
  • {offline, ClientKey}
  • {subscribe, Client, To}
  • {unsubscribe, Client, From}
  • {join, Client, To}
  • {leave, Client, From}
  • {send, To, Message}
  • {own, Key, ClientKey}
  • {abandon, Key, ClientKey}
  • {info, Key}
  • {info, {Key, Name}}
  • {relation, Key}
  • {set, Key, Opts}
  • nodes
  • clear_all_tables
> (mxnode02@> gen_server:call({mx, '[email protected]'}, nodes).
['[email protected]','[email protected]']


There are only common tests (CT) are implemented with some limited set of cases

Direct sending tests.

Sequentualy running:

  • 1 reciever - 1 message
  • 1 reciever - 1000 messages
  • 1000 recievers - 1 messages
  • 1000 recievers - 100 messages


  • 1 reciever - 1000 messages (3 processes)
  • 1000 recievers - 1 messages (3 processes)

Channel tests (pub/sub).

Sequentualy running:

  • 1 subscriber (subscriber) recieves 1 messages
  • 1 subscriber - 1000 messages
  • 1000 subscribers - 1 messages
  • 1000 subscribers - 10 messages


  • 1000 subscribers - 10 messages (10 processes)

Priority delivering:

  • 1 subscriber recieves 1000 messages with 10 different priorities:
    • 100 messages with priority 1
    • 100 messages with priority 2
    • ...
    • 100 messages with priority 10

Pool tests (worker queue)

Sequentualy running:

  • 1 client sends 1 messages to 1 worker
  • 1 client - 1000 messages - 2 workers (round robin)
  • 1 client - 1000 messages - 2 workers (hash)
  • 1 client - 1000 messages - 2 workers (random)
  • 1000 clients - 1 messages - 4 workers
  • 1000 clients - 1000 messages - 4 workers

Parallel: (not implemented)

  • 1000 clients - 10 messages (10 processes)

Run the testing

  1. Run MX application as standalone application
$ make run
  1. Run "Common Tests"
$ make ct


OTP Message Broker (pub/sub, workers queue, priority/deferring delivery)








No releases published


No packages published