diff --git a/bin/dune b/bin/dune index bfc7793cad4..325f884a8d7 100644 --- a/bin/dune +++ b/bin/dune @@ -12,6 +12,7 @@ dune_lang predicate_lang fiber + fiber_event_bus stdune dune_console unix diff --git a/bin/main.ml b/bin/main.ml index d720bfd7135..ba47da9fe45 100644 --- a/bin/main.ml +++ b/bin/main.ml @@ -25,6 +25,7 @@ let all : _ Cmdliner.Cmd.t list = ; Ocaml_merlin.command ; Shutdown.command ; Diagnostics.command + ; Monitor.command ] in let groups = diff --git a/bin/monitor.ml b/bin/monitor.ml new file mode 100644 index 00000000000..308837ca17d --- /dev/null +++ b/bin/monitor.ml @@ -0,0 +1,268 @@ +open Import +open Fiber.O +module Client = Dune_rpc_client.Client +module Version_error = Dune_rpc_private.Version_error + +include struct + open Dune_rpc + module Diagnostic = Diagnostic + module Progress = Progress + module Job = Job + module Sub = Sub + module Conv = Conv +end + +(** Utility module for generating [Map] modules for [Diagnostic]s and [Job]s which use + their [Id] as keys. *) +module Id_map (Id : sig + type t + + val compare : t -> t -> Ordering.t + val sexp : (t, Conv.values) Conv.t + end) = +struct + include Map.Make (struct + include Id + + let to_dyn t = Sexp.to_dyn (Conv.to_sexp Id.sexp t) + end) +end + +module Diagnostic_id_map = Id_map (Diagnostic.Id) +module Job_id_map = Id_map (Job.Id) + +module Event = struct + (** Events that the render loop will process. *) + type t = + | Diagnostics of Diagnostic.Event.t list + | Jobs of Job.Event.t list + | Progress of Progress.t +end + +module State : sig + (** Internal state of the render loop. *) + type t + + (** Initial empty state. *) + val init : unit -> t + + module Update : sig + (** Incremental updates to the state. Computes increments of the state that + will be used for efficient rendering. *) + type t + end + + val update : t -> Event.t -> Update.t + + (** Given a state update, render the update. *) + val render : t -> Update.t -> unit +end = struct + type t = + { mutable diagnostics : Diagnostic.t Diagnostic_id_map.t + ; mutable jobs : Job.t Job_id_map.t + ; mutable progress : Progress.t + } + + let init () = + { diagnostics = Diagnostic_id_map.empty; jobs = Job_id_map.empty; progress = Waiting } + ;; + + let done_status ~complete ~remaining ~failed state = + Pp.textf + "Done: %d%% (%d/%d, %d left%s) (jobs: %d)" + (if complete + remaining = 0 then 0 else complete * 100 / (complete + remaining)) + complete + (complete + remaining) + remaining + (match failed with + | 0 -> "" + | failed -> sprintf ", %d failed" failed) + (Job_id_map.cardinal state.jobs) + ;; + + let waiting_for_file_system_changes message = + Pp.seq message (Pp.verbatim ", waiting for filesystem changes...") + ;; + + let restarting_current_build message = + Pp.seq message (Pp.verbatim ", restarting current build...") + ;; + + let had_errors state = + match Diagnostic_id_map.cardinal state.diagnostics with + | 1 -> Pp.verbatim "Had 1 error" + | n -> Pp.textf "Had %d errors" n + ;; + + let status (state : t) = + Console.Status_line.set + (Live + (fun () -> + match (state.progress : Progress.t) with + | Waiting -> Pp.verbatim "Initializing..." + | In_progress { complete; remaining; failed } -> + done_status ~complete ~remaining ~failed state + | Interrupted -> + Pp.tag User_message.Style.Error (Pp.verbatim "Source files changed") + |> restarting_current_build + | Success -> + Pp.tag User_message.Style.Success (Pp.verbatim "Success") + |> waiting_for_file_system_changes + | Failed -> + Pp.tag User_message.Style.Error (had_errors state) + |> waiting_for_file_system_changes)) + ;; + + module Update = struct + type t = + | Update_status + | Add_diagnostics of Diagnostic.t list + | Refresh + + let jobs state jobs = + let jobs = + List.fold_left jobs ~init:state.jobs ~f:(fun acc job_event -> + match (job_event : Job.Event.t) with + | Start job -> Job_id_map.add_exn acc job.id job + | Stop id -> Job_id_map.remove acc id) + in + state.jobs <- jobs; + Update_status + ;; + + let progress state progress = + state.progress <- progress; + Update_status + ;; + + let diagnostics state diagnostics = + let mode, diagnostics = + List.fold_left + diagnostics + ~init:(`Add_only [], state.diagnostics) + ~f:(fun (mode, acc) diag_event -> + match (diag_event : Diagnostic.Event.t) with + | Remove diag -> `Remove, Diagnostic_id_map.remove acc diag.id + | Add diag -> + ( (match mode with + | `Add_only diags -> `Add_only (diag :: diags) + | `Remove -> `Remove) + , Diagnostic_id_map.add_exn acc diag.id diag )) + in + state.diagnostics <- diagnostics; + match mode with + | `Add_only update -> Add_diagnostics (List.rev update) + | `Remove -> Refresh + ;; + end + + let update state (event : Event.t) = + match event with + | Jobs jobs -> Update.jobs state jobs + | Progress progress -> Update.progress state progress + | Diagnostics diagnostics -> Update.diagnostics state diagnostics + ;; + + let render = + let f d = Console.print_user_message (Diagnostic.to_user_message d) in + fun (state : t) (update : Update.t) -> + (match (update : Update.t) with + | Add_diagnostics diags -> List.iter diags ~f + | Update_status -> () + | Refresh -> + Console.reset (); + Diagnostic_id_map.iter state.diagnostics ~f); + status state + ;; +end + +(* A generic loop that continuously fetches events from a [sub] that it opens a + poll to and writes them to the [event] bus. *) +let fetch_loop ~(event : Event.t Fiber_event_bus.t) ~client ~f sub = + Client.poll client sub + >>= function + | Error version_error -> + let* () = Fiber_event_bus.close event in + User_error.raise [ Pp.verbatim (Version_error.message version_error) ] + | Ok poller -> + let rec loop () = + Fiber.collect_errors (fun () -> Client.Stream.next poller) + >>= (function + | Ok (Some payload) -> Fiber_event_bus.push event (f payload) + | Error _ | Ok None -> Fiber_event_bus.close event >>> Fiber.return `Closed) + >>= function + | `Closed -> Fiber.return () + | `Ok -> loop () + in + loop () +;; + +(* Main render loop *) +let render_loop ~(event : Event.t Fiber_event_bus.t) = + Console.reset (); + let state = State.init () in + let rec loop () = + Fiber_event_bus.pop event + >>= function + | `Closed -> + Console.print_user_message + (User_error.make [ Pp.textf "Lost connection to server, exiting..." ]); + Fiber.return () + | `Next event -> + let update = State.update state event in + (* CR-someday alizter: If performance of rendering here on every loop is bad we can + instead batch updates. It should be very simple to write a [State.Update.union] + function that can combine incremental updates to be done at once. *) + State.render state update; + loop () + in + loop () +;; + +let monitor () = + let where = Rpc_common.active_server () in + let* connect = Client.Connection.connect_exn where in + Dune_rpc_impl.Client.client + connect + (Dune_rpc.Initialize.Request.create ~id:(Dune_rpc.Id.make (Sexp.Atom "monitor_cmd"))) + ~f:(fun client -> + let event = Fiber_event_bus.create () in + let module Sub = Dune_rpc_private.Public.Sub in + Fiber.all_concurrently_unit + [ render_loop ~event + ; fetch_loop ~event ~client ~f:(fun x -> Event.Jobs x) Sub.running_jobs + ; fetch_loop ~event ~client ~f:(fun x -> Event.Progress x) Sub.progress + ; fetch_loop ~event ~client ~f:(fun x -> Event.Diagnostics x) Sub.diagnostic + ]) +;; + +let man = + [ `S "DESCRIPTION" + ; `P + {|$(b,dune monitor) connects to an RPC server running in the current + workspace and displays the build progress and diagnostics.|} + ] +;; + +let command = + let info = + let doc = "Connect to a Dune RPC server and monitor it." in + Cmd.info "monitor" ~doc ~man + and term = + let open Import in + let+ (common : Common.t) = Common.term in + let common = Common.forbid_builds common in + let config = Common.init ~log_file:No_log_file common in + let stats = Common.stats common in + let config = + Dune_config.for_scheduler + config + stats + ~insignificant_changes:`Ignore + ~signal_watcher:`Yes + ~watch_exclusions:[] + in + Scheduler.Run.go config ~on_event:(fun _ _ -> ()) ~file_watcher:No_watcher monitor + in + Cmd.v info term +;; diff --git a/bin/monitor.mli b/bin/monitor.mli new file mode 100644 index 00000000000..54b77188162 --- /dev/null +++ b/bin/monitor.mli @@ -0,0 +1 @@ +val command : unit Cmdliner.Cmd.t diff --git a/boot/libs.ml b/boot/libs.ml index 1ec19d660f1..cd661d13d8d 100644 --- a/boot/libs.ml +++ b/boot/libs.ml @@ -35,6 +35,7 @@ let local_libraries = ; ("src/predicate_lang", Some "Predicate_lang", false, None) ; ("otherlibs/dune-private-libs/section", Some "Dune_section", false, None) ; ("src/dune_lang", Some "Dune_lang", false, None) + ; ("src/fiber_event_bus", Some "Fiber_event_bus", false, None) ; ("vendor/opam-file-format", None, false, None) ; ("src/dune_async_io", Some "Dune_async_io", false, None) ; ("src/fiber_util", Some "Fiber_util", false, None) diff --git a/doc/changes/8152.md b/doc/changes/8152.md new file mode 100644 index 00000000000..11d9abcbc2e --- /dev/null +++ b/doc/changes/8152.md @@ -0,0 +1,3 @@ +- Experimental: Added a `$ dune monitor` command that can connect to a running + `dune build` in watch mode and display the errors and progress. (#8152, + @Alizter) diff --git a/doc/dune.inc b/doc/dune.inc index 9392d353b36..c745d7171c9 100644 --- a/doc/dune.inc +++ b/doc/dune.inc @@ -161,6 +161,15 @@ (package dune) (files dune-internal.1)) +(rule + (with-stdout-to dune-monitor.1 + (run dune monitor --help=groff))) + +(install + (section man) + (package dune) + (files dune-monitor.1)) + (rule (with-stdout-to dune-ocaml.1 (run dune ocaml --help=groff))) diff --git a/src/fiber_event_bus/dune b/src/fiber_event_bus/dune new file mode 100644 index 00000000000..bcd7baa4f64 --- /dev/null +++ b/src/fiber_event_bus/dune @@ -0,0 +1,6 @@ +(library + (name fiber_event_bus) + (libraries fiber stdune) + (synopsis "Internal Dune library, do not use!") + (instrumentation + (backend bisect_ppx))) diff --git a/src/fiber_event_bus/fiber_event_bus.ml b/src/fiber_event_bus/fiber_event_bus.ml new file mode 100644 index 00000000000..3ab807b0296 --- /dev/null +++ b/src/fiber_event_bus/fiber_event_bus.ml @@ -0,0 +1,58 @@ +open Stdune +open Fiber.O + +type 'a t = + { mutable status : [ `Open | `Closed ] + ; readers : [ `Closed | `Next of 'a ] Fiber.Ivar.t Queue.t + ; writers : ([ `Closed | `Ok ] Fiber.Ivar.t * 'a) Queue.t + } + +let create () = { status = `Open; readers = Queue.create (); writers = Queue.create () } + +let or_closed t f = + match t.status with + | `Open -> f () + | `Closed -> Fiber.return `Closed +;; + +let push t event = + let* () = Fiber.return () in + or_closed t (fun () -> + match Queue.pop t.readers with + | Some reader -> Fiber.Ivar.fill reader (`Next event) >>> Fiber.return `Ok + | None -> + let ivar = Fiber.Ivar.create () in + Queue.push t.writers (ivar, event); + Fiber.Ivar.read ivar) +;; + +let rec pop_all q ~f = + match Queue.pop q with + | None -> Fiber.return () + | Some e -> + let* () = f e in + pop_all q ~f +;; + +let close t = + let* () = Fiber.return () in + match t.status with + | `Closed -> Fiber.return () + | `Open -> + t.status <- `Closed; + (* only one of these should be full in reality *) + Fiber.fork_and_join_unit + (fun () -> pop_all t.readers ~f:(fun reader -> Fiber.Ivar.fill reader `Closed)) + (fun () -> pop_all t.writers ~f:(fun (writer, _) -> Fiber.Ivar.fill writer `Closed)) +;; + +let pop t = + let* () = Fiber.return () in + or_closed t (fun () -> + match Queue.pop t.writers with + | Some (ivar, event) -> Fiber.Ivar.fill ivar `Ok >>> Fiber.return (`Next event) + | None -> + let ivar = Fiber.Ivar.create () in + Queue.push t.readers ivar; + Fiber.Ivar.read ivar) +;; diff --git a/src/fiber_event_bus/fiber_event_bus.mli b/src/fiber_event_bus/fiber_event_bus.mli new file mode 100644 index 00000000000..a984598aa86 --- /dev/null +++ b/src/fiber_event_bus/fiber_event_bus.mli @@ -0,0 +1,26 @@ +(** An event bus is a first-in-first-out queue. + + It has the following invariants: + - A [push] only finishes when there is a [pop]. + - A [pop] only finishes when there is a [push]. + - Multiple [push]es or [pop]s at once will be blocked until they can be resolved. + However the order is still preserved. + + It can be [close]d preventing further [push]es and [pop]s. *) +type 'a t + +(** [create ()] initializes an empty event bus. *) +val create : unit -> 'a t + +(** [push t a] attempts to push a value [a] to the end of an event bus [t]. It returns a + fiber with [`Ok] if it was successful and [`Closed] otherwise if it was closed. If + the bus doesn't have another [pop] then it will block.*) +val push : 'a t -> 'a -> [ `Closed | `Ok ] Fiber.t + +(** [pop t] attempts to pop the first value from the event bus [t]. It returns a fiber + with [`Next a] if it was successful and [`Closed] otherwise if it was closed. If the + bus doesn't have another [push] then it will block. *) +val pop : 'a t -> [ `Closed | `Next of 'a ] Fiber.t + +(** [close t] closes the event bus [t] preventing further [push]es and [pop]s. *) +val close : 'a t -> unit Fiber.t diff --git a/test/expect-tests/fiber_event_bus/dune b/test/expect-tests/fiber_event_bus/dune new file mode 100644 index 00000000000..70f542cf9a4 --- /dev/null +++ b/test/expect-tests/fiber_event_bus/dune @@ -0,0 +1,19 @@ +(library + (name fiber_event_bus_tests) + (modules fiber_event_bus_tests) + (inline_tests) + (libraries + stdune + fiber + fiber_event_bus + test_scheduler + dune_tests_common + ;; This is because of the (implicit_transitive_deps false) + ;; in dune-project + ppx_expect.config + ppx_expect.config_types + ppx_expect.common + base + ppx_inline_test.config) + (preprocess + (pps ppx_expect))) diff --git a/test/expect-tests/fiber_event_bus/fiber_event_bus_tests.ml b/test/expect-tests/fiber_event_bus/fiber_event_bus_tests.ml new file mode 100644 index 00000000000..fac87162121 --- /dev/null +++ b/test/expect-tests/fiber_event_bus/fiber_event_bus_tests.ml @@ -0,0 +1,154 @@ +open Stdune +open Fiber.O + +let () = Dune_tests_common.init () + +let push_log = function + | `Ok -> printfn "Sucessful push." + | `Closed -> printfn "Couldn't push! Bus was closed." +;; + +let pop_log = function + | `Next a -> printfn "Popped %S." a + | `Closed -> printfn "Couldn't pop! Bus was closed." +;; + +let push t s = + let+ r = Fiber_event_bus.push t s in + push_log r +;; + +let pop t = + let+ r = Fiber_event_bus.pop t in + pop_log r +;; + +let create () = + let bus = Fiber_event_bus.create () in + printfn "Created bus."; + bus +;; + +let close t = + let+ () = Fiber_event_bus.close t in + printfn "Closed bus." +;; + +let test f = + let scheduler = Test_scheduler.create () in + let exec = Fiber.of_thunk (fun () -> f scheduler) in + Test_scheduler.run scheduler exec +;; + +let%expect_test "Push followed by pop and then close" = + test (fun _scheduler -> + let event_bus = create () in + let* () = push event_bus "Hello" + and* () = pop event_bus in + let* () = close event_bus in + Fiber.return ()); + [%expect {| + Created bus. + Popped "Hello". + Sucessful push. + Closed bus. |}] +;; + +let%expect_test "Double close" = + test (fun _scheduler -> + let event_bus = create () in + let* () = close event_bus in + let* () = close event_bus in + Fiber.return ()); + [%expect {| + Created bus. + Closed bus. + Closed bus. |}] +;; + +let%expect_test "Push together with delayed close should close bus and block push." = + test (fun scheduler -> + let event_bus = create () in + let* () = push event_bus "Hello" + and* () = Test_scheduler.yield scheduler >>> close event_bus in + Fiber.return ()); + [%expect {| + Created bus. + Closed bus. + Couldn't push! Bus was closed. |}] +;; + +let%expect_test "Pop together with delayed close should close bus and block pop." = + test (fun scheduler -> + let event_bus = create () in + let* () = pop event_bus + and* () = Test_scheduler.yield scheduler >>> close event_bus in + Fiber.return ()); + [%expect {| + Created bus. + Closed bus. + Couldn't pop! Bus was closed. |}] +;; + +let%expect_test "2 pushes and delayed close" = + test (fun scheduler -> + let event_bus = create () in + let* () = push event_bus "Hello" + and* () = push event_bus "World!" + and* () = Test_scheduler.yield scheduler >>> close event_bus in + Fiber.return ()); + [%expect + {| + Created bus. + Closed bus. + Couldn't push! Bus was closed. + Couldn't push! Bus was closed. |}] +;; + +let%expect_test "2 pops and delayed close" = + test (fun scheduler -> + let event_bus = create () in + let* () = pop event_bus + and* () = pop event_bus + and* () = Test_scheduler.yield scheduler >>> close event_bus in + Fiber.return ()); + [%expect + {| + Created bus. + Closed bus. + Couldn't pop! Bus was closed. + Couldn't pop! Bus was closed. |}] +;; + +let%expect_test "Push and pop with a delayed close" = + test (fun scheduler -> + let event_bus = create () in + let* () = push event_bus "Hello" + and* () = pop event_bus + and* () = Test_scheduler.yield scheduler >>> close event_bus in + Fiber.return ()); + [%expect {| + Created bus. + Popped "Hello". + Sucessful push. + Closed bus. |}] +;; + +let%expect_test "2 pushes together with 2 pops then a close" = + test (fun _scheduler -> + let event_bus = create () in + let* () = push event_bus "Hello" + and* () = push event_bus "World!" + and* () = pop event_bus + and* () = pop event_bus in + let* () = close event_bus in + Fiber.return ()); + [%expect + {| + Created bus. + Popped "Hello". + Popped "World!". + Sucessful push. + Sucessful push. + Closed bus. |}] +;;