From 753dc68a76e3af97dcfa46d5089f41685c244f26 Mon Sep 17 00:00:00 2001 From: Kevin Ji <1146876+kevinji@users.noreply.github.com> Date: Sun, 30 Jul 2023 15:51:44 -0700 Subject: [PATCH] feat: replace deprecated runloop in fsevents Replace the use of the deprecated `FSEventStreamScheduleWithRunLoop()` with `FSEventStreamSetDispatchQueue()`. Since a dispatch queue spawns new threads, we need to call `caml_c_thread_register()` and `caml_c_thread_unregister()` for these threads. This is done indirectly via a `pthread_key_t` so that the functions only need to be called more than once if the dispatch queue switches which thread is running the callback. We replicate the blocking nature of the existing code using a mutex and condition variable. See git/git@b0226007f0aaf448dec1defe3e44c4e3d7513aa8 for more details about this approach. Signed-off-by: Kevin Ji <1146876+kevinji@users.noreply.github.com> --- src/dune_file_watcher/dune_file_watcher.ml | 26 +-- src/fsevents/bin/dune_fsevents.ml | 6 +- src/fsevents/fsevents.ml | 49 +++--- src/fsevents/fsevents.mli | 16 +- src/fsevents/fsevents_stubs.c | 168 +++++++++++-------- test/expect-tests/fsevents/fsevents_tests.ml | 6 +- 6 files changed, 155 insertions(+), 116 deletions(-) diff --git a/src/dune_file_watcher/dune_file_watcher.ml b/src/dune_file_watcher/dune_file_watcher.ml index 3c7bf0eb567..4a67dfa1754 100644 --- a/src/dune_file_watcher/dune_file_watcher.ml +++ b/src/dune_file_watcher/dune_file_watcher.ml @@ -138,7 +138,7 @@ type kind = } | Fsevents of { mutable external_ : Fsevents.t Watch_trie.t - ; runloop : Fsevents.RunLoop.t + ; dispatch_queue : Fsevents.Dispatch_queue.t ; scheduler : Scheduler.t ; source : Fsevents.t ; sync : Fsevents.t @@ -577,29 +577,31 @@ let create_fsevents ?(latency = 0.2) ~(scheduler : Scheduler.t) () = fsevents ~latency scheduler ~exclusion_paths ~paths fsevents_standard_event in let cv = Condition.create () in - let runloop_ref = ref None in + let dispatch_queue_ref = ref None in let mutex = Mutex.create () in scheduler.spawn_thread (fun () -> - let runloop = Fsevents.RunLoop.in_current_thread () in + let dispatch_queue = Fsevents.Dispatch_queue.create () in Mutex.lock mutex; - runloop_ref := Some runloop; + dispatch_queue_ref := Some dispatch_queue; Condition.signal cv; Mutex.unlock mutex; - Fsevents.start source runloop; - Fsevents.start sync runloop; - match Fsevents.RunLoop.run_current_thread runloop with + Fsevents.start source dispatch_queue; + Fsevents.start sync dispatch_queue; + match Fsevents.Dispatch_queue.wait_until_stopped dispatch_queue with | Ok () -> () | Error exn -> Code_error.raise "fsevents callback raised" [ "exn", Exn.to_dyn exn ]); let external_ = Watch_trie.empty in - let runloop = + let dispatch_queue = Mutex.lock mutex; - while !runloop_ref = None do + while !dispatch_queue_ref = None do Condition.wait cv mutex done; Mutex.unlock mutex; - Option.value_exn !runloop_ref + Option.value_exn !dispatch_queue_ref in - { kind = Fsevents { latency; scheduler; sync; source; external_; runloop }; sync_table } + { kind = Fsevents { latency; scheduler; sync; source; external_; dispatch_queue } + ; sync_table + } ;; let fswatch_win_callback ~(scheduler : Scheduler.t) ~sync_table ~should_exclude event = @@ -724,7 +726,7 @@ let add_watch t path = | Watch_trie.Under_existing_node -> Ok () | Inserted { new_t; removed } -> let watch = Lazy.force watch in - Fsevents.start watch f.runloop; + Fsevents.start watch f.dispatch_queue; List.iter removed ~f:(fun (_, fs) -> Fsevents.stop fs); f.external_ <- new_t; Ok ()))) diff --git a/src/fsevents/bin/dune_fsevents.ml b/src/fsevents/bin/dune_fsevents.ml index 46e38cd8f22..15f4c683590 100644 --- a/src/fsevents/bin/dune_fsevents.ml +++ b/src/fsevents/bin/dune_fsevents.ml @@ -16,9 +16,9 @@ let fsevents = ;; let () = - let runloop = Fsevents.RunLoop.in_current_thread () in - Fsevents.start fsevents runloop; - match Fsevents.RunLoop.run_current_thread runloop with + let dispatch_queue = Fsevents.Dispatch_queue.create () in + Fsevents.start fsevents dispatch_queue; + match Fsevents.Dispatch_queue.wait_until_stopped dispatch_queue with | Ok () -> () | Error e -> raise e ;; diff --git a/src/fsevents/fsevents.ml b/src/fsevents/fsevents.ml index 84e2f2523a2..861af543a57 100644 --- a/src/fsevents/fsevents.ml +++ b/src/fsevents/fsevents.ml @@ -30,15 +30,18 @@ end = struct ;; end -module RunLoop = struct +module Dispatch_queue = struct module Raw = struct type t - external in_current_thread : unit -> t = "dune_fsevents_runloop_current" + external create : unit -> t = "dune_fsevents_dispatch_queue_create" (* After this function terminates, the reference to [t] is no longer valid *) - external run_current_thread : t -> unit = "dune_fsevents_runloop_run" + external wait_until_stopped + : t + -> unit + = "dune_fsevents_dispatch_queue_wait_until_stopped" end type state = @@ -48,28 +51,28 @@ module RunLoop = struct type t = state State.t - let in_current_thread () = State.create (Idle (Raw.in_current_thread ())) + let create () = State.create (Idle (Raw.create ())) let stop (t : t) = State.critical_section t (fun t -> match State.get t with | Running _ -> State.set t Stopped | Stopped -> () - | Idle _ -> Code_error.raise "RunLoop.stop: not started" []) + | Idle _ -> Code_error.raise "Dispatch_queue.stop: not started" []) ;; - let run_current_thread t = + let wait_until_stopped t = let w = State.critical_section t (fun t -> match State.get t with - | Stopped -> Code_error.raise "RunLoop.run_current_thread: stopped" [] - | Running _ -> Code_error.raise "RunLoop.run_current_thread: running" [] + | Stopped -> Code_error.raise "Dispatch_queue.wait_until_stopped: stopped" [] + | Running _ -> Code_error.raise "Dispatch_queue.wait_until_stopped: running" [] | Idle w -> State.set t (Running w); w) in let res = - try Ok (Raw.run_current_thread w) with + try Ok (Raw.wait_until_stopped w) with | exn -> Error exn in stop t; @@ -232,7 +235,7 @@ module Raw = struct type t external stop : t -> unit = "dune_fsevents_stop" - external start : t -> RunLoop.Raw.t -> unit = "dune_fsevents_start" + external start : t -> Dispatch_queue.Raw.t -> unit = "dune_fsevents_start" external create : string list @@ -254,8 +257,8 @@ end type state = | Idle of Raw.t - | Start of Raw.t * RunLoop.t - | Stop of RunLoop.t + | Start of Raw.t * Dispatch_queue.t + | Stop of Dispatch_queue.t type t = state State.t @@ -264,30 +267,30 @@ let stop t = match State.get t with | Idle _ -> Code_error.raise "Fsevents.stop: idle" [] | Stop _ -> () - | Start (raw, rl) -> - State.set t (Stop rl); + | Start (raw, dq) -> + State.set t (Stop dq); Raw.stop raw) ;; -let start t (rl : RunLoop.t) = +let start t (dq : Dispatch_queue.t) = State.critical_section t (fun t -> match State.get t with | Stop _ -> Code_error.raise "Fsevents.start: stop" [] | Start _ -> Code_error.raise "Fsevents.start: start" [] | Idle r -> - State.critical_section rl (fun rl' -> - match State.get rl' with - | Stopped -> Code_error.raise "Fsevents.start: runloop stopped" [] - | Idle rl' | Running rl' -> - State.set t (Start (r, rl)); - Raw.start r rl')) + State.critical_section dq (fun dq' -> + match State.get dq' with + | Stopped -> Code_error.raise "Fsevents.start: dispatch queue stopped" [] + | Idle dq' | Running dq' -> + State.set t (Start (r, dq)); + Raw.start r dq')) ;; -let runloop t = +let dispatch_queue t = State.critical_section t (fun t -> match State.get t with | Idle _ -> None - | Start (_, rl) | Stop rl -> Some rl) + | Start (_, dq) | Stop dq -> Some dq) ;; let flush_sync t = diff --git a/src/fsevents/fsevents.mli b/src/fsevents/fsevents.mli index 254863ccef3..fb9083172ee 100644 --- a/src/fsevents/fsevents.mli +++ b/src/fsevents/fsevents.mli @@ -4,11 +4,11 @@ val available : unit -> bool -module RunLoop : sig +module Dispatch_queue : sig type t - val in_current_thread : unit -> t - val run_current_thread : t -> (unit, exn) result + val create : unit -> t + val wait_until_stopped : t -> (unit, exn) result end module Event : sig @@ -63,14 +63,12 @@ type t debouncing based on [latency]. [f] is called for every new event *) val create : paths:string list -> latency:float -> f:(Event.t list -> unit) -> t -(** [start t] will start listening for fsevents. Note that the callback will not - be called until [loop t] is called. *) -val start : t -> RunLoop.t -> unit +(** [start t dq] will start listening for fsevents. *) +val start : t -> Dispatch_queue.t -> unit -val runloop : t -> RunLoop.t option +val dispatch_queue : t -> Dispatch_queue.t option -(** [stop t] stop listening to events. Note that this will not make [loop] - return until [break] is called. *) +(** [stop t] stops listening to events. *) val stop : t -> unit (** [flush_sync t] flush all pending events that might be held up by debouncing. diff --git a/src/fsevents/fsevents_stubs.c b/src/fsevents/fsevents_stubs.c index 8117a9610da..ea73d42ea25 100644 --- a/src/fsevents/fsevents_stubs.c +++ b/src/fsevents/fsevents_stubs.c @@ -14,30 +14,45 @@ #include #include - -typedef struct dune_runloop { - CFRunLoopRef runloop; +#include + +// We use a condvar/mutex pair to allow the code calling +// `dune_fsevents_dispatch_queue_wait_until_stopped` to be signaled by either +// +// - An exception raised by the callback passed to the dispatch queue +// - Explicitly stopping via `dune_fsevents_stop` +// +// Since a dispatch queue uses an internal pool of threads, we need some +// synchronization around updating `v_exn`. +typedef struct dune_dispatch_queue { + dispatch_queue_t dq; + pthread_cond_t dq_finished; + pthread_mutex_t dq_lock; value v_exn; -} dune_runloop; +} dune_dispatch_queue; typedef struct dune_fsevents_t { - dune_runloop *runloop; + dune_dispatch_queue *dq; value v_callback; FSEventStreamRef stream; } dune_fsevents_t; -#define Runloop_val(v) (*((dune_runloop **)Data_custom_val(v))) +#define Dispatch_queue_val(v) (*((dune_dispatch_queue **)Data_custom_val(v))) -void dune_fsevents_runloop_finalize(value v_runloop) { - dune_runloop *runloop = Runloop_val(v_runloop); - caml_stat_free(runloop); +void dune_fsevents_dispatch_queue_finalize(value v_dq) { + dune_dispatch_queue *dq = Dispatch_queue_val(v_dq); + if (dq->dq) + dispatch_release(dq->dq); + pthread_cond_destroy(&dq->dq_finished); + pthread_mutex_destroy(&dq->dq_lock); + caml_stat_free(dq); } -static struct custom_operations dune_fsevents_runloop_ops = { - "dune.fsevents.runloop", dune_fsevents_runloop_finalize, - custom_compare_default, custom_hash_default, - custom_serialize_default, custom_deserialize_default, - custom_compare_ext_default, custom_fixed_length_default}; +static struct custom_operations dune_fsevents_dispatch_queue_ops = { + "build.dune.fsevents.dispatch_queue", dune_fsevents_dispatch_queue_finalize, + custom_compare_default, custom_hash_default, + custom_serialize_default, custom_deserialize_default, + custom_compare_ext_default, custom_fixed_length_default}; #define Fsevents_val(v) (*(dune_fsevents_t **)Data_custom_val(v)) @@ -48,38 +63,68 @@ static void dune_fsevents_t_finalize(value v_t) { } static struct custom_operations dune_fsevents_t_ops = { - "dune.fsevents.fsevents_t", dune_fsevents_t_finalize, - custom_compare_default, custom_hash_default, - custom_serialize_default, custom_deserialize_default, - custom_compare_ext_default, custom_fixed_length_default}; + "build.dune.fsevents.fsevents_t", dune_fsevents_t_finalize, + custom_compare_default, custom_hash_default, + custom_serialize_default, custom_deserialize_default, + custom_compare_ext_default, custom_fixed_length_default}; -CAMLprim value dune_fsevents_runloop_current(value v_unit) { +CAMLprim value dune_fsevents_dispatch_queue_create(value v_unit) { CAMLparam1(v_unit); - dune_runloop *rl; - rl = caml_stat_alloc(sizeof(dune_runloop)); - rl->runloop = CFRunLoopGetCurrent(); - rl->v_exn = Val_unit; - caml_register_global_root(&rl->v_exn); - value v_runloop = caml_alloc_custom(&dune_fsevents_runloop_ops, - sizeof(dune_runloop *), 0, 1); - Runloop_val(v_runloop) = rl; - CAMLreturn(v_runloop); -} - -CAMLprim value dune_fsevents_runloop_run(value v_runloop) { - CAMLparam1(v_runloop); + dune_dispatch_queue *dq; + dq = caml_stat_alloc(sizeof(dune_dispatch_queue)); + pthread_mutex_init(&dq->dq_lock, NULL); + pthread_cond_init(&dq->dq_finished, NULL); + dq->dq = dispatch_queue_create("build.dune.fsevents", DISPATCH_QUEUE_SERIAL); + dq->v_exn = Val_unit; + caml_register_global_root(&dq->v_exn); + value v_dq = caml_alloc_custom(&dune_fsevents_dispatch_queue_ops, + sizeof(dune_dispatch_queue *), 0, 1); + Dispatch_queue_val(v_dq) = dq; + CAMLreturn(v_dq); +} + +CAMLprim value dune_fsevents_dispatch_queue_wait_until_stopped(value v_dq) { + CAMLparam1(v_dq); CAMLlocal1(v_exn); - dune_runloop *runloop = Runloop_val(v_runloop); + dune_dispatch_queue *dq = Dispatch_queue_val(v_dq); caml_release_runtime_system(); - CFRunLoopRun(); + pthread_mutex_lock(&dq->dq_lock); + pthread_cond_wait(&dq->dq_finished, &dq->dq_lock); + pthread_mutex_unlock(&dq->dq_lock); caml_acquire_runtime_system(); - caml_remove_global_root(&runloop->v_exn); - v_exn = runloop->v_exn; + caml_remove_global_root(&dq->v_exn); + v_exn = dq->v_exn; if (v_exn != Val_unit) caml_raise(v_exn); CAMLreturn(Val_unit); } +// The thread-local storage key `register_thread` is intended to ensure that +// every thread that runs the `fsevents` callback only calls +// `caml_c_thread_register` and `caml_c_thread_unregister` once. +// +// macOS often reuses the same background thread for a serial dispatch queue, +// so this reduces the number of times `caml_c_thread_register` is called. +static pthread_key_t register_thread; +static pthread_once_t register_thread_once = PTHREAD_ONCE_INIT; + +static void destroy_register_thread(__attribute__((unused)) void *value) { + caml_c_thread_unregister(); +} + +static void make_register_thread() { + pthread_key_create(®ister_thread, destroy_register_thread); +} + +static void set_register_thread() { + pthread_once(®ister_thread_once, make_register_thread); + if (pthread_getspecific(register_thread) == NULL) { + caml_c_thread_register(); + // Since the value doesn't matter, use a reference to `register_thread` + pthread_setspecific(register_thread, ®ister_thread); + } +} + static FSEventStreamEventFlags interesting_flags = kFSEventStreamEventFlagItemCreated | kFSEventStreamEventFlagItemRemoved | kFSEventStreamEventFlagItemRenamed | kFSEventStreamEventFlagItemModified | @@ -90,6 +135,7 @@ static void dune_fsevents_callback(const FSEventStreamRef streamRef, CFArrayRef eventPaths, const FSEventStreamEventFlags eventFlags[], const FSEventStreamEventId eventIds[]) { + set_register_thread(); caml_acquire_runtime_system(); CAMLparam0(); CAMLlocal5(v_events_xs, v_events_x, v_flags, v_id, v_event); @@ -137,8 +183,10 @@ static void dune_fsevents_callback(const FSEventStreamRef streamRef, } v_res = caml_callback_exn(t->v_callback, v_events_xs); if (Is_exception_result(v_res)) { - t->runloop->v_exn = Extract_exception(v_res); - CFRunLoopStop(t->runloop->runloop); + pthread_mutex_lock(&t->dq->dq_lock); + t->dq->v_exn = Extract_exception(v_res); + pthread_cond_broadcast(&t->dq->dq_finished); + pthread_mutex_unlock(&t->dq->dq_lock); } CAMLdrop; caml_release_runtime_system(); @@ -212,13 +260,12 @@ CAMLprim value dune_fsevents_set_exclusion_paths(value v_t, value v_paths) { CAMLreturn(Val_unit); } -CAMLprim value dune_fsevents_start(value v_t, value v_runloop) { - CAMLparam2(v_t, v_runloop); +CAMLprim value dune_fsevents_start(value v_t, value v_dq) { + CAMLparam2(v_t, v_dq); dune_fsevents_t *t = Fsevents_val(v_t); - dune_runloop *runloop = Runloop_val(v_runloop); - t->runloop = runloop; - FSEventStreamScheduleWithRunLoop(t->stream, runloop->runloop, - kCFRunLoopDefaultMode); + dune_dispatch_queue *dq = Dispatch_queue_val(v_dq); + t->dq = dq; + FSEventStreamSetDispatchQueue(t->stream, dq->dq); bool res = FSEventStreamStart(t->stream); if (!res) { /* the docs say this is impossible anyway */ @@ -233,19 +280,23 @@ CAMLprim value dune_fsevents_stop(value v_t) { FSEventStreamStop(t->stream); FSEventStreamInvalidate(t->stream); FSEventStreamRelease(t->stream); + pthread_mutex_lock(&t->dq->dq_lock); + pthread_cond_broadcast(&t->dq->dq_finished); + pthread_mutex_unlock(&t->dq->dq_lock); + t->dq = NULL; CAMLreturn(Val_unit); } -CAMLprim value dune_fsevents_runloop_get(value v_t) { +CAMLprim value dune_fsevents_dispatch_queue_get(value v_t) { CAMLparam1(v_t); - CAMLlocal2(v_some, v_runloop); + CAMLlocal2(v_some, v_dq); dune_fsevents_t *t = Fsevents_val(v_t); - if (t->runloop == NULL) { + if (t->dq == NULL) { CAMLreturn(Val_int(0)); } else { - v_runloop = caml_copy_nativeint((intnat)t->runloop); + v_dq = caml_copy_nativeint((intnat)t->dq); v_some = caml_alloc_small(1, 0); - Store_field(v_some, 0, v_runloop); + Store_field(v_some, 0, v_dq); CAMLreturn(v_some); } } @@ -401,26 +452,11 @@ CAMLprim value dune_fsevents_flush_sync(value v_t) { caml_failwith(unavailable_message); } -CAMLprim value dune_fsevents_destroy(value v_t) { - (void)v_t; - caml_failwith(unavailable_message); -} - -CAMLprim value dune_fsevents_break(value v_t) { - (void)v_t; - caml_failwith(unavailable_message); -} - -CAMLprim value dune_fsevents_loop(value v_t) { - (void)v_t; - caml_failwith(unavailable_message); -} - -CAMLprim value dune_fsevents_runloop_current(value v_unit) { +CAMLprim value dune_fsevents_dispatch_queue_create(value v_unit) { (void)v_unit; caml_failwith(unavailable_message); } -CAMLprim value dune_fsevents_runloop_run(value v_unit) { +CAMLprim value dune_fsevents_dispatch_queue_wait_until_stopped(value v_unit) { (void)v_unit; caml_failwith(unavailable_message); } diff --git a/test/expect-tests/fsevents/fsevents_tests.ml b/test/expect-tests/fsevents/fsevents_tests.ml index bda3e030c6f..21bdddf3a1d 100644 --- a/test/expect-tests/fsevents/fsevents_tests.ml +++ b/test/expect-tests/fsevents/fsevents_tests.ml @@ -183,8 +183,8 @@ let test_with_multiple_fsevents ~setup ~test:f = res, sync) |> List.unzip in - let runloop = Fsevents.RunLoop.in_current_thread () in - List.iter fsevents ~f:(fun f -> Fsevents.start f runloop); + let dispatch_queue = Fsevents.Dispatch_queue.create () in + List.iter fsevents ~f:(fun f -> Fsevents.start f dispatch_queue); let (t : Thread.t) = Thread.create (fun () -> @@ -206,7 +206,7 @@ let test_with_multiple_fsevents ~setup ~test:f = syncs) () in - (match Fsevents.RunLoop.run_current_thread runloop with + (match Fsevents.Dispatch_queue.wait_until_stopped dispatch_queue with | Error Exit -> print_endline "[EXIT]" | Error _ -> assert false | Ok () -> ());