
Pub-sub and work queue server. Wildcards, streams, back-pressure, multi-transport. Just Node and a filesystem required.


davedoesdev/centro


centro

Centro is a Node.js module for publishing and subscribing to messages over a network. It includes code for running a server and clients which connect to a server.

Messages are published to topics which support wildcard matching (single- and multi-level).

Each message has its own content stream, with full back-pressure support, and can be delivered to multiple interested clients or to exactly one.

All messages are stored in a directory on the server machine’s filesystem so no external services are required. You can run multiple server instances on the same directory, for example one per CPU core.

Extra options (via qlobber-fsq) are available to use shared memory on a single server machine or a distributed file system for multiple server machines.

Alternatively, you can use qlobber-pg to store messages in PostgreSQL.

Clients can connect to the server using Primus (Websockets), TCP, HTTP, HTTP/2 or in-memory streams.

API documentation is available here.

Example

An example of running a server and clients using different transports is described below. All the example files are available in the test/example directory.

Server

Here’s a server which listens on all transports.

server.js
const centro = require('centro-js');
const fs = require('fs');
const path = require('path');

const config = {
    allowed_algs: ['EdDSA'], // (1)
    auth_method: 'Basic', // (2)
    transports: [{
        server: 'tcp',
        config: { port: 8800 }
    }, {
        server: 'primus',
        config: { port: 8801 }
    }, {
        server: 'http',
        config: { port: 8802 }
    }, {
        server: 'http2',
        config: { port: 8803 }
    }, {
        server: 'http2-duplex',
        config: {
            port: 8804,
            key: fs.readFileSync(path.join(__dirname, '..', '..', 'server.key')), // (3)
            cert: fs.readFileSync(path.join(__dirname, '..', '..', 'server.pem'))
        }
    }, {
        server: 'in-mem',
        config: {
            allowed_algs: ['HS256'], // (4)
            privileged: true // (5)
        }
    }]
};

const server = new centro.CentroServer(config); // (6)

server.on('warning', err => console.error(err.message));
server.on('ready', () => console.log('READY.'));
  1. Authorization tokens signed with these algorithms are accepted.

  2. By default, if a client doesn’t present a token, Centro asks it for Bearer authentication. Browsers don’t respond to that, so ask for Basic authentication.

  3. Browsers only support HTTP/2 over TLS.

  4. In-memory transport can use a shared secret to sign authorization tokens. The secret will be stored only in memory by default.

  5. Allow clients using this in-memory transport to see messages sent by any client. Useful for monitoring but use the privileged option with care!

  6. Connections may be refused until ready is emitted.

This is just an example — you only need to list transports which your applications will use to connect. Before you run node server.js, make sure you run grunt keys first.

Authorization tokens

Transports expect clients to present an authorization token when they connect.

Centro uses authorize-jwt to verify tokens. Your tokens must be JSON Web Tokens. The token format that Centro expects is described here.

You should generate a keypair, add the public key to authorize-jwt's keystore and sign your tokens with the private key.

Here’s a program which generates a keypair using jose, adds the public key to Centro’s keystore with the identifier http://davedoesdev.com, and writes the private key to priv_key.pem:

add_key.js
const uri = 'http://davedoesdev.com'; // (1)
const authorize_jwt = require('authorize-jwt');
const assert = require('assert');
const path = require('path');
const fs = require('fs');
const { JWK } = require('jose');
const priv_key = JWK.generateSync('OKP'); // (2)
const pub_key = priv_key.toPEM(); // (3)

authorize_jwt({
    db_type: 'pouchdb', // (4)
    db_for_update: true, // (5)
    no_changes: true // (6)
}, function (err, authz) {
    assert.ifError(err);
    authz.keystore.add_pub_key(uri, pub_key, function (err) { // (7)
        assert.ifError(err);
        authz.keystore.deploy(); // (8)
        fs.writeFile(path.join(__dirname, 'priv_key.pem'), // (9)
                     priv_key.toPEM(true),
                     assert.ifError);
    });
});
  1. Unique identifier for the keypair.

  2. Generate the keypair. You could also use crypto.generateKeyPairSync('ed25519') or openssl genpkey -algorithm Ed25519 on the command line, for example.

  3. Get the public key in PEM form.

  4. Alternatively, you can use couchdb (you’ll have to set up your own CouchDB server), sqlite or pg (you’ll have to set up your own PostgreSQL server).

  5. We’re going to update the keystore.

  6. We’re not interested in changes to the keystore — we’re just going to update the public key and exit.

  7. Associate the public key with http://davedoesdev.com.

  8. PouchDB-based keystores update a master database and then replicate changes to reader databases. Here we deploy() the master database to let any active reader databases know we’re done updating.

  9. The private key is not stored in the keystore but needs to be available when you want to sign authorization tokens. Here we write it to disk but this is just an example — you probably want a more secure way of storing it.
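
Callout 2 mentions crypto.generateKeyPairSync('ed25519') as an alternative to jose. Here’s a minimal sketch of that route, assuming Node 12 or later. The variable names are illustrative, and passing the resulting public key PEM to add_pub_key in place of priv_key.toPEM() is an assumption rather than something the example files do:

```javascript
const crypto = require('crypto');

// Generate an Ed25519 keypair and export both halves as PEM strings.
const { publicKey, privateKey } = crypto.generateKeyPairSync('ed25519', {
    publicKeyEncoding: { type: 'spki', format: 'pem' },
    privateKeyEncoding: { type: 'pkcs8', format: 'pem' }
});

// publicKey is the half you would add to the keystore (assumption: in
// place of pub_key in add_key.js). privateKey must be kept secret; it is
// the half you would sign tokens with.
console.log(publicKey);
```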

After you’ve run node add_key.js, you need to make a JWT, using the private key to sign it.

The iss claim in the token should be the unique issuer ID associated with http://davedoesdev.com in Centro’s keystore. You can use the get_pub_key_by_uri method to retrieve the issuer ID. Clients which use tokens with different issuer IDs can’t send messages to each other.

The access_control claim in the token should specify to which topics clients that present this token can publish and subscribe. Topics should be in AMQP format: . delimits words, * matches exactly one word and # matches zero or more words. See mqlobber-access-control for more details.
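
To make the wildcard rules concrete, here’s a small self-contained matcher. It’s only an illustration of the semantics described above, not the algorithm Centro or mqlobber-access-control uses. topicMatches and isAllowed are hypothetical names, and the rule that a topic is permitted when it matches an allow pattern and no disallow pattern is an assumption:

```javascript
// Illustrative matcher for AMQP-style topics: '.' delimits words,
// '*' matches exactly one word, '#' matches zero or more words.
function topicMatches(pattern, topic) {
    const p = pattern.split('.');
    const t = topic.split('.');

    function match(i, j) {
        if (i === p.length) { return j === t.length; }
        if (p[i] === '#') {
            // '#' may absorb zero or more words: try every possible split.
            for (let k = j; k <= t.length; k++) {
                if (match(i + 1, k)) { return true; }
            }
            return false;
        }
        if (j === t.length) { return false; }
        return (p[i] === '*' || p[i] === t[j]) && match(i + 1, j + 1);
    }

    return match(0, 0);
}

// Assumed rule: some allow pattern matches and no disallow pattern does.
function isAllowed(rules, topic) {
    return rules.allow.some(p => topicMatches(p, topic)) &&
           !rules.disallow.some(p => topicMatches(p, topic));
}

console.log(topicMatches('foo.*', 'foo.bar'));     // true
console.log(topicMatches('foo.*', 'foo.bar.baz')); // false
console.log(topicMatches('foo.#', 'foo'));         // true
```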

You can use any JWT module to generate your tokens. Here’s an example using jose:

make_token.js
const uri = 'http://davedoesdev.com';
const authorize_jwt = require('authorize-jwt');
const { JWK, JWT } = require('jose');
const assert = require('assert');
const path = require('path');
const fs = require('fs');

fs.readFile(path.join(__dirname, 'priv_key.pem'), function (err, priv_key) { // (1)
    assert.ifError(err);

    authorize_jwt( // (2)
    {
        db_type: 'pouchdb',
        deploy_name: 'token',
        no_changes: true
    }, function (err, authz)
    {
        assert.ifError(err);
        authz.keystore.get_pub_key_by_uri(uri, function (err, pub_key, issuer_id) // (3)
        {
            assert.ifError(err);
            assert(pub_key);
            assert(issuer_id);
            console.log(JWT.sign({
                 access_control: { // (4)
                    subscribe: { allow: ['#'], disallow: [] },
                    publish: { allow: ['#'], disallow: [] }
                 }
            }, JWK.asKey(priv_key), { // (5)
                algorithm: 'EdDSA',
                issuer: issuer_id, // (6)
                expiresIn: '1d' // (7)
            }));
        });
    });
});
  1. Read the private key. This is just an example — you should have a more secure way of storing private keys.

  2. Open the keystore for reading.

  3. Retrieve the issuer ID for http://davedoesdev.com.

  4. Allow clients using this token to subscribe and publish to any topic.

  5. Supply the private key for signing.

  6. Use the issuer ID in the token.

  7. Set token expiry to 1 day.
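
If you’d rather see what a JWT module does under the hood, the sketch below builds and signs a token with the same claims using only Node’s crypto module. It assumes Node 12 or later, uses a throwaway keypair and a placeholder issuer ID, and the helper names b64url and signToken are made up for this example. In a real setup you’d use the private key from add_key.js and the issuer ID from the keystore:

```javascript
const crypto = require('crypto');

// base64url without padding, as used by JWTs.
function b64url(buf) {
    return Buffer.from(buf).toString('base64')
        .replace(/\+/g, '-').replace(/\//g, '_').replace(/=+$/, '');
}

function signToken(claims, privateKey) {
    const header = { alg: 'EdDSA', typ: 'JWT' };
    const input = b64url(JSON.stringify(header)) + '.' +
                  b64url(JSON.stringify(claims));
    // For Ed25519 keys the digest algorithm argument must be null.
    const sig = crypto.sign(null, Buffer.from(input), privateKey);
    return input + '.' + b64url(sig);
}

// Throwaway key and placeholder issuer, for illustration only.
const { publicKey, privateKey } = crypto.generateKeyPairSync('ed25519');
const now = Math.floor(Date.now() / 1000);
const token = signToken({
    iss: 'example-issuer-id', // placeholder, not a real issuer ID
    iat: now,
    exp: now + 24 * 60 * 60,  // expires in 1 day
    access_control: {
        subscribe: { allow: ['#'], disallow: [] },
        publish: { allow: ['#'], disallow: [] }
    }
}, privateKey);

// Check the signature against the public key, as a verifier would.
const [h, p, s] = token.split('.');
const ok = crypto.verify(null, Buffer.from(h + '.' + p), publicKey,
                         Buffer.from(s, 'base64'));
console.log(ok);
```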

The token is valid for 1 day, allows clients which use it to publish and subscribe to any topic and is written to standard output. The client examples below expect it in an environment variable called CENTRO_TOKEN so you might do something like this to set it:

export CENTRO_TOKEN=$(node make_token.js)
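
When a client is refused, it can help to look at what’s actually in the token. The following sketch decodes the payload of CENTRO_TOKEN without verifying the signature, so treat its output as a debugging aid only. decodeClaims and describeToken are hypothetical helpers, not part of Centro:

```javascript
// Decode (without verifying!) the payload of a compact JWT.
function decodeClaims(token) {
    const parts = token.split('.');
    if (parts.length !== 3) {
        throw new Error('expected header.payload.signature');
    }
    // Node's base64 decoder also accepts the URL-safe alphabet used by JWTs.
    return JSON.parse(Buffer.from(parts[1], 'base64').toString('utf8'));
}

// Summarize the claims Centro cares about: issuer, expiry, access control.
function describeToken(token, nowSeconds = Math.floor(Date.now() / 1000)) {
    const claims = decodeClaims(token);
    return {
        issuer: claims.iss,
        expired: typeof claims.exp === 'number' && claims.exp <= nowSeconds,
        access_control: claims.access_control
    };
}

if (process.env.CENTRO_TOKEN) {
    console.log(JSON.stringify(describeToken(process.env.CENTRO_TOKEN), null, 2));
}
```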

Node clients

TCP

Subscribe to topics given on the command line and display the topic and content of each message received:

subscribe_tcp.js
const centro = require('centro-js');
const net = require('net');
const assert = require('assert');

function display_message(s, info) {
    console.log('topic:', info.topic); // (1)
    s.pipe(process.stdout); // (2)
}

net.createConnection(8800, function () { // (3)
    centro.stream_auth(this, { // (4)
        token: process.env.CENTRO_TOKEN // (5)
    }).on('ready', function () {
        for (const topic of process.argv.slice(2)) {
            this.subscribe(topic, display_message, assert.ifError); // (6)
        }
    });
});
  1. Display the message’s topic.

  2. Pipe the message’s content stream to standard output.

  3. Open a TCP connection to the server on port 8800.

  4. The TCP transport expects the token on the connection stream.

  5. Read the token from the environment.

  6. Subscribe to the topics given on the command line.

Publish a message, topic given on the command line and content read from standard input:

publish_tcp.js
const centro = require('centro-js');
const net = require('net');
const assert = require('assert');

net.createConnection(8800, function () {
    var conn = this;

    centro.stream_auth(conn, {
        token: process.env.CENTRO_TOKEN
    }).on('ready', function () {
        process.stdin.pipe(this.publish(process.argv[2], function (err) { // (1) (2)
            assert.ifError(err);
            conn.end(); // (3)
        }));
    });
});
  1. Publish the message to the topic given on the command line.

  2. Pipe standard input to the message’s content stream.

  3. Close the TCP connection, which will also cause the process to exit.

Here’s a sample run:

$ node subscribe_tcp.js 'foo.*'
topic: foo.bar
hello
$ echo hello | node publish_tcp.js foo.bar

Primus

Here are similar clients which use the Primus transport.

subscribe_primus.js
const centro = require('centro-js');
const assert = require('assert');
const Primus = require('primus');
const Socket = Primus.createSocket({
    pathname: '/centro/v' + centro.version + '/primus' // (1)
});
const PrimusDuplex = require('primus-backpressure').PrimusDuplex; // (2)

function display_message(s, info) {
    console.log('topic:', info.topic);
    s.pipe(process.stdout);
}

centro.separate_auth( { // (3)
    token: process.env.CENTRO_TOKEN
}, function (err, userpass, make_client) {
    assert.ifError(err);

    const socket = new Socket('http://' + userpass + '@localhost:8801', // (4)
                              { strategy: false }); // (5)
    const duplex = new PrimusDuplex(socket);

    make_client(duplex).on('ready', function () { // (6)
        for (const topic of process.argv.slice(2)) {
            this.subscribe(topic, display_message, assert.ifError);
        }
    });
});
  1. The Primus transport uses a versioned path.

  2. The Primus transport uses primus-backpressure.

  3. The Primus transport expects the token to be supplied in the HTTP request authorization, before the connection stream is established.

  4. Open a connection to the server.

  5. You should disable Primus’s auto-reconnect feature because it doesn’t work with Centro. Centro’s connections are stateful (they have shared state between the client and server). The server deletes its state immediately upon disconnect. If you need auto-reconnect you should implement it in your application.

  6. Establish a connection stream to the server.
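
Callout 5 leaves reconnection to your application. One possible shape for it is sketched below: exponential backoff between attempts, and a fresh client (with every subscription set up again) after each reconnect. The names backoffDelay, keepConnected, connect and onClient are all hypothetical. connect stands for a function you write which opens a new socket, wraps it as in the example above, and calls back with the ready client and its stream:

```javascript
// Delay before the nth retry (n starts at 0): exponential, capped at maxMs.
function backoffDelay(attempt, baseMs = 500, maxMs = 30000) {
    return Math.min(maxMs, baseMs * Math.pow(2, attempt));
}

// `connect` is supplied by you and must call back with (err, client, stream).
function keepConnected(connect, onClient) {
    let attempt = 0;

    function tryConnect() {
        connect(function (err, client, stream) {
            if (err) {
                return setTimeout(tryConnect, backoffDelay(attempt++));
            }
            attempt = 0;
            // Server-side state is gone after a disconnect, so onClient
            // must set up every subscription again on the new client.
            onClient(client);
            stream.once('close', tryConnect);
        });
    }

    tryConnect();
}
```

With the defaults, retries wait 500 ms, 1 s, 2 s and so on, up to a 30 second ceiling.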

publish_primus.js
const centro = require('centro-js');
const assert = require('assert');
const Primus = require('primus');
const Socket = Primus.createSocket({
    pathname: '/centro/v' + centro.version + '/primus'
});
const PrimusDuplex = require('primus-backpressure').PrimusDuplex;

centro.separate_auth({
    token: process.env.CENTRO_TOKEN
}, function (err, userpass, make_client) {
    assert.ifError(err);

    const socket = new Socket('http://' + userpass + '@localhost:8801',
                              { strategy: false });
    const duplex = new PrimusDuplex(socket);

    make_client(duplex).on('ready', function () {
        process.stdin.pipe(this.publish(process.argv[2], function (err) {
            assert.ifError(err);
            duplex.end();
        }));
    });
});

HTTP/2

The HTTP/2 transport sends messages over an HTTP/2 stream. Note that browsers can’t use the HTTP/2 transport directly because they don’t fully support streaming uploads. They should use the HTTP/2 Duplex transport.

Here are publish and subscribe examples for Node using the http2 module.

subscribe_http2.js
const centro = require('centro-js');
const assert = require('assert');
const http2 = require('http2');

function display_message(s, info) {
    console.log('topic:', info.topic);
    s.pipe(process.stdout);
}

centro.separate_auth({
    token: process.env.CENTRO_TOKEN
}, function (err, userpass, make_client) {
    assert.ifError(err);
    http2.connect('http://localhost:8803', function () {
        this.request({
            ':method': 'POST', // (1)
            ':path': `/centro/v${centro.version}/http2`,
            Authorization: `Bearer ${userpass.split(':')[1]}` // (2)
        }).on('response', function (headers) {
            assert.equal(headers[':status'], 200);
            make_client(this).on('ready', function () {
                for (const topic of process.argv.slice(2)) {
                    this.subscribe(topic, display_message, assert.ifError);
                }
            });
        });
    });
});
  1. You just need a single POST stream for all messages. Centro multiplexes messages using bpmux.

  2. This example uses bearer authentication, which just needs the token, not the centro: prefix.
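
As callout 2 implies, userpass has the form centro:TOKEN. The sketch below shows how either kind of Authorization header value could be derived from it. authHeader is a hypothetical helper, and whether a given transport accepts Basic as well as Bearer is something to check against the API documentation:

```javascript
// userpass has the form 'centro:<token>'.
function authHeader(userpass, scheme) {
    if (scheme === 'Basic') {
        // Basic carries the whole user:password pair, base64-encoded.
        return 'Basic ' + Buffer.from(userpass).toString('base64');
    }
    // Bearer carries only the token: everything after the first colon.
    return 'Bearer ' + userpass.slice(userpass.indexOf(':') + 1);
}

console.log(authHeader('centro:abc', 'Basic'));
console.log(authHeader('centro:abc', 'Bearer'));
```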

publish_http2.js
const centro = require('centro-js');
const assert = require('assert');
const http2 = require('http2');

centro.separate_auth({
    token: process.env.CENTRO_TOKEN
}, function (err, userpass, make_client) {
    assert.ifError(err);
    http2.connect('http://localhost:8803', function () {
        const session = this;
        this.request({
            ':method': 'POST',
            ':path': `/centro/v${centro.version}/http2`,
            Authorization: `Bearer ${userpass.split(':')[1]}`
        }).on('response', function (headers) {
            assert.equal(headers[':status'], 200);
            const stream = this;
            make_client(this).on('ready', function () {
                process.stdin.pipe(this.publish(process.argv[2], function (err) {
                    assert.ifError(err);
                    stream.end();
                    session.close();
                }));
            });
        });
    });
});

Browser clients

Primus

When you run a Centro server with a Primus transport, Primus itself is made available over HTTP at the following path:

/centro/v2/primus/primus.js

So on the example server, it’s available at the following URL:

http://localhost:8801/centro/v2/primus/primus.js

Of course, the version number may change and the machine may be reachable via a different hostname depending on your DNS configuration.

A webpacked copy of the Centro client code is available in dist/centro.js.

First we define our user interface in HTML. We’ll have a section where you can publish messages and a section where you can see messages which have been published:

example_primus.html
<html>
  <head>
    <title>Centro Example</title>
    <link href="example.css" rel="stylesheet" type="text/css"> (1)
    <script src="http://localhost:8801/centro/v2/primus/primus.js"></script> (2)
    <script src="dist/centro.js"></script> (3)
    <script src="example_primus.js"></script> (4)
  </head>
  <body onload="connect()"> (5)
    <form onsubmit="publish(event)"> (6)
      <div id="input">
        <div>
          <label>topic: <input type="text" id="topic" autofocus></label> (7)
          <label>message:  <input type="text" id="message"></label> (8)
        </div>
        <input type="submit" value="publish"> (9)
      </div>
      <div id="messages"> (10)
      </div>
    </form>
  </body>
</html>
  1. Some CSS is required to lay this out nicely. It’s available in test/example/browser/example.css.

  2. Load Primus.

  3. Load the Centro client code.

  4. Load script to make the example work (see below).

  5. When the page loads, initialize the script by calling connect().

  6. When the user clicks on the publish button, call publish().

  7. Input field for message topic.

  8. Input field for message content.

  9. Publish button.

  10. Displays messages received.

Next we need to write the script which connects to the Centro server and subscribes to and publishes messages:

example_primus.js
let publish = function (event) { // (1)
    event.preventDefault();
};

function connect() {
    const topic = document.getElementById('topic');
    const message = document.getElementById('message');
    const messages = document.getElementById('messages');
    const params = new URLSearchParams(window.location.search);

    function tag_text(cls, text) {
        const div = document.createElement('div');
        div.className = cls;
        div.appendChild(document.createTextNode(text));
        return div;
    }

    function add_message(div) { // (2)
        messages.appendChild(div);
        messages.scrollTop = messages.scrollHeight;
    }

    centro.separate_auth({
        token: params.get('token') // (3)
    }, function (err, userpass, make_client) {
        if (err) { throw(err); }

        const primus = new Primus('http://' + userpass + '@localhost:8801',
                                  { strategy: false });
        const duplex = new centro.PrimusDuplex(primus);
        const client = make_client(duplex);

        client.on('ready', function () {
            add_message(tag_text('status', 'open')); // (4)
            this.subscribe(params.get('subscribe'), function (s, info) {
                centro.read_all(s, function (v) {
                    const msg = document.createElement('div');
                    msg.className = 'message';
                    msg.appendChild(tag_text('topic', info.topic));
                    msg.appendChild(tag_text('data', v.toString()));
                    add_message(msg); // (5)
                });
            });

            publish = function (event) {
                event.preventDefault();
                client.publish(topic.value).end(message.value); // (6)
            };
        });

        primus.on('close', function () {
            add_message(tag_text('status', 'closed')); // (7)
        });
    });
}
  1. While the page loads, clicking the publish button does nothing.

  2. Function to display a message.

  3. Read the authorization token from the URL query parameters.

  4. Display a message to say the connection stream to the server is open.

  5. When we receive a message, display its topic and content.

  6. When the user clicks the publish button, publish a message.

  7. Display a message to say the connection stream to the server is closed.

Load the example page using a URL like file:///path/to/example_primus.html?subscribe=foo.*&token=XXX where XXX is the output from node make_token.js.

HTTP

The Centro HTTP transport supports access using HTTP requests, without using the Centro client:

/centro/v2/publish?authz_token=XXX&topic=YYY

Publish a message (POST request, message content in request body)

/centro/v2/subscribe?authz_token=XXX&topic=YYY

Subscribe to messages (messages delivered using server-sent events)
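
Building these URLs by hand is error-prone once topics contain characters such as #, which must be percent-encoded. Here’s a small sketch using the standard URL class. centroUrl is a hypothetical helper, and the repeated topic parameter mirrors what the Node and Python subscribe examples later in this document send:

```javascript
// Build a URL for the HTTP transport's publish or subscribe endpoint.
// baseUrl must end with a slash so that `operation` is appended to it.
function centroUrl(baseUrl, operation, token, topics) {
    const url = new URL(operation, baseUrl);
    url.searchParams.append('authz_token', token);
    // Accept a single topic or an array of topics.
    for (const topic of [].concat(topics)) {
        url.searchParams.append('topic', topic);
    }
    return url.href;
}

console.log(centroUrl('http://localhost:8802/centro/v2/', 'subscribe',
                      'XXX', ['foo.*', 'bar.#']));
```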

The HTML for this example is the same as the Primus HTML except that we don’t need the Primus client or the Centro client:

example_sse.html
<html>
  <head>
    <title>Centro Example</title>
    <link href="example.css" rel="stylesheet" type="text/css">
    <script src="example_sse.js"></script>
  </head>
  <body onload="connect()">
    <form onsubmit="publish(event)">
      <div id="input">
        <div>
          <label>topic: <input type="text" id="topic" autofocus></label>
          <label>message:  <input type="text" id="message"></label>
        </div>
        <input type="submit" value="publish">
      </div>
      <div id="messages">
      </div>
    </form>
  </body>
</html>

The script is also similar to the Primus script. It uses an EventSource to subscribe to messages and POST requests (via XMLHttpRequest) to publish messages:

example_sse.js
let publish = function (event) {
    event.preventDefault();
};

function connect() {
    const topic = document.getElementById('topic');
    const message = document.getElementById('message');
    const messages = document.getElementById('messages');
    const params = new URLSearchParams(window.location.search);

    function tag_text(cls, text) {
        const div = document.createElement('div');
        div.className = cls;
        div.appendChild(document.createTextNode(text));
        return div;
    }

    function add_message(div) {
        messages.appendChild(div);
        messages.scrollTop = messages.scrollHeight;
    }

    const base_url = 'http://localhost:8802/centro/v2/';
    const source = new EventSource(base_url + // (1)
        'subscribe?authz_token=' + params.get('token') +
        '&topic=' + encodeURIComponent(params.get('subscribe')));

    source.onopen = function () {
        publish = function (event) {
            event.preventDefault();
            var r = new XMLHttpRequest();
            r.open('POST', base_url + // (2)
                'publish?authz_token=' + params.get('token') +
                '&topic=' + encodeURIComponent(topic.value));
            r.send(message.value); // (3)
        };

        add_message(tag_text('status', 'open'));
    };

    source.onerror = function (e) {
        if (e.target.readyState === EventSource.CONNECTING) {
            add_message(tag_text('status', 'connecting'));
        } else if (e.target.readyState === EventSource.CLOSED) {
            add_message(tag_text('status', 'closed'));
        }
    };

    const msgs = new Map();

    source.addEventListener('start', function (e) {
        const info = JSON.parse(e.data); // (4)
        info.data = ''; // (5)
        msgs.set(info.id, info); // (6)
    });

    source.addEventListener('data', function (e) {
        const info = JSON.parse(e.data);
        msgs.get(info.id).data += info.data; // (7)
    });

    source.addEventListener('end', function (e) {
        const info = msgs.get(JSON.parse(e.data).id); // (8)

        const msg = document.createElement('div');
        msg.className = 'message';
        msg.appendChild(tag_text('topic', info.topic));
        msg.appendChild(tag_text('data', info.data));
        add_message(msg);

        msgs.delete(info.id);
    });

    source.addEventListener('peer_error', function () {
        add_message(tag_text('status', 'error'));
    });
}
  1. Create an EventSource which receives messages from the server. We pass the authorization token and the topic we want messages for as query parameters.

  2. POST message to the server using an XMLHttpRequest. We pass the authorization token and message topic as query parameters.

  3. Send the message content.

  4. Each message begins with a start event, which has JSON-encoded data containing the message’s topic and unique ID.

  5. Message data can be delivered across multiple events. In this example we need a place to accumulate it.

  6. Messages can be interleaved so while we’re accumulating data, we need to remember them by their unique IDs.

  7. Message data arrives in data events and we accumulate it here.

  8. When all a message’s data has been received, we get an end event. In this example, we display the message’s topic and data.
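
The start, data and end handling above can be pulled out into a small class that’s independent of the DOM and of EventSource, which makes the interleaving easier to see and to test. This is a sketch only: MessageAssembler is a hypothetical name and the IDs and payloads are made up:

```javascript
// Collects start/data/end events into whole messages, keyed by message ID.
class MessageAssembler {
    constructor(onMessage) {
        this.pending = new Map();
        this.onMessage = onMessage;
    }

    // Feed one server-sent event: its type and its JSON-encoded data field.
    handle(type, json) {
        const info = JSON.parse(json);
        if (type === 'start') {
            this.pending.set(info.id, { topic: info.topic, data: '' });
        } else if (type === 'data') {
            const msg = this.pending.get(info.id);
            if (msg) { msg.data += info.data; }
        } else if (type === 'end') {
            const msg = this.pending.get(info.id);
            if (msg) {
                this.pending.delete(info.id);
                this.onMessage(msg.topic, msg.data);
            }
        }
    }
}

// Two interleaved messages; the second one finishes first.
const received = [];
const assembler = new MessageAssembler((topic, data) => received.push([topic, data]));
assembler.handle('start', '{"id":1,"topic":"foo.a"}');
assembler.handle('start', '{"id":2,"topic":"foo.b"}');
assembler.handle('data', '{"id":1,"data":"hel"}');
assembler.handle('data', '{"id":2,"data":"world"}');
assembler.handle('data', '{"id":1,"data":"lo"}');
assembler.handle('end', '{"id":2}');
assembler.handle('end', '{"id":1}');
console.log(received);
```

In a browser you would call handle from each of the EventSource listeners, passing the event name and e.data.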

Load the example page using a URL like file:///path/to/example_sse.html?subscribe=foo.*&token=XXX

Further details of how messages are delivered using server-sent events are available here.

HTTP/2-duplex

The http2-duplex transport uses the http2-duplex module to emulate a full-duplex connection with browsers. Full-duplex HTTP/2 streaming isn’t implemented by any browser, nor are there any plans to do so. http2-duplex emulates it by using POST requests.

The HTML for this example is the same as the Primus HTML except that we don’t need the Primus client:

example_http2.html
<html>
  <head>
    <title>Centro Example</title>
    <link href="example.css" rel="stylesheet" type="text/css">
    <script src="dist/centro.js"></script>
    <script src="example_http2.js"></script>
  </head>
  <body onload="connect()">
    <form onsubmit="publish(event)">
      <div id="input">
        <div>
          <label>topic: <input type="text" id="topic" autofocus></label>
          <label>message:  <input type="text" id="message"></label>
        </div>
        <input type="submit" value="publish">
      </div>
      <div id="messages">
      </div>
    </form>
  </body>
</html>

The script is also similar to the Primus script:

example_http2.js
let publish = function (event) {
    event.preventDefault();
};

function connect() {
    const topic = document.getElementById('topic');
    const message = document.getElementById('message');
    const messages = document.getElementById('messages');
    const params = new URLSearchParams(window.location.search);

    function tag_text(cls, text) {
        const div = document.createElement('div');
        div.className = cls;
        div.appendChild(document.createTextNode(text));
        return div;
    }

    function add_message(div) {
        messages.appendChild(div);
        messages.scrollTop = messages.scrollHeight;
    }

    centro.separate_auth({
        token: params.get('token')
    }, async function (err, userpass, make_client) {
        if (err) { throw(err); }

        const duplex = await centro.make_client_http2_duplex( // (1)
            'https://localhost:8804/centro/v2/http2-duplex', {
                headers: {
                    Authorization: 'Bearer ' + userpass.split(':')[1] // (2)
                }
            });
        const client = make_client(duplex);

        client.on('ready', function () {
            add_message(tag_text('status', 'open'));
            this.subscribe(params.get('subscribe'), function (s, info) {
                centro.read_all(s, function (v) {
                    const msg = document.createElement('div');
                    msg.className = 'message';
                    msg.appendChild(tag_text('topic', info.topic));
                    msg.appendChild(tag_text('data', v.toString()));
                    add_message(msg);
                });
            });

            publish = function (event) {
                event.preventDefault();
                client.publish(topic.value).end(message.value);
            };
        });

        duplex.on('end', function () {
            add_message(tag_text('status', 'closed'));
        });

        client.on('error', function (err) {
            console.error(err);
            duplex.destroy();
        });
    });
}
  1. The Centro client distribution also includes the http2-duplex client, which connects to the server and returns an emulated duplex stream.

  2. You can use Basic or Bearer authentication.

Load the example page using a URL like file:///path/to/example_http2.html?subscribe=foo.*&token=XXX

In-memory client

The in-mem transport lets you connect from the server process itself without the overhead of a TCP connection. For example, to display every message published on every transport you could add the following to server.js:

server.js
const assert = require('assert');
const { JWK, JWT } = require('jose');

server.on('ready', function () {
    const ops = this.transport_ops['in-mem']; // (1)
    const key = JWK.generateSync('oct'); // (2)
    ops.authz.keystore.add_pub_key('test', key, function (err, issuer) { // (3)
        assert.ifError(err);

        const token = JWT.sign({ // (4)
            access_control: { // (5)
                subscribe: { allow: ['#'], disallow: [] },
                publish: { allow: ['#'], disallow: [] }
            }
        }, key, {
            algorithm: 'HS256',
            issuer
        });

        ops.connect(function (err, stream) { // (6)
            assert.ifError(err);

            centro.stream_auth(stream, {
                token
            }).on('ready', function () {
                this.subscribe('#', function (s, info) {
                    console.log('topic:', info.topic);
                    s.pipe(process.stdout);
                }, assert.ifError);
            });
        });
    });
});
  1. You can get to transport-specific operations on the server via the transport_ops property.

  2. Use jose to generate a secret key.

  3. Add the secret key to the transport’s key store. Note that by default, the key store used by an in-mem transport stores keys only in memory.

  4. Use jose to sign an authorization token.

  5. Allow clients on this transport to subscribe and publish to any message topic. Remember when we created the server, we also gave this transport privileged status so clients using it will be able to see messages sent by any other client.

  6. The in-mem transport exposes a function, connect, which allows us to connect to the server from within the same process.

Other clients (server-sent events)

You can also use the HTTP transport outside the browser and from languages other than Node. As long as you can make POST requests, you can publish messages. To subscribe to messages, you’ll need to be able to receive server-sent events.

Python

Here’s an example Python 3 program which publishes a message, topic given on the command line and content read from standard input:

publish.py
import requests, os, sys
params = {
    'authz_token': os.environ['CENTRO_TOKEN'],
    'topic': sys.argv[1]
}
requests.post('http://localhost:8802/centro/v2/publish', # (1)
              params=params,
              data=sys.stdin.buffer).raise_for_status()
  1. Make POST request to publish message.

Subscribe to topics given on the command line and display the topic and content of each message received:

subscribe.py
import requests, sseclient, os, sys, json
params = {
    'authz_token': os.environ['CENTRO_TOKEN'],
    'topic': sys.argv[1:]
}
response = requests.get('http://localhost:8802/centro/v2/subscribe', # (1)
                        params=params, stream=True)
response.raise_for_status()
client = sseclient.SSEClient(response) # (2)
for event in client.events():
    if (event.event == 'start'):
        data = json.loads(event.data)
        print('id:', data['id'], 'topic:', data['topic'], flush=True) # (3)
    elif (event.event == 'data'):
        sys.stdout.buffer.write(json.loads(event.data)['data'].encode('latin1')) # (4) (5)
        sys.stdout.buffer.flush()
  1. Make a long-running GET request to subscribe to messages.

  2. Use the sseclient-py module to read messages.

  3. Display message ID and topic.

  4. Display message content. There may be many data events for each message (they will share the same ID).

  5. All Centro message data is a byte array. The HTTP transport encodes it in UTF-8 per the server-sent events spec. It’s encoded such that the UTF-8 data contains only characters that can also be represented in the latin1 (ISO-8859-1) 8-bit encoding. Therefore, to get the message bytes, encode the UTF-8 data using latin1.
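
The same latin1 step in Node looks like the sketch below. Once the event’s JSON has been parsed, each character of the data string stands for one byte, so encoding the string as latin1 recovers the original bytes. dataToBytes is a hypothetical helper name:

```javascript
// Each character of the decoded string has a code point in the range
// 0-255 and stands for exactly one byte of the original message.
function dataToBytes(data) {
    return Buffer.from(data, 'latin1');
}

const bytes = dataToBytes('\u00ff\u0000A');
console.log(bytes); // <Buffer ff 00 41>
```

Node’s 'binary' encoding is an alias for 'latin1', so passing 'binary' to process.stdout.write has the same effect.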

Node

You can also use the HTTP transport from Node, if you don’t want to use Primus or TCP.

publish_http.js
process.stdin.pipe(require('http').request({ // (1)
    method: 'POST',
    hostname: 'localhost',
    port: 8802,
    path: '/centro/v2/publish?' + require('querystring').stringify({
        authz_token: process.env.CENTRO_TOKEN,
        topic: process.argv[2]
    })
}));
  1. Make POST request to publish message.

subscribe_http.js
const EventSource = require('eventsource'); // (1)
const es = new EventSource('http://localhost:8802/centro/v2/subscribe?' +
                           require('querystring').stringify({
                               authz_token: process.env.CENTRO_TOKEN,
                               topic: process.argv.slice(2)
                           }));

es.addEventListener('start', function (e) {
    const data = JSON.parse(e.data);
    console.log('id:', data.id, 'topic:', data.topic);
});

es.addEventListener('data', function (e) {
    process.stdout.write(JSON.parse(e.data).data, 'binary');
});
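
Passing an array as `topic` works because Node's `querystring.stringify` emits one `topic` parameter per array element. A small sketch of the resulting URL, where `subscribeUrl` is a hypothetical helper and `TOKEN` is a placeholder rather than a real authorization token:

```javascript
const querystring = require('querystring');

// Build the subscribe URL for a list of topics.
// The base URL matches the example server above.
function subscribeUrl(token, topics) {
    return 'http://localhost:8802/centro/v2/subscribe?' +
           querystring.stringify({ authz_token: token, topic: topics });
}

const url = subscribeUrl('TOKEN', ['foo', 'bar']);
console.log(url);
// http://localhost:8802/centro/v2/subscribe?authz_token=TOKEN&topic=foo&topic=bar
```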

Rust

Here are the same example clients written in Rust. To run them, change directory to test/example/rust/publish or test/example/rust/subscribe and type cargo run plus the message topic.

publish.rs
use std::env;
use reqwest::{Url, Client, Body};
use tokio::io::stdin;
use tokio_util::codec::{FramedRead, BytesCodec};
use futures_util::stream::TryStreamExt;
use log::error;
use env_logger;

#[tokio::main]
async fn main() {
    env_logger::init();
    let url_str = "http://localhost:8802/centro/v2/publish";
    let token = env::var("CENTRO_TOKEN").expect("no token");
    let topic = env::args().nth(1).expect("no topic");
    let url = Url::parse_with_params(url_str, &[
        ("authz_token", token),
        ("topic", topic)])
        .expect("Failed to parse url");
    let st = FramedRead::new(stdin(), BytesCodec::new())
        .map_ok(bytes::BytesMut::freeze);
    let response = Client::new()
        .post(url)
        .body(Body::wrap_stream(st))
        .send()
        .await
        .expect("Failed to send request");
    if !response.status().is_success() {
        error!("HTTP request failed: {}", response.status());
        error!("{}", response.text().await.expect("Failed to read response"));
    }
}
subscribe.rs
use serde_derive::Deserialize;
use std::io::{self, Write};
use std::env;
use reqwest::Url;
use eventsource::event::Event;
use eventsource::reqwest::Client;
use encoding::{Encoding, EncoderTrap};
use encoding::all::ISO_8859_1;
use log::error;
use env_logger;

#[derive(Deserialize)]
struct Start {
    id: u64,
    topic: String
}

#[derive(Deserialize)]
#[allow(dead_code)]
struct Data {
    id: u64,
    data: String
}

fn parse<'a, T>(data: &'a str) -> Option<T>
where T: serde::Deserialize<'a> {
    match serde_json::from_str::<T>(data) {
        Ok(start) => {
            return Some(start);
        },
        Err(err) => {
            error!("Failed to parse JSON: {}", err);
            return None;
        }
    }
}

fn encode(data: &str) -> Option<Vec<u8>> {
    match ISO_8859_1.encode(data, EncoderTrap::Strict) {
        Ok(bytes) => {
            return Some(bytes);
        },
        Err(err) => {
            error!("Failed to convert data to bytes: {}", err);
            return None;
        }
    }
}

fn handle<'a, T>(ev: &'a Event, f: &dyn Fn(T) -> ())
where T: serde::Deserialize<'a> {
    if let Some(v) = parse::<T>(&ev.data) {
        f(v);
    }
}

fn main() {
    env_logger::init();
    let url_str = "http://localhost:8802/centro/v2/subscribe";
    let token = env::var("CENTRO_TOKEN").expect("no token");
    let token_params = vec![("authz_token", token)];
    let topic_params = env::args().skip(1).map(|topic| ("topic", topic));
    let url = Url::parse_with_params(url_str,
        token_params.into_iter().chain(topic_params))
        .expect("Failed to parse url");
    let client = Client::new(url);
    for event in client {
        let ev = event.expect("Failed to read event");
        if let Some(ref evtype) = ev.event_type {
            match evtype.as_str() {
                "start" =>
                    handle::<Start>(&ev, &|start|
                        println!("id: {} topic: {}", start.id, start.topic)),
                "data" =>
                    handle::<Data>(&ev, &|data|
                        if let Some(bytes) = encode(&data.data) {
                            let _ = io::stdout().write_all(bytes.as_slice());
                            let _ = io::stdout().flush();
                        }),
                _ => {}
            }
        }
    }
}

Clojure

Here are the same example clients written in Clojure. To run them, change directory to test/example/clojure/publish or test/example/clojure/subscribe and type lein run plus the message topic.

publish.clj
(ns publish.core
  (:gen-class)
  (:require [clj-http.client :as client]))

(defn -main
  "Publish message to example Centro server"
  [topic]
  (client/post "http://localhost:8802/centro/v2/publish"
    {:query-params {"authz_token" (System/getenv "CENTRO_TOKEN")
                    "topic" topic}
     :body System/in}))
subscribe.clj
(ns subscribe.core
  (:gen-class)
  (:require [cheshire.core :as json])
  (:import [javax.ws.rs.client ClientBuilder]
           [org.glassfish.jersey.media.sse SseFeature EventSource EventListener]))

(deftype OnStart [] EventListener
  (onEvent [_ e]
    (let [data (json/decode (.readData e) true)]
      (println "id:" (:id data) "topic:" (:topic data)))))

(deftype OnData [] EventListener
  (onEvent [_ e]
    (let [data (json/decode (.readData e) true)]
      (.write System/out (.getBytes (:data data) "ISO-8859-1"))
      (flush))))

(defn -main
  "Subscribe to messages from example Centro server"
  [& topics]
  (let [token (System/getenv "CENTRO_TOKEN")
        builder (.register (ClientBuilder/newBuilder) SseFeature)
        client (.build builder)
        target (-> (.target client "http://localhost:8802/centro/v2/subscribe")
                   (.queryParam "authz_token" (into-array Object [token]))
                   (.queryParam "topic" (into-array Object topics)))
        event-source (.build (EventSource/target target))]
    (.register event-source (OnStart.) "start" (into-array String []))
    (.register event-source (OnData.) "data" (into-array String []))
    (.open event-source)
    (println "READY.")
    (loop []
      (Thread/sleep 1000)
      (recur))))

Elixir

Here are the same example clients written in Elixir. To build them, change directory to test/example/elixir/apps/publish or test/example/elixir/apps/subscribe and type mix escript.build. Then run the resulting escript, passing the message topic.

publish.ex
defmodule Publish do
  def main([topic | _]) do
    HTTPoison.post!("http://localhost:8802/centro/v2/publish",
                    {:stream, IO.stream(:stdio, 100)},
                    [],
                    params: %{authz_token: System.get_env("CENTRO_TOKEN"),
                              topic: topic})
  end
end
subscribe.ex
defmodule Subscribe do
  def main(topics) do
    {:ok, _} = EventsourceEx.new(
      "http://localhost:8802/centro/v2/subscribe?" <>
      URI.encode_query([{"authz_token", System.get_env("CENTRO_TOKEN")} |
                        (for topic <- topics, do: {"topic", topic})]),
      headers: [],
      stream_to: self())
    loop()
  end
  defmodule Start do
    defstruct [:id, :topic]
  end
  defmodule Data do
    defstruct [:id, :data]
  end
  def loop do
    receive do
      %EventsourceEx.Message{event: "start", data: data} ->
        start = Poison.decode!(data, as: %Start{})
        :io.format("id: ~B topic: ~s~n", [start.id, start.topic])
      %EventsourceEx.Message{event: "data", data: data} ->
        data = Poison.decode!(data, as: %Data{})
        IO.write(:unicode.characters_to_binary(data.data, :utf8, :latin1))
    end
    loop()
  end
end

Installation

npm install centro-js

Licence

MIT

Test

NODE_OPTIONS=--max-http-header-size=32768 grunt test

(Run grunt keys at least once before running the tests.)

Lint

grunt lint

Coverage

NODE_OPTIONS=--max-http-header-size=32768 grunt coverage

Istanbul results are available here.

Coveralls page is here.
