Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: replace deprecated runloop in fsevents #8304

Merged
merged 1 commit into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions src/dune_file_watcher/dune_file_watcher.ml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ type kind =
}
| Fsevents of
{ mutable external_ : Fsevents.t Watch_trie.t
; runloop : Fsevents.RunLoop.t
; dispatch_queue : Fsevents.Dispatch_queue.t
; scheduler : Scheduler.t
; source : Fsevents.t
; sync : Fsevents.t
Expand Down Expand Up @@ -577,29 +577,31 @@ let create_fsevents ?(latency = 0.2) ~(scheduler : Scheduler.t) () =
fsevents ~latency scheduler ~exclusion_paths ~paths fsevents_standard_event
in
let cv = Condition.create () in
let runloop_ref = ref None in
let dispatch_queue_ref = ref None in
let mutex = Mutex.create () in
scheduler.spawn_thread (fun () ->
let runloop = Fsevents.RunLoop.in_current_thread () in
let dispatch_queue = Fsevents.Dispatch_queue.create () in
Mutex.lock mutex;
runloop_ref := Some runloop;
dispatch_queue_ref := Some dispatch_queue;
Condition.signal cv;
Mutex.unlock mutex;
Fsevents.start source runloop;
Fsevents.start sync runloop;
match Fsevents.RunLoop.run_current_thread runloop with
Fsevents.start source dispatch_queue;
Fsevents.start sync dispatch_queue;
match Fsevents.Dispatch_queue.wait_until_stopped dispatch_queue with
| Ok () -> ()
| Error exn -> Code_error.raise "fsevents callback raised" [ "exn", Exn.to_dyn exn ]);
let external_ = Watch_trie.empty in
let runloop =
let dispatch_queue =
Mutex.lock mutex;
while !runloop_ref = None do
while !dispatch_queue_ref = None do
Condition.wait cv mutex
done;
Mutex.unlock mutex;
Option.value_exn !runloop_ref
Option.value_exn !dispatch_queue_ref
in
{ kind = Fsevents { latency; scheduler; sync; source; external_; runloop }; sync_table }
{ kind = Fsevents { latency; scheduler; sync; source; external_; dispatch_queue }
; sync_table
}
;;

let fswatch_win_callback ~(scheduler : Scheduler.t) ~sync_table ~should_exclude event =
Expand Down Expand Up @@ -724,7 +726,7 @@ let add_watch t path =
| Watch_trie.Under_existing_node -> Ok ()
| Inserted { new_t; removed } ->
let watch = Lazy.force watch in
Fsevents.start watch f.runloop;
Fsevents.start watch f.dispatch_queue;
List.iter removed ~f:(fun (_, fs) -> Fsevents.stop fs);
f.external_ <- new_t;
Ok ())))
Expand Down
6 changes: 3 additions & 3 deletions src/fsevents/bin/dune_fsevents.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ let fsevents =
;;

let () =
let runloop = Fsevents.RunLoop.in_current_thread () in
Fsevents.start fsevents runloop;
match Fsevents.RunLoop.run_current_thread runloop with
let dispatch_queue = Fsevents.Dispatch_queue.create () in
Fsevents.start fsevents dispatch_queue;
match Fsevents.Dispatch_queue.wait_until_stopped dispatch_queue with
| Ok () -> ()
| Error e -> raise e
;;
49 changes: 26 additions & 23 deletions src/fsevents/fsevents.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@ end = struct
;;
end

module RunLoop = struct
module Dispatch_queue = struct
module Raw = struct
type t

external in_current_thread : unit -> t = "dune_fsevents_runloop_current"
external create : unit -> t = "dune_fsevents_dispatch_queue_create"

(* After this function terminates, the reference to [t] is no longer
valid *)
external run_current_thread : t -> unit = "dune_fsevents_runloop_run"
external wait_until_stopped
: t
-> unit
= "dune_fsevents_dispatch_queue_wait_until_stopped"
end

type state =
Expand All @@ -48,28 +51,28 @@ module RunLoop = struct

type t = state State.t

let in_current_thread () = State.create (Idle (Raw.in_current_thread ()))
let create () = State.create (Idle (Raw.create ()))

