Skip to content

Commit

Permalink
flambda-backend: Introduce an API to swap the runtime lock for a different lock. (#1365)
Browse files Browse the repository at this point in the history

See caml_switch_runtime_locking_scheme in threads.h
  • Loading branch information
stedolan authored May 24, 2023
1 parent 1ce68db commit 674a335
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Makefile.common-jst
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ install_for_test: _install
done; \
ln -s _install/lib/ocaml stdlib; \
mkdir runtime; \
for f in ocamlrun* stdlib/caml stdlib/stublibs/*; do \
for f in ocamlrun* stdlib/caml stdlib/stublibs/* runtime/caml/threads.h; do \
ln -s ../$$f runtime/`basename $$f`; \
done; \
ln -s . lex; ln -s . yacc; \
Expand Down
127 changes: 106 additions & 21 deletions otherlibs/systhreads/st_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
#include "caml/sys.h"
#include "caml/memprof.h"

/* threads.h is *not* included since it contains the _external_ declarations for
the caml_c_thread_register and caml_c_thread_unregister functions. */
#define CAMLextern_libthreads
#include "threads.h"

#ifndef NATIVE_CODE
/* Initial size of bytecode stack when a thread is created (4 Ko) */
Expand All @@ -55,6 +55,23 @@
#include "st_posix.h"
#endif

/* Atomics */
#if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ == 8
/* GCC 4.8 shipped with a working implementation of atomics, but no
stdatomic.h header, so we need to use GCC-specific intrinsics. */

#define _Atomic /* GCC intrinsics work on normal variables */
#define atomic_store(v, x) \
__atomic_store_n((v), (x), __ATOMIC_SEQ_CST)
#define atomic_load(v) \
__atomic_load_n((v), __ATOMIC_SEQ_CST)
#define atomic_exchange(v, x) \
__atomic_exchange_n((v), (x), __ATOMIC_SEQ_CST)
#else
#include <stdatomic.h>
#endif


/* The ML value describing a thread (heap-allocated) */

struct caml_thread_descr {
Expand Down Expand Up @@ -111,7 +128,7 @@ static caml_thread_t all_threads = NULL;
static caml_thread_t curr_thread = NULL;

/* The master lock protecting the OCaml runtime system */
static st_masterlock caml_master_lock;
static struct caml_locking_scheme* _Atomic caml_locking_scheme;

/* Whether the "tick" thread is already running */
static int caml_tick_thread_running = 0;
Expand Down Expand Up @@ -143,6 +160,50 @@ extern struct longjmp_buffer caml_termination_jmpbuf;
extern void (*caml_termination_hook)(void);
#endif

/* The default locking scheme */
static st_masterlock default_master_lock;

static int default_can_skip_yield(void* m)
{
return st_masterlock_waiters(m) == 0;
}

struct caml_locking_scheme caml_default_locking_scheme =
{ &default_master_lock,
(void (*)(void*))&st_masterlock_acquire,
(void (*)(void*))&st_masterlock_release,
(void (*)(void*))&st_masterlock_init,
default_can_skip_yield,
(void (*)(void*))&st_thread_yield };

static void acquire_runtime_lock()
{
struct caml_locking_scheme* s;

/* The locking scheme may be changed by the thread that currently
holds it. This means that it may change while we're waiting to
acquire it, so by the time we acquire it it may no longer be the
right scheme. */

retry:
s = atomic_load(&caml_locking_scheme);
s->lock(s->context);
if (atomic_load(&caml_locking_scheme) != s) {
/* This is no longer the right scheme. Unlock and try again */
s->unlock(s->context);
goto retry;
}
}

static void release_runtime_lock()
{
/* There is no tricky case here like in acquire, as only the holder
of the lock can change it. (Here, that's us) */
struct caml_locking_scheme* s;
s = atomic_load(&caml_locking_scheme);
s->unlock(s->context);
}

/* Hook for scanning the stacks of the other threads */

static void (*prev_scan_roots_hook) (scanning_action);
Expand Down Expand Up @@ -182,7 +243,7 @@ static void memprof_ctx_iter(th_ctx_action f, void* data)

/* Saving and restoring runtime state in curr_thread */

Caml_inline void caml_thread_save_runtime_state(void)
CAMLexport void caml_thread_save_runtime_state(void)
{
#ifdef NATIVE_CODE
curr_thread->top_of_stack = Caml_state->top_of_stack;
Expand All @@ -208,8 +269,12 @@ Caml_inline void caml_thread_save_runtime_state(void)
caml_memprof_leave_thread();
}

Caml_inline void caml_thread_restore_runtime_state(void)
CAMLexport void caml_thread_restore_runtime_state(void)
{
/* Update curr_thread to point to the thread descriptor corresponding
to the thread currently executing */
curr_thread = st_tls_get(thread_descriptor_key);

#ifdef NATIVE_CODE
Caml_state->top_of_stack = curr_thread->top_of_stack;
Caml_state->bottom_of_stack= curr_thread->bottom_of_stack;
Expand All @@ -234,6 +299,19 @@ Caml_inline void caml_thread_restore_runtime_state(void)
caml_memprof_enter_thread(curr_thread->memprof_ctx);
}

CAMLexport void caml_switch_runtime_locking_scheme(struct caml_locking_scheme* new)
{
struct caml_locking_scheme* old;

caml_thread_save_runtime_state();
old = atomic_exchange(&caml_locking_scheme, new);
/* We hold 'old', but it is no longer the runtime lock */
old->unlock(old->context);
acquire_runtime_lock();
caml_thread_restore_runtime_state();
}


/* Hooks for caml_enter_blocking_section and caml_leave_blocking_section */


Expand All @@ -243,7 +321,7 @@ static void caml_thread_enter_blocking_section(void)
of the current thread */
caml_thread_save_runtime_state();
/* Tell other threads that the runtime is free */
st_masterlock_release(&caml_master_lock);
release_runtime_lock();
}

static void caml_thread_leave_blocking_section(void)
Expand All @@ -255,11 +333,7 @@ static void caml_thread_leave_blocking_section(void)
DWORD error = GetLastError();
#endif
/* Wait until the runtime is free */
st_masterlock_acquire(&caml_master_lock);
/* Update curr_thread to point to the thread descriptor corresponding
to the thread currently executing */
curr_thread = st_tls_get(thread_descriptor_key);
/* Restore the runtime state from the curr_thread descriptor */
acquire_runtime_lock();
caml_thread_restore_runtime_state();
#ifdef _WIN32
SetLastError(error);
Expand Down Expand Up @@ -419,6 +493,7 @@ static void caml_thread_remove_info(caml_thread_t th)
static void caml_thread_reinitialize(void)
{
struct channel * chan;
struct caml_locking_scheme* s;

/* Remove all other threads (now nonexistent)
from the doubly-linked list of threads */
Expand All @@ -430,7 +505,8 @@ static void caml_thread_reinitialize(void)
/* Reinitialize the master lock machinery,
just in case the fork happened while other threads were doing
caml_leave_blocking_section */
st_masterlock_init(&caml_master_lock);
s = atomic_load(&caml_locking_scheme);
s->reinitialize_after_fork(s->context);
/* Tick thread is not currently running in child process, will be
re-created at next Thread.create */
caml_tick_thread_running = 0;
Expand All @@ -454,7 +530,8 @@ CAMLprim value caml_thread_initialize(value unit) /* ML */
/* OS-specific initialization */
st_initialize();
/* Initialize and acquire the master lock */
st_masterlock_init(&caml_master_lock);
st_masterlock_init(&default_master_lock);
caml_locking_scheme = &caml_default_locking_scheme;
/* Initialize the keys */
st_tls_newkey(&thread_descriptor_key);
st_tls_newkey(&last_channel_locked_key);
Expand Down Expand Up @@ -562,7 +639,7 @@ static void caml_thread_stop(void)
/* OS-specific cleanups */
st_thread_cleanup();
/* Release the runtime system */
st_masterlock_release(&caml_master_lock);
release_runtime_lock();
}

/* Create a thread */
Expand Down Expand Up @@ -658,7 +735,7 @@ CAMLexport int caml_c_thread_register(void)
th->top_of_stack = (char *) &err;
#endif
/* Take master lock to protect access to the chaining of threads */
st_masterlock_acquire(&caml_master_lock);
acquire_runtime_lock();
/* Add thread info block to the list of threads */
if (all_threads == NULL) {
th->next = th;
Expand All @@ -673,7 +750,7 @@ CAMLexport int caml_c_thread_register(void)
/* Associate the thread descriptor with the thread */
st_tls_set(thread_descriptor_key, (void *) th);
/* Release the master lock */
st_masterlock_release(&caml_master_lock);
release_runtime_lock();
/* Now we can re-enter the run-time system and heap-allocate the descriptor */
caml_leave_blocking_section();
th->descr = caml_thread_new_descriptor(Val_unit); /* no closure */
Expand All @@ -694,7 +771,7 @@ CAMLexport int caml_c_thread_unregister(void)
/* Not registered? */
if (th == NULL) return 0;
/* Wait until the runtime is available */
st_masterlock_acquire(&caml_master_lock);
acquire_runtime_lock();
/* Forget the thread descriptor */
st_tls_set(thread_descriptor_key, NULL);
/* Remove thread info block from list of threads, and free it */
Expand All @@ -703,7 +780,7 @@ CAMLexport int caml_c_thread_unregister(void)
so that it does not prevent the whole process from exiting (#9971) */
if (all_threads == NULL) caml_thread_cleanup(Val_unit);
/* Release the runtime */
st_masterlock_release(&caml_master_lock);
release_runtime_lock();
return 1;
}

Expand Down Expand Up @@ -771,7 +848,11 @@ CAMLprim value caml_thread_exit(value unit) /* ML */

CAMLprim value caml_thread_yield(value unit) /* ML */
{
if (st_masterlock_waiters(&caml_master_lock) == 0) return Val_unit;
struct caml_locking_scheme* s;

s = atomic_load(&caml_locking_scheme);
if (s->can_skip_yield != NULL && s->can_skip_yield(s->context))
return Val_unit;

/* Do all the parts of a blocking section enter/leave except lock
manipulation, which we'll do more efficiently in st_thread_yield. (Since
Expand All @@ -781,8 +862,12 @@ CAMLprim value caml_thread_yield(value unit) /* ML */
caml_raise_async_if_exception(caml_process_pending_signals_exn(),
"signal handler");
caml_thread_save_runtime_state();
st_thread_yield(&caml_master_lock);
curr_thread = st_tls_get(thread_descriptor_key);
s->yield(s->context);
if (atomic_load(&caml_locking_scheme) != s) {
/* The lock we have is no longer the runtime lock */
s->unlock(s->context);
acquire_runtime_lock();
}
caml_thread_restore_runtime_state();
caml_raise_async_if_exception(caml_process_pending_signals_exn(),
"signal handler");
Expand Down
43 changes: 41 additions & 2 deletions otherlibs/systhreads/threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ CAMLextern void caml_leave_blocking_section (void);
use the runtime system (typically, a blocking I/O operation).
*/

CAMLextern int caml_c_thread_register(void);
CAMLextern int caml_c_thread_unregister(void);
/* These functions are defined in the threads library, not the runtime */
#ifndef CAMLextern_libthreads
#define CAMLextern_libthreads CAMLextern
#endif
CAMLextern_libthreads int caml_c_thread_register(void);
CAMLextern_libthreads int caml_c_thread_unregister(void);

/* If a thread is created by C code (instead of by OCaml itself),
it must be registered with the OCaml runtime system before
Expand All @@ -61,6 +65,41 @@ CAMLextern int caml_c_thread_unregister(void);
Both functions return 1 on success, 0 on error.
*/

struct caml_locking_scheme {
void* context;
void (*lock)(void*);
void (*unlock)(void*);

/* Called after fork().
The lock should be held after this function returns. */
void (*reinitialize_after_fork)(void*);

/* can_skip_yield and yield are both called with the lock held,
and expect it held on return */
int (*can_skip_yield)(void*);
void (*yield)(void*);
};

extern struct caml_locking_scheme caml_default_locking_scheme;

/* Switch to a new runtime locking scheme.
The old runtime lock must be held (i.e. not in a blocking section),
and the new runtime lock must not be held. After this function
returns, the old lock is released and the new one is held.
There is a period during this function when neither lock is held,
so context-switches may occur. */
CAMLextern_libthreads
void caml_switch_runtime_locking_scheme(struct caml_locking_scheme*);

CAMLextern_libthreads
void caml_thread_save_runtime_state(void);

CAMLextern_libthreads
void caml_thread_restore_runtime_state(void);


#ifdef __cplusplus
}
#endif
Expand Down
42 changes: 42 additions & 0 deletions testsuite/tests/lib-systhreads/swapgil.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
(* TEST
modules = "swapgil_stubs.c"
* hassysthreads
include systhreads
** hasunix
*** native
*)

let counter = ref 0

external blocking_section : unit -> unit = "blocking_section"

type c_thread
external create_c_thread : (unit -> unit) -> c_thread = "create_c_thread"
external join_c_thread : c_thread -> unit = "join_c_thread"

external swap_gil : unit -> unit = "swap_gil"

let threadfn () =
for i = 1 to 1_000 do
incr counter;
let junk = Sys.opaque_identity (ref !counter) in
ignore junk;
match i mod 100, i mod 10 with
| _, 0 -> Thread.yield ()
| _, 1 -> blocking_section ()
| 22, _ -> Gc.minor ()
| _, 3 -> swap_gil ()
| _ -> ()
done

let () =
let open Either in
let threads =
List.init 40 (fun i ->
if i land 1 = 0 then
Left (Thread.create threadfn ())
else
Right (create_c_thread threadfn))
in
List.iter (function Left th -> Thread.join th | Right ct -> join_c_thread ct) threads;
Printf.printf "%d\n" !counter
1 change: 1 addition & 0 deletions testsuite/tests/lib-systhreads/swapgil.reference
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
40000
Loading

0 comments on commit 674a335

Please sign in to comment.