Skip to content

Commit

Permalink
feature: dune monitor
Browse files Browse the repository at this point in the history
Co-authored-by: Rudi Grinberg <[email protected]>
Signed-off-by: Ali Caglayan <[email protected]>
  • Loading branch information
Alizter and rgrinberg committed Aug 19, 2023
1 parent d7fc655 commit 48bc983
Show file tree
Hide file tree
Showing 12 changed files with 556 additions and 0 deletions.
1 change: 1 addition & 0 deletions bin/dune
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
dune_lang
predicate_lang
fiber
fiber_event_bus
stdune
dune_console
unix
Expand Down
1 change: 1 addition & 0 deletions bin/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ let all : _ Cmdliner.Cmd.t list =
; Ocaml_merlin.command
; Shutdown.command
; Diagnostics.command
; Monitor.command
]
in
let groups =
Expand Down
277 changes: 277 additions & 0 deletions bin/monitor.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
open Import
open Fiber.O
module Client = Dune_rpc_client.Client
module Version_error = Dune_rpc_private.Version_error

include struct
open Dune_rpc
module Diagnostic = Diagnostic
module Progress = Progress
module Job = Job
module Sub = Sub
module Conv = Conv
end

(** Utility module for generating [Map] modules for [Diagnostic]s and [Job]s which use
their [Id] as keys. *)
module Id_map (Id : sig
type t

val compare : t -> t -> Ordering.t
val sexp : (t, Conv.values) Conv.t
end) =
struct
include Map.Make (struct
include Id

let to_dyn t = Sexp.to_dyn (Conv.to_sexp Id.sexp t)
end)
end

module Diagnostic_id_map = Id_map (Diagnostic.Id)
module Job_id_map = Id_map (Job.Id)

module Event = struct
(** Events that the render loop will process. *)
type t =
| Diagnostics of Diagnostic.Event.t list
| Jobs of Job.Event.t list
| Progress of Progress.t
end

module State : sig
(** Internal state of the render loop. *)
type t

(** Initial empty state. *)
val init : unit -> t

module Update : sig
type state = t

(** Incremental updates to the state. Computes increments of the state that will be
used for efficient rendering. *)
type t

val jobs : state -> Job.Event.t list -> t
val progress : state -> Progress.t -> t
val diagnostics : state -> Diagnostic.Event.t list -> t
end

(** Given a state update, render the update. *)
val render : t -> Update.t -> unit
end = struct
type t =
{ mutable diagnostics : Diagnostic.t Diagnostic_id_map.t
; mutable jobs : Job.t Job_id_map.t
; mutable progress : Progress.t
}

let init () =
{ diagnostics = Diagnostic_id_map.empty; jobs = Job_id_map.empty; progress = Waiting }
;;

let done_status ~complete ~remaining ~failed state =
Pp.textf
"Done: %d%% (%d/%d, %d left%s) (jobs: %d)"
(if complete + remaining = 0 then 0 else complete * 100 / (complete + remaining))
complete
(complete + remaining)
remaining
(match failed with
| 0 -> ""
| failed -> sprintf ", %d failed" failed)
(Job_id_map.cardinal state.jobs)
;;

let waiting_for_file_system_changes message =
Pp.seq message (Pp.verbatim ", waiting for filesystem changes...")
;;

let restarting_current_build message =
Pp.seq message (Pp.verbatim ", restarting current build...")
;;

let had_errors state =
match Diagnostic_id_map.cardinal state.diagnostics with
| 1 -> Pp.verbatim "Had 1 error"
| n -> Pp.textf "Had %d errors" n
;;

let status (state : t) =
Console.Status_line.set
(Live
(fun () ->
match (state.progress : Progress.t) with
| Waiting -> Pp.verbatim "Initializing..."
| In_progress { complete; remaining; failed } ->
done_status ~complete ~remaining ~failed state
| Interrupted ->
Pp.tag User_message.Style.Error (Pp.verbatim "Source files changed")
|> restarting_current_build
| Success ->
Pp.tag User_message.Style.Success (Pp.verbatim "Success")
|> waiting_for_file_system_changes
| Failed ->
Pp.tag User_message.Style.Error (had_errors state)
|> waiting_for_file_system_changes))
;;

module Update = struct
type state = t

