Skip to content

Commit

Permalink
Merge pull request #126 from issuu/andersfugmann/zmq-eio
Browse files Browse the repository at this point in the history
Eio bindings Zmq
  • Loading branch information
andersfugmann authored Feb 13, 2024
2 parents 497be93 + fe4ccf4 commit afa4d5f
Show file tree
Hide file tree
Showing 11 changed files with 495 additions and 85 deletions.
20 changes: 17 additions & 3 deletions .github/workflows/workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@ jobs:
os:
- ubuntu-20.04
ocaml-version:
- ocaml-base-compiler.5.0.0~beta1
- 5.0.0
- 4.14.0
- 4.04.1
include:
- lwt: true
- async: true
- eio: true
ocaml-version: 5.0.0

runs-on: ${{ matrix.os }}

steps:
- name: Checkout code
uses: actions/checkout@v2
uses: actions/checkout@v3

- name: Use OCaml ${{ matrix.ocaml-version }}
uses: ocaml/setup-ocaml@v2
Expand All @@ -35,16 +40,25 @@ jobs:
opam exec -- dune runtest zmq
- name: zmq-lwt
if: matrix.lwt
run: |
opam pin add zmq-lwt.dev . --no-action
opam install --deps-only --with-doc --with-test zmq-lwt
opam exec -- dune build zmq-lwt
opam exec -- dune runtest zmq-lwt
- name: zmq-async
if: ${{ matrix.ocaml-version != 'ocaml-base-compiler.5.0.0~beta1' }}
if: matrix.async
run: |
opam pin add zmq-async.dev . --no-action
opam install --deps-only --with-doc --with-test zmq-async
opam exec -- dune build zmq-async
opam exec -- dune runtest zmq-async
- name: zmq-eio
if: matrix.eio
run: |
opam pin add zmq-eio.dev . --no-action
opam install --deps-only --with-doc --with-test zmq-eio
opam exec -- dune build zmq-eio
opam exec -- dune runtest zmq-eio
12 changes: 12 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,15 @@ Async aware bindings to zmq are available though package zmq-async")
(zmq (= :version))
(lwt (>= 2.6.0))
(ounit2 :with-test)))

(package
(name zmq-eio)
(authors "Anders Fugmann")
(synopsis "Eio aware bindings to ZMQ")
(depends
(ocaml (>= 4.04.1))
(zmq (= :version))
(eio (>= 0.10))
(eio_main (>= 0.10))
(base (>= v0.11.0))
(ounit2 :with-test)))
92 changes: 76 additions & 16 deletions zmq-deferred/src/socket.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,81 @@
module type Socket = sig
type 'a deferred

(** An concurrent zeromq socket *)
type 'a t

type 'a of_socket_args

(** [of_socket s] wraps the zeromq socket [s]*)
val of_socket : ('a Zmq.Socket.t -> 'a t) of_socket_args

(** [to_socket s] extracts the raw zeromq socket from [s] *)
val to_socket : 'a t -> 'a Zmq.Socket.t

(** [recv socket] waits for a message on [socket] without blocking
other concurrent threads *)
val recv : 'a t -> string deferred

(** [send socket] sends a message on [socket] without blocking other
concurrent threads *)
val send : 'a t -> string -> unit deferred

(** [recv_all socket] waits for a multi-part message on [socket] without
blocking other concurrent threads *)
val recv_all : 'a t -> string list deferred

(** [send_all socket m] sends all parts of the multi-part message [m] on
[socket] without blocking other concurrent threads *)
val send_all : 'a t -> string list -> unit deferred

(** [recv_msg socket] waits for a message on [socket] without blocking
other concurrent threads *)
val recv_msg : 'a t -> Zmq.Msg.t deferred

(** [send_msg socket] sends a message on [socket] without blocking other
concurrent threads *)
val send_msg : 'a t -> Zmq.Msg.t -> unit deferred

