Tutorial

Sebastian Jylanki edited this page Sep 28, 2016 · 12 revisions

Core principles

A ZMQ socket can usually be created with a single call to the socket function.

Every Socket object has four channels associated with it: :in, :out, :ctl-in and :ctl-out.

:in and :ctl-in are the way to pass data into the underlying ZMQ sockets. Those channels are meant only for outgoing commands and messages. You shall only send data to them.

:out and :ctl-out are the way to get information out of the underlying ZMQ sockets. Those channels are meant only for incoming status replies and messages. You shall only receive from them.

:in

Messages sent to :in are handed to the underlying ZMQ socket, which sends them out. By default (without transducers) you can send byte-arrays, strings or collections of them. A collection is treated as a multi-frame message.

:out

Incoming messages from ZMQ sockets are delivered to :out channels. By default (without transducers), received data is a byte-array for a single-frame message and a collection of byte-arrays for a multi-frame message.
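Because a value taken from :out can be either a single byte-array or a collection of byte-arrays, consuming code often wants one uniform shape. The following is a minimal sketch of such a normalizer. frames->strings is a hypothetical helper name, not part of the library, and the UTF-8 encoding is an assumption about the payload.

```clojure
(defn frames->strings
  "Hypothetical helper: turns a value taken from :out into a vector of strings.
  A single-frame message (one byte-array) becomes a one-element vector;
  a multi-frame message (a collection of byte-arrays) becomes one string per frame.
  Assumes every frame holds UTF-8 text."
  [msg]
  (if (bytes? msg)
    [(String. ^bytes msg "UTF-8")]
    (mapv #(String. ^bytes % "UTF-8") msg)))
```

With this, (frames->strings (.getBytes "ping" "UTF-8")) yields ["ping"], and a two-frame message yields a two-element vector.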

:ctl-in

You can send arbitrary functions that manipulate the JZMQ socket to :ctl-in. Each function takes one parameter, the underlying org.zeromq.ZMQ$Socket, and is executed in the ZMQ looping thread. Whenever you send a function to :ctl-in, you shall retrieve its return value from the :ctl-out channel.

:ctl-out

The return values of the functions sent to :ctl-in are delivered here. A value can be anything. The keyword :nil stands for nil, since core.async channels cannot carry nil.
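The :ctl-in / :ctl-out round trip described above can be wrapped in a small function. This is only a sketch: ctl-call is a hypothetical name, not one of the library's utility functions. It relies on nothing beyond the keyword lookup of :ctl-in and :ctl-out on the socket and on core.async's blocking put and take.

```clojure
(require '[clojure.core.async :refer [>!! <!!]])

(defn ctl-call
  "Hypothetical helper: puts the one-argument function f on the socket's :ctl-in
  channel, blocks for the reply on :ctl-out, and translates the :nil
  placeholder back to nil."
  [sock f]
  (>!! (:ctl-in sock) f)
  (let [reply (<!! (:ctl-out sock))]
    (when-not (= :nil reply)
      reply)))

(comment
  ;; Assumes `sock` was created with the library's socket function.
  ;; getType is a method of org.zeromq.ZMQ$Socket.
  (ctl-call sock (fn [s] (.getType s))))
```

The helper blocks the calling thread, so it is not meant for use inside a go block. If several threads issue control calls on the same socket at once, their replies may interleave on :ctl-out, so such calls would need to be serialized by the caller.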

Utility functions

While you can do everything using the :in, :out, :ctl-in and :ctl-out channels, it is often more convenient to use the utility functions from the utils namespace. Direct access to the underlying channels remains important when you want to use core.async functionality directly.

The utility functions have self-explanatory names. Their documentation is written in the source code and is accessible with common development tools.
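As an illustration of using the channels directly with core.async, the sketch below waits for a message on :out but gives up after a deadline. recv-with-timeout is a hypothetical name and not part of the utils namespace. It uses only core.async's alts!! and timeout together with the :out channel of the socket.

```clojure
(require '[clojure.core.async :refer [alts!! timeout]])

(defn recv-with-timeout
  "Hypothetical helper: takes the next message from the socket's :out channel,
  or returns :timeout if nothing arrives within ms milliseconds.
  Returns nil if :out has been closed."
  [sock ms]
  (let [out      (:out sock)
        [v port] (alts!! [out (timeout ms)])]
    (if (= port out)
      v
      :timeout)))
```

A call such as (recv-with-timeout sub 500) would then return either the received data or :timeout, which is handy when a blocking receive could otherwise wait forever.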

Resource management and performance

All the sockets you create without specifying a Context object are associated with the default Context: *context*. That is most likely sufficient unless you need fine-grained control over IO resources and CPU scheduling. Each Context has its own injector-thread and zmq-thread, as well as its own underlying ZMQ context and the sockets managed by it. If one socket deals with huge messages while the others just pass small messages, it might be a good idea to move the socket handling the big messages to another Context to give the small sockets more CPU time.

When you close (or terminate) a Context, it will close all the associated sockets automatically, so it can be used as an object lifetime management tool. Sockets don't need to be individually closed if their Context will be closed soon after.

Examples

req-rep

(let [rep (socket :rep "@tcp://*:*") ; bind to a random tcp port
      req (doto (socket :req) (connect! rep))] ; connect to rep socket
  (send! req "ping?")
  (println (String. (recv! rep)))
  (send! rep "pong!")
  (println (String. (recv! req))))

connect! here gets the last endpoint of the :rep socket and connects to it.

pub-sub

(let [pub (socket :pub "@tcp://*:*")
      sub (doto (socket :sub) (connect! pub) (subscribe! ""))]
  (send! pub "broadcast")
  (String. (recv! sub)))

Calling subscribe! with the empty string "" subscribes to all messages.

core.async integration

(let [pull (socket :pull "@tcp://*:*")
      push (doto (socket :push) (connect! pull))
      ch   (chan)
      term (go-loop []
             (let [[v _] (alts! [ch (:out pull)])]
               (when v
                 (println (String. v))
                 (recur))))]
  (send! push "from push")
  (>!! ch "from chan"))