type t =
| Update_status
| Add_diagnostics of Diagnostic.t list
| Refresh

let jobs (state : state) jobs =
let jobs =
List.fold_left jobs ~init:state.jobs ~f:(fun acc job_event ->
match (job_event : Job.Event.t) with
| Start job -> Job_id_map.add_exn acc job.id job
| Stop id -> Job_id_map.remove acc id)
in
state.jobs <- jobs;
Update_status
;;

let progress (state : state) progress =
state.progress <- progress;
Update_status
;;

let diagnostics (state : state) diagnostics =
let mode, diagnostics =
List.fold_left
diagnostics
~init:(`Add_only [], state.diagnostics)
~f:(fun (mode, acc) diag_event ->
match (diag_event : Diagnostic.Event.t) with
| Add diag ->
( (match mode with
| `Add_only diags -> `Add_only (diag :: diags)
| `Remove -> `Remove)
, Diagnostic_id_map.add_exn acc diag.id diag )
| Remove diag -> `Remove, Diagnostic_id_map.remove acc diag.id)
in
state.diagnostics <- diagnostics;
match mode with
| `Add_only update -> Add_diagnostics (List.rev update)
| `Remove -> Refresh
;;
end

let render =
let f d = Console.print_user_message (Diagnostic.to_user_message d) in
fun (state : t) (update : Update.t) ->
(match (update : Update.t) with
| Add_diagnostics diags -> List.iter diags ~f
| Update_status -> ()
| Refresh ->
Console.reset ();
Diagnostic_id_map.iter state.diagnostics ~f);
status state
;;
end

