diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index c42fd33..0fa428f 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -12,15 +12,20 @@ jobs: os: - ubuntu-20.04 ocaml-version: - - ocaml-base-compiler.5.0.0~beta1 + - 5.0.0 - 4.14.0 - 4.04.1 + include: + - lwt: true + - async: true + - eio: true + ocaml-version: 5.0.0 runs-on: ${{ matrix.os }} steps: - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Use OCaml ${{ matrix.ocaml-version }} uses: ocaml/setup-ocaml@v2 @@ -35,6 +40,7 @@ jobs: opam exec -- dune runtest zmq - name: zmq-lwt + if: matrix.lwt run: | opam pin add zmq-lwt.dev . --no-action opam install --deps-only --with-doc --with-test zmq-lwt @@ -42,9 +48,17 @@ jobs: opam exec -- dune runtest zmq-lwt - name: zmq-async - if: ${{ matrix.ocaml-version != 'ocaml-base-compiler.5.0.0~beta1' }} + if: matrix.async run: | opam pin add zmq-async.dev . --no-action opam install --deps-only --with-doc --with-test zmq-async opam exec -- dune build zmq-async opam exec -- dune runtest zmq-async + + - name: zmq-eio + if: matrix.eio + run: | + opam pin add zmq-eio.dev . --no-action + opam install --deps-only --with-doc --with-test zmq-eio + opam exec -- dune build zmq-eio + opam exec -- dune runtest zmq-eio diff --git a/dune-project b/dune-project index dbdcbce..f11a886 100644 --- a/dune-project +++ b/dune-project @@ -43,3 +43,15 @@ Async aware bindings to zmq are available though package zmq-async") (zmq (= :version)) (lwt (>= 2.6.0)) (ounit2 :with-test))) + + (package + (name zmq-eio) + (authors "Anders Fugmann") + (synopsis "Eio aware bindings to ZMQ") + (depends + (ocaml (>= 4.04.1)) + (zmq (= :version)) + (eio (>= 0.10)) + (eio_main (>= 0.10)) + (base (>= v0.11.0)) + (ounit2 :with-test))) diff --git a/zmq-deferred/src/socket.ml b/zmq-deferred/src/socket.ml index e75edde..0d34713 100644 --- a/zmq-deferred/src/socket.ml +++ b/zmq-deferred/src/socket.ml @@ -1,6 +1,81 @@ +module type Socket = sig + type 'a deferred + + (** An concurrent zeromq socket *) + type 'a t + + type 'a of_socket_args + + (** [of_socket s] wraps the zeromq socket [s]*) + val of_socket : ('a Zmq.Socket.t -> 'a t) of_socket_args + + (** [to_socket s] extracts the raw zeromq socket from [s] *) + val to_socket : 'a t -> 'a Zmq.Socket.t + + (** [recv socket] waits for a message on [socket] without blocking + other concurrent threads *) + val recv : 'a t -> string deferred + + (** [send socket] sends a message on [socket] without blocking other + concurrent threads *) + val send : 'a t -> string -> unit deferred + + (** [recv_all socket] waits for a multi-part message on [socket] without + blocking other concurrent threads *) + val recv_all : 'a t -> string list deferred + + (** [send_all socket m] sends all parts of the multi-part message [m] on + [socket] without blocking other concurrent threads *) + val send_all : 'a t -> string list -> unit deferred + + (** [recv_msg socket] waits for a message on [socket] without blocking + other concurrent threads *) + val recv_msg : 'a t -> Zmq.Msg.t deferred + + (** [send_msg socket] sends a message on [socket] without blocking other + concurrent threads *) + val send_msg : 'a t -> Zmq.Msg.t -> unit deferred + + (** [recv_msg_all socket] waits for a multi-part message on [socket] without + blocking other concurrent threads *) + val recv_msg_all : 'a t -> Zmq.Msg.t list deferred + + (** [send_msg_all socket m] sends all parts of the multi-part message [m] on + [socket] without blocking other concurrent threads *) + val send_msg_all : 'a t -> Zmq.Msg.t list -> unit deferred + + val close : 'a t -> unit deferred + + + module Router : sig + + (** Identity of a socket connected to the router. *) + type id_t + + (** [id_of_string s] coerces [s] into an {!id_t}. *) + val id_of_string : string -> id_t + + (** [recv socket] waits for a message on [socket] without blocking other Lwt + threads. *) + val recv : [ `Router ] t -> (id_t * string list) deferred + + (** [send socket id message] sends [message] on [socket] to [id] without + blocking other Lwt threads. *) + val send : [ `Router ] t -> id_t -> string list -> unit deferred + end + + module Monitor : sig + (** [recv socket] waits for a monitoring event on [socket] without blocking other concurrent threads. *) + val recv : [ `Monitor ] t -> Zmq.Monitor.event deferred + end + +end + module Make(T: Deferred.T) = struct open T open Deferred.Infix + type 'a deferred = 'a T.t + type 'a of_socket_args = 'a exception Retry type 'a t = { socket : 'a Zmq.Socket.t; @@ -12,21 +87,6 @@ module Make(T: Deferred.T) = struct mutable closing : bool; } - let to_string_hum t = - let state = match (Zmq.Socket.events t.socket) with - | Zmq.Socket.No_event -> "No_event" - | Poll_in -> "Poll_in" - | Poll_out -> "Poll_out" - | Poll_in_out -> "Poll_in_out" - | Poll_error -> "Poll_error" - | exception _ -> "Closed" - in - Printf.sprintf "State: %s, Senders #%d, Receivers #%d" - state - (Queue.length t.senders) - (Queue.length t.receivers) - - (** Small process that will notify of the fd changes *) let rec fd_monitor t = Condition.wait t.fd_condition >>= fun () -> @@ -83,7 +143,7 @@ module Make(T: Deferred.T) = struct Deferred.return () end - let of_socket: 'a Zmq.Socket.t -> 'a t = fun socket -> + let of_socket: ('a Zmq.Socket.t -> 'a t) of_socket_args = fun socket -> let fd = Fd.create (Zmq.Socket.get_fd socket) in let t = { socket; fd; diff --git a/zmq-deferred/src/socket.mli b/zmq-deferred/src/socket.mli index 4e52b8e..7b7b6df 100644 --- a/zmq-deferred/src/socket.mli +++ b/zmq-deferred/src/socket.mli @@ -1,58 +1,52 @@ +(** The functor allows abstraction of the concurrency monad *) -(** This functor is meant to be as compatible as possible with lwt-zmq. It - should be straight forward to write a functor over Async_zmq.Socket and - Lwt_zmq.Socket. - - The functor allows abstraction of the concurrency monad -*) -module Make : functor (T : Deferred.T) -> sig - open T +module type Socket = sig + type 'a deferred (** An concurrent zeromq socket *) type 'a t + type 'a of_socket_args + (** [of_socket s] wraps the zeromq socket [s]*) - val of_socket : 'a Zmq.Socket.t -> 'a t + val of_socket : ('a Zmq.Socket.t -> 'a t) of_socket_args (** [to_socket s] extracts the raw zeromq socket from [s] *) val to_socket : 'a t -> 'a Zmq.Socket.t - (** Internal statisitcs of the socket *) - val to_string_hum : 'a t -> string - (** [recv socket] waits for a message on [socket] without blocking other concurrent threads *) - val recv : 'a t -> string Deferred.t + val recv : 'a t -> string deferred (** [send socket] sends a message on [socket] without blocking other concurrent threads *) - val send : 'a t -> string -> unit Deferred.t + val send : 'a t -> string -> unit deferred (** [recv_all socket] waits for a multi-part message on [socket] without blocking other concurrent threads *) - val recv_all : 'a t -> string list Deferred.t + val recv_all : 'a t -> string list deferred (** [send_all socket m] sends all parts of the multi-part message [m] on [socket] without blocking other concurrent threads *) - val send_all : 'a t -> string list -> unit Deferred.t + val send_all : 'a t -> string list -> unit deferred (** [recv_msg socket] waits for a message on [socket] without blocking other concurrent threads *) - val recv_msg : 'a t -> Zmq.Msg.t Deferred.t + val recv_msg : 'a t -> Zmq.Msg.t deferred (** [send_msg socket] sends a message on [socket] without blocking other concurrent threads *) - val send_msg : 'a t -> Zmq.Msg.t -> unit Deferred.t + val send_msg : 'a t -> Zmq.Msg.t -> unit deferred (** [recv_msg_all socket] waits for a multi-part message on [socket] without blocking other concurrent threads *) - val recv_msg_all : 'a t -> Zmq.Msg.t list Deferred.t + val recv_msg_all : 'a t -> Zmq.Msg.t list deferred (** [send_msg_all socket m] sends all parts of the multi-part message [m] on [socket] without blocking other concurrent threads *) - val send_msg_all : 'a t -> Zmq.Msg.t list -> unit Deferred.t + val send_msg_all : 'a t -> Zmq.Msg.t list -> unit deferred - val close : 'a t -> unit Deferred.t + val close : 'a t -> unit deferred module Router : sig @@ -65,16 +59,18 @@ module Make : functor (T : Deferred.T) -> sig (** [recv socket] waits for a message on [socket] without blocking other Lwt threads. *) - val recv : [ `Router ] t -> (id_t * string list) Deferred.t + val recv : [ `Router ] t -> (id_t * string list) deferred (** [send socket id message] sends [message] on [socket] to [id] without blocking other Lwt threads. *) - val send : [ `Router ] t -> id_t -> string list -> unit Deferred.t + val send : [ `Router ] t -> id_t -> string list -> unit deferred end module Monitor : sig (** [recv socket] waits for a monitoring event on [socket] without blocking other concurrent threads. *) - val recv : [ `Monitor ] t -> Zmq.Monitor.event Deferred.t + val recv : [ `Monitor ] t -> Zmq.Monitor.event deferred end end + +module Make : functor (T : Deferred.T) -> Socket with type 'a deferred = 'a T.t and type 'a of_socket_args = 'a diff --git a/zmq-deferred/test/test.ml b/zmq-deferred/test/test.ml index 4d33b2e..a568adb 100644 --- a/zmq-deferred/test/test.ml +++ b/zmq-deferred/test/test.ml @@ -1,8 +1,6 @@ open OUnit -let verbose = false let count = 1000 - let list_init cnt f = let rec loop = function | n when n = cnt -> [] @@ -16,15 +14,7 @@ module Make(T: Zmq_deferred.Deferred.T) = struct module Socket = Zmq_deferred.Socket.Make(T) - let rec monitor (s1, s2) () = - match verbose with - | true -> - Printf.eprintf "s1: %s\n%!" (Socket.to_string_hum s1); - Printf.eprintf "s2: %s\n%!" (Socket.to_string_hum s2); - Deferred.sleepf 1.0 >>= monitor (s1, s2) - | false -> Deferred.return () - - let all_ok l = List.fold_left (fun acc a -> acc >>= fun () -> a) (Deferred.return ()) l + let all_ok l = List.fold_left (fun acc a -> acc >>= fun () -> a) (T.Deferred.return ()) l let setup () = let make ctx tpe = let s = Zmq.Socket.create ctx tpe in @@ -38,36 +28,28 @@ module Make(T: Zmq_deferred.Deferred.T) = struct let endpoint = "inproc://test" in Zmq.Socket.bind s1 endpoint; Zmq.Socket.connect s2 endpoint; - Deferred.sleepf 0.0001 >>= fun () -> - Deferred.return (ctx, Socket.of_socket s1, Socket.of_socket s2) + T.Deferred.sleepf 0.0001 >>= fun () -> + T.Deferred.return (ctx, Socket.of_socket s1, Socket.of_socket s2) let teardown (ctx, s1, s2) = Socket.close s2 >>= fun () -> Socket.close s1 >>= fun () -> Zmq.Context.terminate ctx; - Deferred.return () + T.Deferred.return () - let rec send ?delay s = function - | 0 -> Deferred.return () + let rec send ?(delay = 0.0) s = function + | 0 -> T.Deferred.return () | n -> - Socket.send s "test" >>= fun _ -> - begin - match delay with - | None -> Deferred.return () - | Some delay -> Deferred.sleepf delay - end >>= fun () -> - send s (n - 1) - - let rec recv ?delay s = function - | 0 -> Deferred.return () + Socket.send s "test" >>= fun () -> + T.Deferred.sleepf delay >>= fun () -> + send s ~delay (n - 1) + + let rec recv ?(delay = 0.0) s = function + | 0 -> T.Deferred.return () | n -> Socket.recv s >>= fun _ -> - begin - match delay with - | None -> Deferred.return () - | Some delay -> Deferred.sleepf delay - end >>= fun () -> - recv s (n - 1) + T.Deferred.sleepf delay >>= fun () -> + recv s ~delay (n - 1) (* Tests *) let test_send_receive (_, s1, s2) = @@ -92,7 +74,7 @@ module Make(T: Zmq_deferred.Deferred.T) = struct let test_slow_send (_, s1, s2) = all_ok [ - recv ~delay:0.001 s2 count; + recv ~delay:0.0001 s2 count; send s1 (count / 5); send s1 (count / 5); send s1 (count / 5); @@ -102,7 +84,7 @@ module Make(T: Zmq_deferred.Deferred.T) = struct let test_slow_receive (_, s1, s2) = all_ok [ - send ~delay:0.001 s2 count; + send ~delay:0.0001 s2 count; recv s1 (count / 5); recv s1 (count / 5); recv s1 (count / 5); @@ -111,20 +93,18 @@ module Make(T: Zmq_deferred.Deferred.T) = struct ] let test_multi (_, s1, s2) = - Deferred.don't_wait_for (monitor (s1, s2)); all_ok ( - ((send ~delay:0.001 s1 count) :: (list_init count (fun _ -> Socket.recv s2 >>= fun _ -> Deferred.return ()))) + ((send ~delay:0.0001 s1 count) :: (list_init count (fun _ -> Socket.recv s2 >>= fun _ -> Deferred.return ()))) @ - ((send ~delay:0.002 s2 count) :: (list_init count (fun _ -> Socket.recv s1 >>= fun _ -> Deferred.return ()))) + ((send ~delay:0.0002 s2 count) :: (list_init count (fun _ -> Socket.recv s1 >>= fun _ -> Deferred.return ()))) ) let test_slow_mix (_, s1, s2) = - Deferred.don't_wait_for (monitor (s1, s2)); all_ok [ - send ~delay:0.001 s2 count; recv ~delay:0.001 s1 count; - send ~delay:0.001 s1 count; recv ~delay:0.001 s2 count; - send ~delay:0.001 s2 count; recv ~delay:0.001 s1 count; - send ~delay:0.001 s1 count; recv ~delay:0.001 s2 count; + send ~delay:0.0001 s2 count; recv ~delay:0.0002 s1 count; + send ~delay:0.0001 s1 count; recv ~delay:0.0002 s2 count; + send ~delay:0.0001 s2 count; recv ~delay:0.0002 s1 count; + send ~delay:0.0001 s1 count; recv ~delay:0.0002 s2 count; ] let suite (exec : (unit -> unit Deferred.t) -> unit) = diff --git a/zmq-eio.opam b/zmq-eio.opam new file mode 100644 index 0000000..d6df78a --- /dev/null +++ b/zmq-eio.opam @@ -0,0 +1,33 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Eio aware bindings to ZMQ" +maintainer: ["Anders Fugmann "] +authors: ["Anders Fugmann"] +license: "MIT" +homepage: "https://github.com/issuu/ocaml-zmq" +bug-reports: "https://github.com/issuu/ocaml-zmq/issues" +depends: [ + "dune" {>= "2.7"} + "ocaml" {>= "4.04.1"} + "zmq" {= version} + "eio" {>= "0.10"} + "eio_main" {>= "0.10"} + "base" {>= "v0.11.0"} + "ounit2" {with-test} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/issuu/ocaml-zmq.git" diff --git a/zmq-eio/src/dune b/zmq-eio/src/dune new file mode 100644 index 0000000..b254135 --- /dev/null +++ b/zmq-eio/src/dune @@ -0,0 +1,4 @@ +(library + (name zmq_eio) + (public_name zmq-eio) + (libraries zmq.deferred eio eio_main base)) diff --git a/zmq-eio/src/socket.ml b/zmq-eio/src/socket.ml new file mode 100644 index 0000000..4445e6c --- /dev/null +++ b/zmq-eio/src/socket.ml @@ -0,0 +1,159 @@ +(** Eio based bindings for eio *) +exception Closed + +type 'a t = { + socket : 'a Zmq.Socket.t; + fd : Unix.file_descr; + senders : (unit -> unit) Queue.t; + receivers : (unit -> unit) Queue.t; + condition : Eio.Condition.t; + mutex : Eio.Mutex.t; + ready_condition: Eio.Condition.t; + mutable thread : unit Eio.Promise.or_exn option; (* None indicates already closed *) +} + +type 'a of_socket_args = sw:Eio.Switch.t -> 'a +type 'a deferred = 'a + +(** invoke the first function on the queue, but only pop it if it does not raise EAGAIN *) +let process queue = + match (Queue.peek queue) () with + | () -> + let (_: unit -> unit) = Queue.pop queue in + () + | exception Unix.Unix_error (Unix.EAGAIN, _, _) -> + (* If f raised EAGAIN, dont pop the message. *) + (* This should never happen. If so, the queue could be replaced with a Eio.Stream for faster handling *) + () + +let with_lock lock f = + Eio.Mutex.lock lock; + try + let v = f () in + Eio.Mutex.unlock lock; + v + with + | e -> + Eio.Mutex.unlock lock; + raise e + +let rec fd_monitor t = + Eio.Condition.await_no_mutex t.ready_condition; + Eio_unix.await_readable t.fd; + with_lock t.mutex (fun () -> Eio.Condition.broadcast t.condition); + fd_monitor t + +let rec event_loop t = + let inner () = + match Zmq.Socket.events t.socket with + | Zmq.Socket.Poll_error -> + failwith "Cannot poll socket" + | (Poll_in_out | Poll_in) when not (Queue.is_empty t.receivers) -> + process t.receivers + | (Poll_in_out | Poll_out) when not (Queue.is_empty t.senders) -> + process t.senders + | _ -> + Eio.Condition.broadcast t.ready_condition; + Eio.Condition.await t.condition t.mutex; + in + with_lock t.mutex (fun () -> inner ()); + match t.thread with + | None when Queue.is_empty t.senders && Queue.is_empty t.receivers -> + () + | _ -> + event_loop t + +let of_socket: ('a Zmq.Socket.t -> 'a t) of_socket_args = fun ~sw socket -> + let fd = Zmq.Socket.get_fd socket in + let t = + { socket; + fd; + senders = Queue.create (); + receivers = Queue.create (); + mutex = Eio.Mutex.create (); + condition = Eio.Condition.create (); + ready_condition = Eio.Condition.create (); + thread = None; + } + in + let thread = Eio.Fiber.fork_promise ~sw (fun () -> + Eio.Switch.run (fun sw -> + Eio.Fiber.fork ~sw (fun () -> event_loop t); + Eio.Fiber.fork_daemon ~sw (fun () -> fd_monitor t); + () + ); + ) + in + t.thread <- Some thread; + t + +let to_socket t = + t.socket + +(** Stop the deamon thread, and ensure that all sends and receives has been handled *) +let close t = + let thread = match t.thread with + | None -> failwith "Socket already closed" + | Some t -> t + in + with_lock t.mutex (fun () -> t.thread <- None; Eio.Condition.broadcast t.condition); + let _e = Eio.Promise.await_exn thread in + Zmq.Socket.close t.socket; + () + + +let request t queue f = + let () = + match t.thread with + | None -> raise Closed + | Some _ -> () + in + let (pt, pu) = Eio.Promise.create ~label:"Zmq" () in + let f () = + let v = f () in + Eio.Promise.resolve pu v + in + with_lock t.mutex (fun () -> Queue.push f queue; Eio.Condition.broadcast t.condition); + Eio.Promise.await pt + +let send t message = + request t t.senders (fun () -> Zmq.Socket.send ~block:false t.socket message) + +let send_msg t message = + request t t.senders (fun () -> Zmq.Socket.send_msg ~block:false t.socket message) + +let send_all t = + request t t.receivers (fun () -> Zmq.Socket.send_all ~block:false t.socket) + +let send_msg_all t = + request t t.receivers (fun () -> Zmq.Socket.send_msg_all ~block:false t.socket) + +let recv t = + request t t.receivers (fun () -> Zmq.Socket.recv ~block:false t.socket) + +let recv_msg t = + request t t.receivers (fun () -> Zmq.Socket.recv_msg ~block:false t.socket) + +let recv_all t = + request t t.receivers (fun () -> Zmq.Socket.recv_all ~block:false t.socket) + +let recv_msg_all t = + request t t.receivers (fun () -> Zmq.Socket.recv_msg_all ~block:false t.socket) + +module Router = struct + type id_t = string + + let id_of_string t = t + + let recv t = + match recv_all t with + | id :: message -> (id, message) + | _ -> assert false + + let send t id message = + send_all t (id :: message) +end + +module Monitor = struct + let recv t = request t t.receivers (fun () -> Zmq.Monitor.recv ~block:false t.socket) +end diff --git a/zmq-eio/src/socket.mli b/zmq-eio/src/socket.mli new file mode 100644 index 0000000..6fed4e0 --- /dev/null +++ b/zmq-eio/src/socket.mli @@ -0,0 +1 @@ +include Zmq_deferred.Socket.Socket with type 'a deferred = 'a and type 'a of_socket_args = sw:Eio.Switch.t -> 'a diff --git a/zmq-eio/test/dune b/zmq-eio/test/dune new file mode 100644 index 0000000..9f0b88d --- /dev/null +++ b/zmq-eio/test/dune @@ -0,0 +1,11 @@ +(executable + (name test) + (libraries zmq-eio ounit2 eio eio_main)) + +(rule + (alias runtest) + (deps + (:test test.exe)) + (action + (run %{test})) + (package zmq-eio)) diff --git a/zmq-eio/test/test.ml b/zmq-eio/test/test.ml new file mode 100644 index 0000000..503216d --- /dev/null +++ b/zmq-eio/test/test.ml @@ -0,0 +1,140 @@ +open OUnit + +let sleepf env secs = Eio.Time.sleep (Eio.Stdenv.clock env) secs + +let setup ~sw env = + let make ctx tpe = + let s = Zmq.Socket.create ctx tpe in + Zmq.Socket.set_receive_high_water_mark s 1; + Zmq.Socket.set_send_high_water_mark s 2; + s + in + let ctx = Zmq.Context.create () in + let s1 = make ctx Zmq.Socket.pair in + let s2 = make ctx Zmq.Socket.pair in + let endpoint = "inproc://test" in + Zmq.Socket.bind s1 endpoint; + Zmq.Socket.connect s2 endpoint; + (* Sleep a bit *) + sleepf env 0.0001; + (ctx, Zmq_eio.Socket.of_socket ~sw s1, Zmq_eio.Socket.of_socket ~sw s2) + +let teardown ~sw:_ _env (ctx, s1, s2) = + Zmq_eio.Socket.close s2; + Zmq_eio.Socket.close s1; + Zmq.Context.terminate ctx; + () + +let all_ok l = + Eio.Fiber.List.iter (fun f -> f ()) l + +let send env ?(delay = 0.0) s count = + let rec inner = function + | 0 -> () + | n -> + Zmq_eio.Socket.send s "test"; + sleepf env delay; + inner (n - 1) + in + fun () -> inner count + +let recv env ?(delay = 0.0) s count = + let rec inner = function + | 0 -> () + | n -> + let _ = Zmq_eio.Socket.recv s in + sleepf env delay; + inner (n - 1) + in + fun () -> inner count + +(** Test functions *) +let test_setup_teardown ~sw:_ _env _s = () + +let count = 1000 +(* Tests *) +let test_send_receive ~sw:_ env (_, s1, s2) = + all_ok [ + send env s2 count; + recv env s1 count; + ] + +let test_msend_mreceive ~sw:_ env (_, s1, s2) = + all_ok [ + send env s2 count; send env s2 count; send env s2 count; send env s2 count; + recv env s1 count; recv env s1 count; recv env s1 count; recv env s1 count; + ] + +let test_mix ~sw:_ env (_, s1, s2) = + all_ok [ + send env s2 count; recv env s1 count; + send env s1 count; recv env s2 count; + send env s2 count; recv env s1 count; + send env s1 count; recv env s2 count; + send env s2 count; recv env s1 count; + ] + +let test_slow_send ~sw:_ env (_, s1, s2) = + all_ok [ + recv env ~delay:0.0001 s2 count; + send env s1 (count / 5); + send env s1 (count / 5); + send env s1 (count / 5); + send env s1 (count / 5); + send env s1 (count / 5); + ] + +let test_slow_receive ~sw:_ env (_, s1, s2) = + all_ok [ + send env ~delay:0.0001 s2 count; + recv env s1 (count / 5); + recv env s1 (count / 5); + recv env s1 (count / 5); + recv env s1 (count / 5); + recv env s1 (count / 5); + ] + +let test_slow_mix1 ~sw:_ env (_, s1, s2) = + all_ok [ + send env ~delay:0.0001 s2 count; recv env ~delay:0.0002 s1 count; + send env ~delay:0.0001 s1 count; recv env ~delay:0.0002 s2 count; + send env ~delay:0.0001 s2 count; recv env ~delay:0.0002 s1 count; + send env ~delay:0.0001 s1 count; recv env ~delay:0.0002 s2 count; + ] + +let test_slow_mix2 ~sw:_ env (_, s1, s2) = + all_ok [ + send env ~delay:0.0002 s2 count; recv env ~delay:0.0001 s1 count; + send env ~delay:0.0002 s1 count; recv env ~delay:0.0001 s2 count; + send env ~delay:0.0002 s2 count; recv env ~delay:0.0001 s1 count; + send env ~delay:0.0002 s1 count; recv env ~delay:0.0001 s2 count; + ] + + +let suite () = + let bracket test = + let f sw env = + let s = setup ~sw env in + match test ~sw env s with + | v -> teardown ~sw env s; v + | exception e -> teardown ~sw env s; raise e + in + fun () -> Eio_linux.run (fun env -> + Eio.Switch.run (fun sw -> f sw env)) + in + + __MODULE__ >::: [ + "test_setup_teardown" >:: bracket test_setup_teardown; + "test_send_receive" >:: bracket test_send_receive; + "test_msend_mreceive" >:: bracket test_msend_mreceive; + "test_mix" >:: bracket test_mix; + "test_slow_send" >:: bracket test_slow_send; + "test_slow_receive" >:: bracket test_slow_receive; + "test_slow_mix" >:: bracket test_slow_mix1; + "test_slow_mix" >:: bracket test_slow_mix2; + "test_send_receive" >:: bracket test_send_receive; + ] + + +let () = + run_test_tt_main (suite ()) |> ignore