From 6894f625b0fcd17bc7d2db9444ce166d2806c338 Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Thu, 24 Oct 2024 10:26:17 +0200 Subject: [PATCH] runtime: map every goroutine to a new OS thread This is not a scheduler in the runtime, instead every goroutine is mapped to a single OS thread - meaning 1:1 scheduling. While this may not perform well (or at all) for large numbers of threads, it greatly simplifies many things in the runtime. For example, blocking syscalls can be called directly instead of having to use epoll or similar. Also, we don't need to do anything special to call C code - the default stack is all we need. --- compileopts/options.go | 2 +- compileopts/options_test.go | 2 +- compileopts/target.go | 5 +- src/internal/task/linux.go | 9 + src/internal/task/semaphore.go | 32 ++++ src/internal/task/task_threads.c | 104 ++++++++++++ src/internal/task/task_threads.go | 265 ++++++++++++++++++++++++++++++ src/runtime/gc_stack_raw.go | 2 +- src/runtime/gc_stack_threads.go | 25 +++ src/runtime/runtime_unix.go | 1 + src/runtime/scheduler_threads.go | 124 ++++++++++++++ 11 files changed, 567 insertions(+), 4 deletions(-) create mode 100644 src/internal/task/linux.go create mode 100644 src/internal/task/semaphore.go create mode 100644 src/internal/task/task_threads.c create mode 100644 src/internal/task/task_threads.go create mode 100644 src/runtime/gc_stack_threads.go create mode 100644 src/runtime/scheduler_threads.go diff --git a/compileopts/options.go b/compileopts/options.go index bc462b29bd..52f0a64bff 100644 --- a/compileopts/options.go +++ b/compileopts/options.go @@ -10,7 +10,7 @@ import ( var ( validBuildModeOptions = []string{"default", "c-shared"} validGCOptions = []string{"none", "leaking", "conservative", "custom", "precise"} - validSchedulerOptions = []string{"none", "tasks", "asyncify"} + validSchedulerOptions = []string{"none", "tasks", "asyncify", "threads"} validSerialOptions = []string{"none", "uart", "usb", "rtt"} validPrintSizeOptions = []string{"none", "short", "full"} validPanicStrategyOptions = []string{"print", "trap"} diff --git a/compileopts/options_test.go b/compileopts/options_test.go index 23ffec465f..bd6e4c04ea 100644 --- a/compileopts/options_test.go +++ b/compileopts/options_test.go @@ -10,7 +10,7 @@ import ( func TestVerifyOptions(t *testing.T) { expectedGCError := errors.New(`invalid gc option 'incorrect': valid values are none, leaking, conservative, custom, precise`) - expectedSchedulerError := errors.New(`invalid scheduler option 'incorrect': valid values are none, tasks, asyncify`) + expectedSchedulerError := errors.New(`invalid scheduler option 'incorrect': valid values are none, tasks, asyncify, threads`) expectedPrintSizeError := errors.New(`invalid size option 'incorrect': valid values are none, short, full`) expectedPanicStrategyError := errors.New(`invalid panic option 'incorrect': valid values are print, trap`) diff --git a/compileopts/target.go b/compileopts/target.go index f60ee30972..1e6acdf869 100644 --- a/compileopts/target.go +++ b/compileopts/target.go @@ -247,7 +247,6 @@ func defaultTarget(options *Options) (*TargetSpec, error) { GOARCH: options.GOARCH, BuildTags: []string{options.GOOS, options.GOARCH}, GC: "precise", - Scheduler: "tasks", Linker: "cc", DefaultStackSize: 1024 * 64, // 64kB GDB: []string{"gdb"}, @@ -378,6 +377,7 @@ func defaultTarget(options *Options) (*TargetSpec, error) { platformVersion = "11.0.0" // first macosx platform with arm64 support } llvmvendor = "apple" + spec.Scheduler = "tasks" spec.Linker = "ld.lld" spec.Libc = "darwin-libSystem" // Use macosx* instead of darwin, otherwise darwin/arm64 will refer to @@ -395,6 +395,7 @@ func defaultTarget(options *Options) (*TargetSpec, error) { "src/runtime/runtime_unix.c", "src/runtime/signal.c") case "linux": + spec.Scheduler = "threads" spec.Linker = "ld.lld" spec.RTLib = "compiler-rt" spec.Libc = "musl" @@ -415,9 +416,11 @@ func defaultTarget(options *Options) (*TargetSpec, error) { } spec.ExtraFiles = append(spec.ExtraFiles, "src/internal/futex/futex_linux.c", + "src/internal/task/task_threads.c", "src/runtime/runtime_unix.c", "src/runtime/signal.c") case "windows": + spec.Scheduler = "tasks" spec.Linker = "ld.lld" spec.Libc = "mingw-w64" // Note: using a medium code model, low image base and no ASLR diff --git a/src/internal/task/linux.go b/src/internal/task/linux.go new file mode 100644 index 0000000000..7d28f708c4 --- /dev/null +++ b/src/internal/task/linux.go @@ -0,0 +1,9 @@ +//go:build linux && !baremetal + +package task + +import "unsafe" + +// Musl uses a pointer (or unsigned long for C++) so unsafe.Pointer should be +// fine. +type threadID unsafe.Pointer diff --git a/src/internal/task/semaphore.go b/src/internal/task/semaphore.go new file mode 100644 index 0000000000..914f09bc5e --- /dev/null +++ b/src/internal/task/semaphore.go @@ -0,0 +1,32 @@ +package task + +// Barebones semaphore implementation. +// The main limitation is that if there are multiple waiters, a single Post() +// call won't do anything. Only when Post() has been called to awaken all +// waiters will the waiters proceed. +// This limitation is not a problem when there will only be a single waiter. +type Semaphore struct { + futex Futex +} + +// Post (unlock) the semaphore, incrementing the value in the semaphore. +func (s *Semaphore) Post() { + newValue := s.futex.Add(1) + if newValue == 0 { + s.futex.WakeAll() + } +} + +// Wait (lock) the semaphore, decrementing the value in the semaphore. +func (s *Semaphore) Wait() { + delta := int32(-1) + value := s.futex.Add(uint32(delta)) + for { + if int32(value) >= 0 { + // Semaphore unlocked! + return + } + s.futex.Wait(value) + value = s.futex.Load() + } +} diff --git a/src/internal/task/task_threads.c b/src/internal/task/task_threads.c new file mode 100644 index 0000000000..5f93980a2e --- /dev/null +++ b/src/internal/task/task_threads.c @@ -0,0 +1,104 @@ +//go:build none + +#define _GNU_SOURCE +#include +#include +#include +#include +#include + +// BDWGC also uses SIGRTMIN+6 on Linux, which seems like a reasonable choice. +#ifdef __linux__ +#define taskPauseSignal (SIGRTMIN + 6) +#endif + +// Pointer to the current task.Task structure. +// Ideally the entire task.Task structure would be a thread-local variable but +// this also works. +static __thread void *current_task; + +struct state_pass { + void *(*start)(void*); + void *args; + void *task; + uintptr_t *stackTop; + sem_t startlock; +}; + +// Handle the GC pause in Go. +void tinygo_task_gc_pause(int sig); + +// Initialize the main thread. +void tinygo_task_init(void *mainTask, pthread_t *thread, void *context) { + // Make sure the current task pointer is set correctly for the main + // goroutine as well. + current_task = mainTask; + + // Store the thread ID of the main thread. + *thread = pthread_self(); + + // Register the "GC pause" signal for the entire process. + // Using pthread_kill, we can still send the signal to a specific thread. + struct sigaction act = { 0 }; + act.sa_flags = SA_SIGINFO; + act.sa_handler = &tinygo_task_gc_pause; + sigaction(taskPauseSignal, &act, NULL); +} + +void tinygo_task_exited(void*); + +// Helper to start a goroutine while also storing the 'task' structure. +static void* start_wrapper(void *arg) { + struct state_pass *state = arg; + void *(*start)(void*) = state->start; + void *args = state->args; + current_task = state->task; + + // Save the current stack pointer in the goroutine state, for the GC. + int stackAddr; + *(state->stackTop) = (uintptr_t)(&stackAddr); + + // Notify the caller that the thread has successfully started and + // initialized. + sem_post(&state->startlock); + + // Run the goroutine function. + start(args); + + // Notify the Go side this thread will exit. + tinygo_task_exited(current_task); + + return NULL; +}; + +// Start a new goroutine in an OS thread. +int tinygo_task_start(uintptr_t fn, void *args, void *task, pthread_t *thread, uintptr_t *stackTop, void *context) { + // Sanity check. Should get optimized away. + if (sizeof(pthread_t) != sizeof(void*)) { + __builtin_trap(); + } + + struct state_pass state = { + .start = (void*)fn, + .args = args, + .task = task, + .stackTop = stackTop, + }; + sem_init(&state.startlock, 0, 0); + int result = pthread_create(thread, NULL, &start_wrapper, &state); + + // Wait until the thread has been crated and read all state_pass variables. + sem_wait(&state.startlock); + + return result; +} + +// Return the current task (for task.Current()). +void* tinygo_task_current(void) { + return current_task; +} + +// Send a signal to cause the task to pause for the GC mark phase. +void tinygo_task_send_gc_signal(pthread_t thread) { + pthread_kill(thread, taskPauseSignal); +} diff --git a/src/internal/task/task_threads.go b/src/internal/task/task_threads.go new file mode 100644 index 0000000000..93204fb9ba --- /dev/null +++ b/src/internal/task/task_threads.go @@ -0,0 +1,265 @@ +//go:build scheduler.threads + +package task + +import ( + "sync/atomic" + "unsafe" +) + +// If true, print verbose debug logs. +const verbose = false + +// Scheduler-specific state. +type state struct { + // Goroutine ID. The number here is not really significant and after a while + // it could wrap around. But it is useful for debugging. + id uintptr + + // Thread ID, pthread_t or similar (typically implemented as a pointer). + thread threadID + + // Highest address of the stack. It is stored when the goroutine starts, and + // is needed to be able to scan the stack. + stackTop uintptr + + // Next task in the activeTasks queue. + QueueNext *Task + + // Semaphore to pause/resume the thread atomically. + pauseSem Semaphore + + // Semaphore used for stack scanning. + // We can't reuse pauseSem here since the thread might have been paused for + // other reasons (for example, because it was waiting on a channel). + gcSem Semaphore +} + +// Goroutine counter, starting at 0 for the main goroutine. +var goroutineID uintptr + +var mainTask Task + +// Queue of tasks (see QueueNext) that currently exist in the program. +var activeTasks = &mainTask +var activeTaskLock PMutex + +func OnSystemStack() bool { + runtimePanic("todo: task.OnSystemStack") + return false +} + +// Initialize the main goroutine state. Must be called by the runtime on +// startup, before starting any other goroutines. +func Init(sp uintptr) { + mainTask.state.stackTop = sp + tinygo_task_init(&mainTask, &mainTask.state.thread) +} + +// Return the task struct for the current thread. +func Current() *Task { + t := (*Task)(tinygo_task_current()) + if t == nil { + runtimePanic("unknown current task") + } + return t +} + +// Pause pauses the current task, until it is resumed by another task. +// It is possible that another task has called Resume() on the task before it +// hits Pause(), in which case the task won't be paused but continues +// immediately. +func Pause() { + // Wait until resumed + t := Current() + if verbose { + println("*** pause: ", t.state.id) + } + t.state.pauseSem.Wait() +} + +// Resume the given task. +// It is legal to resume a task before it gets paused, it means that the next +// call to Pause() won't pause but will continue immediately. This happens in +// practice sometimes in channel operations, where the Resume() might get called +// between the channel unlock and the call to Pause(). +func (t *Task) Resume() { + if verbose { + println("*** resume: ", t.state.id) + } + // Increment the semaphore counter. + // If the task is currently paused in Wait(), it will resume. + // If the task is not yet paused, the next call to Wait() will continue + // immediately. + t.state.pauseSem.Post() +} + +// Start a new OS thread. +func start(fn uintptr, args unsafe.Pointer, stackSize uintptr) { + t := &Task{} + t.state.id = atomic.AddUintptr(&goroutineID, 1) + if verbose { + println("*** start: ", t.state.id, "from", Current().state.id) + } + + // Start the new thread, and add it to the list of threads. + // Do this with a lock so that only started threads are part of the queue + // and the stop-the-world GC won't see threads that haven't started yet or + // are not fully started yet. + activeTaskLock.Lock() + errCode := tinygo_task_start(fn, args, t, &t.state.thread, &t.state.stackTop) + if errCode != 0 { + runtimePanic("could not start thread") + } + t.state.QueueNext = activeTasks + activeTasks = t + activeTaskLock.Unlock() +} + +//export tinygo_task_exited +func taskExited(t *Task) { + if verbose { + println("*** exit:", t.state.id) + } + + // Remove from the queue. + // TODO: this can be made more efficient by using a doubly linked list. + activeTaskLock.Lock() + found := false + for q := &activeTasks; *q != nil; q = &(*q).state.QueueNext { + if *q == t { + *q = t.state.QueueNext + found = true + break + } + } + activeTaskLock.Unlock() + + // Sanity check. + if !found { + runtimePanic("taskExited failed") + } +} + +// Futex to wait on until all tasks have finished scanning the stack. +// This is basically a sync.WaitGroup. +var scanDoneFutex Futex + +// GC scan phase. Because we need to stop the world while scanning, this kinda +// needs to be done in the tasks package. +func GCScan() { + current := Current() + + // Don't allow new goroutines to be started while pausing/resuming threads + // in the stop-the-world phase. + activeTaskLock.Lock() + + // Pause all other threads. + numOtherThreads := uint32(0) + for t := activeTasks; t != nil; t = t.state.QueueNext { + if t != current { + numOtherThreads++ + tinygo_task_send_gc_signal(t.state.thread) + } + } + + // Store the number of threads to wait for in the futex. + // This is the equivalent of doing an initial wg.Add(numOtherThreads). + scanDoneFutex.Store(numOtherThreads) + + // Scan the current stack, and all current registers. + scanCurrentStack() + + // Wake each paused thread for the first time so it will scan the stack. + for t := activeTasks; t != nil; t = t.state.QueueNext { + if t != current { + t.state.gcSem.Post() + } + } + + // Wait until all threads have finished scanning their stack. + // This is the equivalent of wg.Wait() + for { + val := scanDoneFutex.Load() + if val == 0 { + break + } + scanDoneFutex.Wait(val) + } + + // Scan all globals (implemented in the runtime). + gcScanGlobals() + + // Wake each paused thread for the second time, so they will resume normal + // operation. + for t := activeTasks; t != nil; t = t.state.QueueNext { + if t != current { + t.state.gcSem.Post() + } + } + + // Allow goroutines to start and exit again. + activeTaskLock.Unlock() +} + +// Scan globals, implemented in the runtime package. +func gcScanGlobals() + +var stackScanLock PMutex + +//export tinygo_task_gc_pause +func tingyo_task_gc_pause() { + // Wait until we get the signal to start scanning the stack. + Current().state.gcSem.Wait() + + // Scan the thread stack. + // Only scan a single thread stack at a time, because the GC marking phase + // doesn't support parallelism. + // TODO: it may be possible to call markRoots directly (without saving + // registers) since we are in a signal handler that already saved a bunch of + // registers. This is an optimization left for a future time. + stackScanLock.Lock() + scanCurrentStack() + stackScanLock.Unlock() + + // Equivalent of wg.Done(): subtract one from the futex and if the result is + // 0 (meaning we were the last in the waitgroup), wake the waiting thread. + n := uint32(1) + if scanDoneFutex.Add(-n) == 0 { + scanDoneFutex.Wake() + } + + // Wait until we get the signal we can resume normally (after the mark phase + // has finished). + Current().state.gcSem.Wait() +} + +//go:export tinygo_scanCurrentStack +func scanCurrentStack() + +// Return the highest address of the current stack. +func StackTop() uintptr { + return Current().state.stackTop +} + +//go:linkname runtimePanic runtime.runtimePanic +func runtimePanic(msg string) + +// Using //go:linkname instead of //export so that we don't tell the compiler +// that the 't' parameter won't escape (because it will). +// +//go:linkname tinygo_task_init tinygo_task_init +func tinygo_task_init(t *Task, thread *threadID) + +// Here same as for tinygo_task_init. +// +//go:linkname tinygo_task_start tinygo_task_start +func tinygo_task_start(fn uintptr, args unsafe.Pointer, t *Task, thread *threadID, stackTop *uintptr) int32 + +// Pause the thread by sending it a signal. +// +//export tinygo_task_send_gc_signal +func tinygo_task_send_gc_signal(threadID) + +//export tinygo_task_current +func tinygo_task_current() unsafe.Pointer diff --git a/src/runtime/gc_stack_raw.go b/src/runtime/gc_stack_raw.go index d55522a9f6..bdc3154fa5 100644 --- a/src/runtime/gc_stack_raw.go +++ b/src/runtime/gc_stack_raw.go @@ -1,4 +1,4 @@ -//go:build (gc.conservative || gc.precise) && !tinygo.wasm +//go:build (gc.conservative || gc.precise) && !tinygo.wasm && !scheduler.threads package runtime diff --git a/src/runtime/gc_stack_threads.go b/src/runtime/gc_stack_threads.go new file mode 100644 index 0000000000..9c77fa0c7b --- /dev/null +++ b/src/runtime/gc_stack_threads.go @@ -0,0 +1,25 @@ +//go:build scheduler.threads + +package runtime + +import "internal/task" + +func gcMarkReachable() { + task.GCScan() +} + +// Scan globals inside the stop-the-world phase. Called from the STW +// implementation in the internal/task package. +// +//go:linkname gcScanGlobals internal/task.gcScanGlobals +func gcScanGlobals() { + findGlobals(markRoots) +} + +// Function called from assembly with all registers pushed, to actually scan the +// stack. +// +//go:export tinygo_scanstack +func scanstack(sp uintptr) { + markRoots(sp, task.StackTop()) +} diff --git a/src/runtime/runtime_unix.go b/src/runtime/runtime_unix.go index 08e3e74269..17e004b2e8 100644 --- a/src/runtime/runtime_unix.go +++ b/src/runtime/runtime_unix.go @@ -73,6 +73,7 @@ type timespec struct { tv_nsec int64 // unsigned 64-bit integer on all time64 platforms } +// Highest address of the stack of the main thread. var stackTop uintptr // Entry point for Go. Initialize all packages and call main.main(). diff --git a/src/runtime/scheduler_threads.go b/src/runtime/scheduler_threads.go new file mode 100644 index 0000000000..e553a5b9c7 --- /dev/null +++ b/src/runtime/scheduler_threads.go @@ -0,0 +1,124 @@ +//go:build scheduler.threads + +package runtime + +import "internal/task" + +const hasScheduler = false // not using the cooperative scheduler + +// We use threads, so yes there is parallelism. +const hasParallelism = true + +var ( + timerQueueLock task.PMutex + timerQueueStarted bool + timerFutex task.Futex +) + +// Because we just use OS threads, we don't need to do anything special here. We +// can just initialize everything and run main.main on the main thread. +func run() { + initHeap() + task.Init(stackTop) + initAll() + callMain() +} + +// Pause the current task for a given time. +// +//go:linkname sleep time.Sleep +func sleep(duration int64) { + if duration <= 0 { + return + } + + sleepTicks(nanosecondsToTicks(duration)) +} + +func deadlock() { + // TODO: exit the thread via pthread_exit. + task.Pause() +} + +func scheduleTask(t *task.Task) { + t.Resume() +} + +func Gosched() { + // Each goroutine runs in a thread, so there's not much we can do here. + // There is sched_yield but it's only really intended for realtime + // operation, so is probably best not to use. +} + +// Separate goroutine (thread) that runs timer callbacks when they expire. +func timerRunner() { + for { + timerQueueLock.Lock() + + if timerQueue == nil { + // No timer in the queue, so wait until one becomes available. + val := timerFutex.Load() + timerQueueLock.Unlock() + timerFutex.Wait(val) + continue + } + + now := ticks() + if now < timerQueue.whenTicks() { + // There is a timer in the queue, but we need to wait until it + // expires. + // Using a futex, so that the wait is exited early when adding a new + // (sooner-to-expire) timer. + val := timerFutex.Load() + timerQueueLock.Unlock() + timeout := ticksToNanoseconds(timerQueue.whenTicks() - now) + timerFutex.WaitUntil(val, uint64(timeout)) + continue + } + + // Pop timer from queue. + tn := timerQueue + timerQueue = tn.next + tn.next = nil + + timerQueueLock.Unlock() + + // Run the callback stored in this timer node. + delay := ticksToNanoseconds(now - tn.whenTicks()) + tn.callback(tn, delay) + } +} + +func addTimer(tim *timerNode) { + timerQueueLock.Lock() + + if !timerQueueStarted { + timerQueueStarted = true + go timerRunner() + } + + timerQueueAdd(tim) + + timerFutex.Add(1) + timerFutex.Wake() + + timerQueueLock.Unlock() +} + +func removeTimer(tim *timer) bool { + timerQueueLock.Lock() + removed := timerQueueRemove(tim) + timerQueueLock.Unlock() + return removed +} + +func schedulerRunQueue() *task.Queue { + // This function is not actually used, it is only called when hasScheduler + // is true. So we can just return nil here. + return nil +} + +func runqueueForGC() *task.Queue { + // There is only a runqueue when using the cooperative scheduler. + return nil +}