Skip to content

Commit

Permalink
Merge pull request #48 from robur-coop/queue-observe-messages
Browse files Browse the repository at this point in the history
Queue up observe messages
  • Loading branch information
hannesm authored Aug 15, 2024
2 parents bb3a686 + dab9c15 commit edd6032
Showing 1 changed file with 29 additions and 7 deletions.
36 changes: 29 additions & 7 deletions app/server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -562,16 +562,38 @@ let client_loop t fd =
Lwt_list.iter_s (fun (ts, l) ->
write_cmd fd (output id ts l) >|= ignore)
(List.rev out) >>= fun () ->
let q = Queue.create () in
let q_cond = Lwt_condition.create () in
let rec more () =
Lwt_condition.wait cond >>= function
| `End (ts, data) ->
write_cmd fd (output id ts data)
| `Data (ts, data) ->
write_cmd fd (output id ts data) >>= function
| Ok () -> more ()
| Error _ -> Lwt.return (Ok ())
| `End _ as ev ->
Queue.add ev q;
Lwt_condition.signal q_cond ();
Lwt.return_unit
| `Data _ as ev ->
if Queue.length q < 100 then begin
Queue.add ev q;
Lwt_condition.signal q_cond ();
more ()
end else
(* Drop data messages if the client can't keep up *)
Lwt.return_unit
in
more ()
let rec send () =
if Queue.is_empty q then
Lwt_condition.wait q_cond >>= fun () ->
send ()
else
match Queue.take q with
| `End (ts, data) ->
write_cmd fd (output id ts data)
| `Data (ts, data) ->
write_cmd fd (output id ts data) >>= function
| Ok () -> send ()
| Error _ -> Lwt.return (Ok ())
in
let more = more () in
send () >>= fun r -> Lwt.cancel more; Lwt.return r
| None -> Lwt.return (Error (`Msg "uuid not found"))
end
| Builder.Drop_platform p ->
Expand Down

0 comments on commit edd6032

Please sign in to comment.