(** A generic loop that continuously fetches events from a [sub] that it opens a poll to
and writes them to the [event] bus. *)
let fetch_loop ~(event : Event.t Fiber_event_bus.t) ~client ~f sub =
Client.poll client sub
>>= function
| Error version_error ->
let* () = Fiber_event_bus.close event in
User_error.raise [ Pp.verbatim (Version_error.message version_error) ]
| Ok poller ->
let rec loop () =
Fiber.collect_errors (fun () -> Client.Stream.next poller)
>>= (function
| Ok (Some payload) -> Fiber_event_bus.push event (f payload)
| Error _ | Ok None -> Fiber_event_bus.close event >>> Fiber.return `Closed)
>>= function
| `Closed -> Fiber.return ()
| `Ok -> loop ()
in
loop ()
;;

(** Main render loop *)
let render_loop ~(event : Event.t Fiber_event_bus.t) =
Console.reset ();
let state = State.init () in
let rec loop () =
Fiber_event_bus.pop event
>>= function
| `Closed ->
Console.print_user_message
(User_error.make [ Pp.textf "Lost connection to server, exiting..." ]);
Fiber.return ()
| `Next event ->
let update =
match event with
| Event.Jobs jobs -> State.Update.jobs state jobs
| Progress progress -> State.Update.progress state progress
| Diagnostics diagnostics -> State.Update.diagnostics state diagnostics
in
(* CR-someday alizter: If performance of rendering here on every loop is bad we can
instead batch updates. It should be very simple to write a [State.Update.union]
function that can combine incremental updates to be done at once. *)
State.render state update;
loop ()
in
loop ()
;;

(** Main method *)
let begin_monitor_main () =
let where = Rpc_common.active_server () in
let* connect = Client.Connection.connect_exn where in
Dune_rpc_impl.Client.client
connect
(Dune_rpc.Initialize.Request.create ~id:(Dune_rpc.Id.make (Sexp.Atom "monitor_cmd")))
~f:(fun client ->
let event = Fiber_event_bus.create () in
let module Sub = Dune_rpc_private.Public.Sub in
Fiber.all_concurrently_unit
[ render_loop ~event
; fetch_loop ~event ~client ~f:(fun x -> Event.Jobs x) Sub.running_jobs
; fetch_loop ~event ~client ~f:(fun x -> Event.Progress x) Sub.progress
; fetch_loop ~event ~client ~f:(fun x -> Event.Diagnostics x) Sub.diagnostic
])
;;

let man =
[ `S "DESCRIPTION"
; `P
{|$(b,dune monitor) connects to an RPC server running in the current
workspace and displays the build progress and diagnostics.|}
]
;;

let command =
let info =
let doc = "Connect to a Dune RPC server and monitor it." in
Cmd.info "monitor" ~doc ~man
and term =
let open Import in
let+ (common : Common.t) = Common.term in
let common = Common.forbid_builds common in
let config = Common.init ~log_file:No_log_file common in
let stats = Common.stats common in
let config =
Dune_config.for_scheduler
config
stats
~insignificant_changes:`Ignore
~signal_watcher:`Yes
~watch_exclusions:[]
in
Scheduler.Run.go
config
~on_event:(fun _ _ -> ())
~file_watcher:No_watcher
begin_monitor_main
in
Cmd.v info term
;;
1 change: 1 addition & 0 deletions bin/monitor.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
val command : unit Cmdliner.Cmd.t
1 change: 1 addition & 0 deletions boot/libs.ml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ let local_libraries =
; ("src/predicate_lang", Some "Predicate_lang", false, None)
; ("otherlibs/dune-private-libs/section", Some "Dune_section", false, None)
; ("src/dune_lang", Some "Dune_lang", false, None)
; ("src/fiber_event_bus", Some "Fiber_event_bus", false, None)
; ("vendor/opam-file-format", None, false, None)
; ("src/dune_async_io", Some "Dune_async_io", false, None)
; ("src/fiber_util", Some "Fiber_util", false, None)
Expand Down
3 changes: 3 additions & 0 deletions doc/changes/8152.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- Added a `dune monitor` command that can connect to a running `dune build` in watch mode
and display the errors and progress. This command is still experimental and may be
subject to further changes.
9 changes: 9 additions & 0 deletions doc/dune.inc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,15 @@
(package dune)
(files dune-internal.1))

(rule
(with-stdout-to dune-monitor.1
(run dune monitor --help=groff)))

(install
(section man)
(package dune)
(files dune-monitor.1))

(rule
(with-stdout-to dune-ocaml.1
(run dune ocaml --help=groff)))
Expand Down
6 changes: 6 additions & 0 deletions src/fiber_event_bus/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
(library
(name fiber_event_bus)
(libraries fiber stdune)
(synopsis "Internal Dune library, do not use!")
(instrumentation
(backend bisect_ppx)))
58 changes: 58 additions & 0 deletions src/fiber_event_bus/fiber_event_bus.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
open Stdune
open Fiber.O

type 'a t =
{ mutable status : [ `Open | `Closed ]
; readers : [ `Closed | `Next of 'a ] Fiber.Ivar.t Queue.t
; writers : ([ `Closed | `Ok ] Fiber.Ivar.t * 'a) Queue.t
}

let create () = { status = `Open; readers = Queue.create (); writers = Queue.create () }

let or_closed t f =
match t.status with
| `Open -> f ()
| `Closed -> Fiber.return `Closed
;;

let push t event =
let* () = Fiber.return () in
or_closed t (fun () ->
match Queue.pop t.readers with
| Some reader -> Fiber.Ivar.fill reader (`Next event) >>> Fiber.return `Ok
| None ->
let ivar = Fiber.Ivar.create () in
Queue.push t.writers (ivar, event);
Fiber.Ivar.read ivar)
;;

let rec pop_all q ~f =
match Queue.pop q with
| None -> Fiber.return ()
| Some e ->
let* () = f e in
pop_all q ~f
;;

let close t =
let* () = Fiber.return () in
match t.status with
| `Closed -> Fiber.return ()
| `Open ->
t.status <- `Closed;
(* only one of these should be full in reality *)
Fiber.fork_and_join_unit
(fun () -> pop_all t.readers ~f:(fun reader -> Fiber.Ivar.fill reader `Closed))
(fun () -> pop_all t.writers ~f:(fun (writer, _) -> Fiber.Ivar.fill writer `Closed))
;;

let pop t =
let* () = Fiber.return () in
or_closed t (fun () ->
match Queue.pop t.writers with
| Some (ivar, event) -> Fiber.Ivar.fill ivar `Ok >>> Fiber.return (`Next event)
| None ->
let ivar = Fiber.Ivar.create () in
Queue.push t.readers ivar;
Fiber.Ivar.read ivar)
;;
Loading

0 comments on commit 48bc983

Please sign in to comment.