(** [recv_msg_all socket] waits for a multi-part message on [socket] without
blocking other concurrent threads *)
val recv_msg_all : 'a t -> Zmq.Msg.t list deferred

(** [send_msg_all socket m] sends all parts of the multi-part message [m] on
[socket] without blocking other concurrent threads *)
val send_msg_all : 'a t -> Zmq.Msg.t list -> unit deferred

val close : 'a t -> unit deferred


module Router : sig

(** Identity of a socket connected to the router. *)
type id_t

(** [id_of_string s] coerces [s] into an {!id_t}. *)
val id_of_string : string -> id_t

(** [recv socket] waits for a message on [socket] without blocking other Lwt
threads. *)
val recv : [ `Router ] t -> (id_t * string list) deferred

(** [send socket id message] sends [message] on [socket] to [id] without
blocking other Lwt threads. *)
val send : [ `Router ] t -> id_t -> string list -> unit deferred
end

module Monitor : sig
(** [recv socket] waits for a monitoring event on [socket] without blocking other concurrent threads. *)
val recv : [ `Monitor ] t -> Zmq.Monitor.event deferred
end

end

module Make(T: Deferred.T) = struct
open T
open Deferred.Infix
type 'a deferred = 'a T.t
type 'a of_socket_args = 'a
exception Retry
type 'a t =
{ socket : 'a Zmq.Socket.t;
Expand All @@ -12,21 +87,6 @@ module Make(T: Deferred.T) = struct
mutable closing : bool;
}

let to_string_hum t =
let state = match (Zmq.Socket.events t.socket) with
| Zmq.Socket.No_event -> "No_event"
| Poll_in -> "Poll_in"
| Poll_out -> "Poll_out"
| Poll_in_out -> "Poll_in_out"
| Poll_error -> "Poll_error"
| exception _ -> "Closed"
in
Printf.sprintf "State: %s, Senders #%d, Receivers #%d"
state
(Queue.length t.senders)
(Queue.length t.receivers)


(** Small process that will notify of the fd changes *)
let rec fd_monitor t =
Condition.wait t.fd_condition >>= fun () ->
Expand Down Expand Up @@ -83,7 +143,7 @@ module Make(T: Deferred.T) = struct
Deferred.return ()
end

let of_socket: 'a Zmq.Socket.t -> 'a t = fun socket ->
let of_socket: ('a Zmq.Socket.t -> 'a t) of_socket_args = fun socket ->
let fd = Fd.create (Zmq.Socket.get_fd socket) in
let t =
{ socket; fd;
Expand Down
44 changes: 20 additions & 24 deletions zmq-deferred/src/socket.mli
Original file line number Diff line number Diff line change
@@ -1,58 +1,52 @@
(** The functor allows abstraction of the concurrency monad *)

(** This functor is meant to be as compatible as possible with lwt-zmq. It
should be straight forward to write a functor over Async_zmq.Socket and
Lwt_zmq.Socket.
The functor allows abstraction of the concurrency monad
*)
module Make : functor (T : Deferred.T) -> sig
open T
module type Socket = sig
type 'a deferred

(** An concurrent zeromq socket *)
type 'a t

type 'a of_socket_args

(** [of_socket s] wraps the zeromq socket [s]*)
val of_socket : 'a Zmq.Socket.t -> 'a t
val of_socket : ('a Zmq.Socket.t -> 'a t) of_socket_args

(** [to_socket s] extracts the raw zeromq socket from [s] *)
val to_socket : 'a t -> 'a Zmq.Socket.t

(** Internal statisitcs of the socket *)
val to_string_hum : 'a t -> string

(** [recv socket] waits for a message on [socket] without blocking
other concurrent threads *)
val recv : 'a t -> string Deferred.t
val recv : 'a t -> string deferred

(** [send socket] sends a message on [socket] without blocking other
concurrent threads *)
val send : 'a t -> string -> unit Deferred.t
val send : 'a t -> string -> unit deferred

(** [recv_all socket] waits for a multi-part message on [socket] without
blocking other concurrent threads *)
val recv_all : 'a t -> string list Deferred.t
val recv_all : 'a t -> string list deferred

(** [send_all socket m] sends all parts of the multi-part message [m] on
[socket] without blocking other concurrent threads *)
val send_all : 'a t -> string list -> unit Deferred.t
val send_all : 'a t -> string list -> unit deferred

(** [recv_msg socket] waits for a message on [socket] without blocking
other concurrent threads *)
val recv_msg : 'a t -> Zmq.Msg.t Deferred.t
val recv_msg : 'a t -> Zmq.Msg.t deferred

(** [send_msg socket] sends a message on [socket] without blocking other
concurrent threads *)
val send_msg : 'a t -> Zmq.Msg.t -> unit Deferred.t
val send_msg : 'a t -> Zmq.Msg.t -> unit deferred

(** [recv_msg_all socket] waits for a multi-part message on [socket] without
blocking other concurrent threads *)
val recv_msg_all : 'a t -> Zmq.Msg.t list Deferred.t
val recv_msg_all : 'a t -> Zmq.Msg.t list deferred

(** [send_msg_all socket m] sends all parts of the multi-part message [m] on
[socket] without blocking other concurrent threads *)
val send_msg_all : 'a t -> Zmq.Msg.t list -> unit Deferred.t
val send_msg_all : 'a t -> Zmq.Msg.t list -> unit deferred

val close : 'a t -> unit Deferred.t
val close : 'a t -> unit deferred


module Router : sig
Expand All @@ -65,16 +59,18 @@ module Make : functor (T : Deferred.T) -> sig

(** [recv socket] waits for a message on [socket] without blocking other Lwt
threads. *)
val recv : [ `Router ] t -> (id_t * string list) Deferred.t
val recv : [ `Router ] t -> (id_t * string list) deferred

(** [send socket id message] sends [message] on [socket] to [id] without
blocking other Lwt threads. *)
val send : [ `Router ] t -> id_t -> string list -> unit Deferred.t
val send : [ `Router ] t -> id_t -> string list -> unit deferred
end

module Monitor : sig
(** [recv socket] waits for a monitoring event on [socket] without blocking other concurrent threads. *)
val recv : [ `Monitor ] t -> Zmq.Monitor.event Deferred.t
val recv : [ `Monitor ] t -> Zmq.Monitor.event deferred
end

end

module Make : functor (T : Deferred.T) -> Socket with type 'a deferred = 'a T.t and type 'a of_socket_args = 'a
64 changes: 22 additions & 42 deletions zmq-deferred/test/test.ml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
open OUnit
let verbose = false
let count = 1000


let list_init cnt f =
let rec loop = function
| n when n = cnt -> []
Expand All @@ -16,15 +14,7 @@ module Make(T: Zmq_deferred.Deferred.T) = struct

module Socket = Zmq_deferred.Socket.Make(T)

let rec monitor (s1, s2) () =
match verbose with
| true ->
Printf.eprintf "s1: %s\n%!" (Socket.to_string_hum s1);
Printf.eprintf "s2: %s\n%!" (Socket.to_string_hum s2);
Deferred.sleepf 1.0 >>= monitor (s1, s2)
| false -> Deferred.return ()

let all_ok l = List.fold_left (fun acc a -> acc >>= fun () -> a) (Deferred.return ()) l
let all_ok l = List.fold_left (fun acc a -> acc >>= fun () -> a) (T.Deferred.return ()) l
let setup () =
let make ctx tpe =
let s = Zmq.Socket.create ctx tpe in
Expand All @@ -38,36 +28,28 @@ module Make(T: Zmq_deferred.Deferred.T) = struct
let endpoint = "inproc://test" in
Zmq.Socket.bind s1 endpoint;
Zmq.Socket.connect s2 endpoint;
Deferred.sleepf 0.0001 >>= fun () ->
Deferred.return (ctx, Socket.of_socket s1, Socket.of_socket s2)
T.Deferred.sleepf 0.0001 >>= fun () ->
T.Deferred.return (ctx, Socket.of_socket s1, Socket.of_socket s2)

let teardown (ctx, s1, s2) =
Socket.close s2 >>= fun () ->
Socket.close s1 >>= fun () ->
Zmq.Context.terminate ctx;
Deferred.return ()
T.Deferred.return ()

let rec send ?delay s = function
| 0 -> Deferred.return ()
let rec send ?(delay = 0.0) s = function
| 0 -> T.Deferred.return ()
| n ->
Socket.send s "test" >>= fun _ ->
begin
match delay with
| None -> Deferred.return ()
| Some delay -> Deferred.sleepf delay
end >>= fun () ->
send s (n - 1)

let rec recv ?delay s = function
| 0 -> Deferred.return ()
Socket.send s "test" >>= fun () ->
T.Deferred.sleepf delay >>= fun () ->
send s ~delay (n - 1)

let rec recv ?(delay = 0.0) s = function
| 0 -> T.Deferred.return ()
| n ->
Socket.recv s >>= fun _ ->
begin
match delay with
| None -> Deferred.return ()
| Some delay -> Deferred.sleepf delay
end >>= fun () ->
recv s (n - 1)
T.Deferred.sleepf delay >>= fun () ->
recv s ~delay (n - 1)

(* Tests *)
let test_send_receive (_, s1, s2) =
Expand All @@ -92,7 +74,7 @@ module Make(T: Zmq_deferred.Deferred.T) = struct

let test_slow_send (_, s1, s2) =
all_ok [
recv ~delay:0.001 s2 count;
recv ~delay:0.0001 s2 count;
send s1 (count / 5);
send s1 (count / 5);
send s1 (count / 5);
Expand All @@ -102,7 +84,7 @@ module Make(T: Zmq_deferred.Deferred.T) = struct

let test_slow_receive (_, s1, s2) =
all_ok [
send ~delay:0.001 s2 count;
send ~delay:0.0001 s2 count;
recv s1 (count / 5);
recv s1 (count / 5);
recv s1 (count / 5);
Expand All @@ -111,20 +93,18 @@ module Make(T: Zmq_deferred.Deferred.T) = struct
]

let test_multi (_, s1, s2) =
Deferred.don't_wait_for (monitor (s1, s2));
all_ok (
((send ~delay:0.001 s1 count) :: (list_init count (fun _ -> Socket.recv s2 >>= fun _ -> Deferred.return ())))
((send ~delay:0.0001 s1 count) :: (list_init count (fun _ -> Socket.recv s2 >>= fun _ -> Deferred.return ())))
@
((send ~delay:0.002 s2 count) :: (list_init count (fun _ -> Socket.recv s1 >>= fun _ -> Deferred.return ())))
((send ~delay:0.0002 s2 count) :: (list_init count (fun _ -> Socket.recv s1 >>= fun _ -> Deferred.return ())))
)

let test_slow_mix (_, s1, s2) =
Deferred.don't_wait_for (monitor (s1, s2));
all_ok [
send ~delay:0.001 s2 count; recv ~delay:0.001 s1 count;
send ~delay:0.001 s1 count; recv ~delay:0.001 s2 count;
send ~delay:0.001 s2 count; recv ~delay:0.001 s1 count;
send ~delay:0.001 s1 count; recv ~delay:0.001 s2 count;
send ~delay:0.0001 s2 count; recv ~delay:0.0002 s1 count;
send ~delay:0.0001 s1 count; recv ~delay:0.0002 s2 count;
send ~delay:0.0001 s2 count; recv ~delay:0.0002 s1 count;
send ~delay:0.0001 s1 count; recv ~delay:0.0002 s2 count;
]

let suite (exec : (unit -> unit Deferred.t) -> unit) =
Expand Down
Loading

0 comments on commit afa4d5f

Please sign in to comment.