let stop (t : t) =
State.critical_section t (fun t ->
match State.get t with
| Running _ -> State.set t Stopped
| Stopped -> ()
| Idle _ -> Code_error.raise "RunLoop.stop: not started" [])
| Idle _ -> Code_error.raise "Dispatch_queue.stop: not started" [])
;;

let run_current_thread t =
let wait_until_stopped t =
let w =
State.critical_section t (fun t ->
match State.get t with
| Stopped -> Code_error.raise "RunLoop.run_current_thread: stopped" []
| Running _ -> Code_error.raise "RunLoop.run_current_thread: running" []
| Stopped -> Code_error.raise "Dispatch_queue.wait_until_stopped: stopped" []
| Running _ -> Code_error.raise "Dispatch_queue.wait_until_stopped: running" []
| Idle w ->
State.set t (Running w);
w)
in
let res =
try Ok (Raw.run_current_thread w) with
try Ok (Raw.wait_until_stopped w) with
| exn -> Error exn
in
stop t;
Expand Down Expand Up @@ -232,7 +235,7 @@ module Raw = struct
type t

external stop : t -> unit = "dune_fsevents_stop"
external start : t -> RunLoop.Raw.t -> unit = "dune_fsevents_start"
external start : t -> Dispatch_queue.Raw.t -> unit = "dune_fsevents_start"

external create
: string list
Expand All @@ -254,8 +257,8 @@ end

type state =
| Idle of Raw.t
| Start of Raw.t * RunLoop.t
| Stop of RunLoop.t
| Start of Raw.t * Dispatch_queue.t
| Stop of Dispatch_queue.t

type t = state State.t

Expand All @@ -264,30 +267,30 @@ let stop t =
match State.get t with
| Idle _ -> Code_error.raise "Fsevents.stop: idle" []
| Stop _ -> ()
| Start (raw, rl) ->
State.set t (Stop rl);
| Start (raw, dq) ->
State.set t (Stop dq);
Raw.stop raw)
;;

let start t (rl : RunLoop.t) =
let start t (dq : Dispatch_queue.t) =
State.critical_section t (fun t ->
match State.get t with
| Stop _ -> Code_error.raise "Fsevents.start: stop" []
| Start _ -> Code_error.raise "Fsevents.start: start" []
| Idle r ->
State.critical_section rl (fun rl' ->
match State.get rl' with
| Stopped -> Code_error.raise "Fsevents.start: runloop stopped" []
| Idle rl' | Running rl' ->
State.set t (Start (r, rl));
Raw.start r rl'))
State.critical_section dq (fun dq' ->
match State.get dq' with
| Stopped -> Code_error.raise "Fsevents.start: dispatch queue stopped" []
| Idle dq' | Running dq' ->
State.set t (Start (r, dq));
Raw.start r dq'))
;;

let runloop t =
let dispatch_queue t =
State.critical_section t (fun t ->
match State.get t with
| Idle _ -> None
| Start (_, rl) | Stop rl -> Some rl)
| Start (_, dq) | Stop dq -> Some dq)
;;

let flush_sync t =
Expand Down
16 changes: 7 additions & 9 deletions src/fsevents/fsevents.mli
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

val available : unit -> bool

module RunLoop : sig
module Dispatch_queue : sig
type t

val in_current_thread : unit -> t
val run_current_thread : t -> (unit, exn) result
val create : unit -> t
val wait_until_stopped : t -> (unit, exn) result
end

module Event : sig
Expand Down Expand Up @@ -63,14 +63,12 @@ type t
debouncing based on [latency]. [f] is called for every new event *)
val create : paths:string list -> latency:float -> f:(Event.t list -> unit) -> t

(** [start t] will start listening for fsevents. Note that the callback will not
be called until [loop t] is called. *)
val start : t -> RunLoop.t -> unit
(** [start t dq] will start listening for fsevents. *)
val start : t -> Dispatch_queue.t -> unit

val runloop : t -> RunLoop.t option
val dispatch_queue : t -> Dispatch_queue.t option

(** [stop t] stop listening to events. Note that this will not make [loop]
return until [break] is called. *)
(** [stop t] stops listening to events. *)
val stop : t -> unit

(** [flush_sync t] flush all pending events that might be held up by debouncing.
Expand Down
Loading
Loading