diff --git a/src/runtime/encore/encore.c b/src/runtime/encore/encore.c index 8d4a4034c..074841b2b 100644 --- a/src/runtime/encore/encore.c +++ b/src/runtime/encore/encore.c @@ -5,15 +5,9 @@ #include #include #include +#include -extern void public_run(); - -bool has_flag(pony_actor_t* actor, uint8_t flag); - -typedef struct context { - ucontext_t ctx; - struct context *next; -} context; +extern void public_run(pthread_mutex_t *lock); enum { @@ -25,13 +19,24 @@ enum #define MAX_IN_POOL 4 -static __pony_thread_local stack_page *stack_pool = NULL; -static __pony_thread_local unsigned int available_pages = 0; -static __pony_thread_local stack_page *local_page = NULL; +inline static void assert_swap(ucontext_t *old, ucontext_t *new) +{ + int ret = swapcontext(old, new); + assert(ret == 0); +} static __pony_thread_local context *context_pool = NULL; static __pony_thread_local unsigned int available_context = 0; -static __pony_thread_local context *this_context = NULL; + +__pony_thread_local context *this_context; +__pony_thread_local ucontext_t *origin; +__pony_thread_local ucontext_t *root; + +#ifndef LAZY_IMPL + +static __pony_thread_local stack_page *stack_pool = NULL; +static __pony_thread_local unsigned int available_pages = 0; +static __pony_thread_local stack_page *local_page = NULL; static stack_page *pop_page() { @@ -49,14 +54,6 @@ static stack_page *pop_page() return page; } -void *get_local_page_stack() -{ - local_page = local_page ? local_page : pop_page(); - assert(local_page); - assert(local_page->stack); - return local_page->stack; -} - static void push_page(stack_page *page) { available_pages++; @@ -64,81 +61,43 @@ static void push_page(stack_page *page) stack_pool = page; } -void reclaim_page(encore_actor_t *a) +static void reclaim_page(encore_actor_t *a) { push_page(a->page); a->page = NULL; } -static void clean_pool() +void *get_local_page_stack() { -#ifndef LAZY_IMPL - - static stack_page *page, *next; - while (available_pages > MAX_IN_POOL) { - available_pages--; - page = stack_pool; - next = stack_pool->next; - free(page->stack); - free(page); - stack_pool = next; - } - -#else - - static context *ctx, *next; - while (available_context > MAX_IN_POOL) { - available_context--; - ctx = context_pool; - next = context_pool->next; - free(ctx->ctx.uc_stack.ss_sp); - free(ctx); - context_pool = next; - } - -#endif + local_page = local_page ? local_page : pop_page(); + assert(local_page); + assert(local_page->stack); + return local_page->stack; } -void actor_suspend(encore_actor_t *actor) +void actor_set_run_to_completion(encore_actor_t *actor) { - // Make a copy of the current context and replace it - ucontext_t ctxp = actor->ctx; - - ctx_wrapper d = { .ctx = &ctxp, .uc_link=ctxp.uc_link }; - encore_arg_t argv[1] = { { .p = &d } }; - actor->page = local_page; - local_page = NULL; - - // TODO find out the right way of calling pond_sendv - // pony_sendv(actor, FUT_MSG_SUSPEND, 1, argv); - - actor->run_to_completion = false; - int ret = swapcontext(&ctxp, ctxp.uc_link); - assert(ret == 0); - assert(ctxp.uc_link == &actor->home_ctx); + actor->run_to_completion = true; } -// TODO: suspend and await overlaps heavily -void actor_await(encore_actor_t *actor, void *future) +bool actor_run_to_completion(encore_actor_t *actor) { - // Make a copy of the current context and replace it - ucontext_t ctxp = actor->ctx; - ctx_wrapper d = { .ctx = &ctxp, .uc_link=ctxp.uc_link }; - encore_arg_t argv[2] = { { .p = &d }, { .p = future } }; + return actor->run_to_completion; +} - actor->page = local_page; - local_page = NULL; +void actor_unlock(encore_actor_t *actor) +{ + if (actor->lock) { + pthread_mutex_unlock(actor->lock); + actor->lock = NULL; + } +} - // TODO find out the right way of calling pond_sendv - // pony_sendv(actor, FUT_MSG_AWAIT, 2, argv); +#endif - actor->run_to_completion = false; - int ret = swapcontext(&ctxp, ctxp.uc_link); - assert(ret == 0); - assert(ctxp.uc_link == &actor->home_ctx); -} +#ifdef LAZY_IMPL -static context *pop_context() +static context *pop_context(encore_actor_t *actor) { static context *c; if (available_context == 0) { @@ -149,10 +108,10 @@ static context *pop_context() assert(ret == 0); context_pool->ctx.uc_stack.ss_size = Stack_Size; context_pool->ctx.uc_stack.ss_flags = 0; - makecontext(&context_pool->ctx, (void(*)(void))public_run, 0); } else { available_context--; } + makecontext(&context_pool->ctx, (void(*)(void))public_run, 1, actor->lock); c = context_pool; context_pool = c->next; assert(c->ctx.uc_stack.ss_sp); @@ -166,6 +125,37 @@ static void push_context(context *ctx) context_pool = ctx; } +#endif + +static void clean_pool() +{ +#ifndef LAZY_IMPL + + static stack_page *page, *next; + while (available_pages > MAX_IN_POOL) { + available_pages--; + page = stack_pool; + next = stack_pool->next; + free(page->stack); + free(page); + stack_pool = next; + } + +#else + + static context *ctx, *next; + while (available_context > MAX_IN_POOL) { + available_context--; + ctx = context_pool; + next = context_pool->next; + free(ctx->ctx.uc_stack.ss_sp); + free(ctx); + context_pool = next; + } + +#endif +} + void actor_block(encore_actor_t *actor) { #ifndef LAZY_IMPL @@ -178,18 +168,18 @@ void actor_block(encore_actor_t *actor) assert(actor->page); assert(actor->page->stack); actor->run_to_completion = false; - int ret = swapcontext(&actor->ctx, actor->ctx.uc_link); - assert(ret == 0); + assert_swap(&actor->ctx, actor->ctx.uc_link); #else - ucontext_t ctx; - actor->saved = &ctx; - context *previous = this_context; - this_context = pop_context(); - int ret = swapcontext(&ctx, &this_context->ctx); - assert(ret == 0); - this_context = previous; + actor->saved = &this_context->ctx; + + context *old = this_context; + + this_context = pop_context(actor); + assert_swap(actor->saved, &this_context->ctx); + + this_context = old; #endif } @@ -199,24 +189,13 @@ void actor_set_resume(encore_actor_t *actor) actor->resume = true; } -void actor_set_run_to_completion(encore_actor_t *actor) -{ - actor->run_to_completion = true; -} - -bool actor_run_to_completion(encore_actor_t *actor) -{ - return actor->run_to_completion; -} - void actor_resume(encore_actor_t *actor) { #ifndef LAZY_IMPL actor->resume = false; actor->run_to_completion = true; - int ret = swapcontext(actor->ctx.uc_link, &actor->ctx); - assert(ret == 0); + assert_swap(actor->ctx.uc_link, &actor->ctx); if (actor->run_to_completion) { reclaim_page(actor); @@ -225,15 +204,58 @@ void actor_resume(encore_actor_t *actor) #else actor->resume = false; - if (this_context) { + if (&this_context->ctx != root) { push_context(this_context); } setcontext(actor->saved); - exit(1); + assert(0); #endif } +void actor_suspend(encore_actor_t *actor) +{ +#ifndef LAZY_IMPL + // Make a copy of the current context and replace it + ucontext_t ctxp = actor->ctx; + + ctx_wrapper d = { .ctx = &ctxp, .uc_link=ctxp.uc_link }; + encore_arg_t argv[1] = { { .p = &d } }; + actor->page = local_page; + local_page = NULL; + + // TODO find out the right way of calling pond_sendv + // pony_sendv(actor, FUT_MSG_SUSPEND, 1, argv); + + actor->run_to_completion = false; + int ret = swapcontext(&ctxp, ctxp.uc_link); + assert(ret == 0); + assert(ctxp.uc_link == &actor->home_ctx); +#endif +} + +// TODO: suspend and await overlaps heavily +void actor_await(encore_actor_t *actor, void *future) +{ +#ifndef LAZY_IMPL + // Make a copy of the current context and replace it + ucontext_t ctxp = actor->ctx; + ctx_wrapper d = { .ctx = &ctxp, .uc_link=ctxp.uc_link }; + encore_arg_t argv[2] = { { .p = &d }, { .p = future } }; + + actor->page = local_page; + local_page = NULL; + + // TODO find out the right way of calling pond_sendv + // pony_sendv(actor, FUT_MSG_AWAIT, 2, argv); + + actor->run_to_completion = false; + int ret = swapcontext(&ctxp, ctxp.uc_link); + assert(ret == 0); + assert(ctxp.uc_link == &actor->home_ctx); +#endif +} + encore_actor_t *encore_create(pony_type_t *type) { return (encore_actor_t *)pony_create(type); @@ -272,12 +294,10 @@ bool encore_actor_run_hook(encore_actor_t *actor) if(actor->resume) { actor_resume(actor); -#ifndef LAZY_IMPL - return !has_flag((pony_actor_t *)actor, FLAG_UNSCHEDULED); -#endif + return true; } - return true; + return false; } bool encore_actor_handle_message_hook(encore_actor_t *actor, pony_msg_t* msg) diff --git a/src/runtime/encore/encore.h b/src/runtime/encore/encore.h index d61c03457..be83ee116 100644 --- a/src/runtime/encore/encore.h +++ b/src/runtime/encore/encore.h @@ -6,6 +6,9 @@ #define LAZY_IMPL +// multithreading is not working on mac yet +#define SINGLE_THREAD_ON_MACOSX + #define Stack_Size 64*1024 typedef struct ctx_wrapper { ucontext_t* ctx; @@ -14,10 +17,19 @@ typedef struct ctx_wrapper { #include +typedef struct context { + ucontext_t ctx; + struct context *next; +} context; + +extern __pony_thread_local ucontext_t *root; +extern __pony_thread_local ucontext_t *origin; +extern __pony_thread_local context *this_context; + static pony_type_t *ENCORE_ACTIVE = (pony_type_t *)1; static pony_type_t *ENCORE_PRIMITIVE = (pony_type_t *)NULL; -typedef struct encore_actor encore_actor_t; +__pony_spec_align__(typedef struct encore_actor encore_actor_t, 64); typedef struct encore_oneway_msg encore_oneway_msg_t; typedef struct encore_fut_msg encore_fut_msg_t; @@ -66,12 +78,16 @@ struct encore_actor { pony_actor_pad_t; // Everything else that goes into an encore_actor that's not part of PonyRT + bool resume; + pthread_mutex_t *lock; +#ifndef LAZY_IMPL ucontext_t ctx; ucontext_t home_ctx; - bool resume; bool run_to_completion; stack_page *page; +#else ucontext_t *saved; +#endif }; /// Create a new Encore actor @@ -86,12 +102,15 @@ void *encore_alloc(size_t s); /// The starting point of all Encore programs int encore_start(int argc, char** argv, pony_type_t *type); +void actor_unlock(encore_actor_t *actor); bool encore_actor_run_hook(encore_actor_t *actor); bool encore_actor_handle_message_hook(encore_actor_t *actor, pony_msg_t* msg); void actor_block(encore_actor_t *actor); void actor_set_resume(encore_actor_t *actor); #ifdef LAZY_IMPL void actor_resume(encore_actor_t *actor) __attribute__ ((noreturn)); +#else +void actor_resume(encore_actor_t *actor); #endif /// calls the pony's respond with the current object's scheduler diff --git a/src/runtime/future/future.c b/src/runtime/future/future.c index 450ca3839..682009452 100644 --- a/src/runtime/future/future.c +++ b/src/runtime/future/future.c @@ -120,8 +120,6 @@ void future_trace(void* p) /// concurrent access should not be a problem. perr("future_trace"); - pony_trace(p); - future_t* fut = (future_t*)p; if (future_fulfilled(fut)) { @@ -301,10 +299,11 @@ void future_block_actor(future_t *fut) pony_unschedule(a); assert(fut->no_responsibilities < 16); fut->responsibilities[fut->no_responsibilities++] = (actor_entry_t) { .type = BLOCKED_MESSAGE, .message = (message_entry_t) { .actor = a } }; - UNBLOCK; - actor_block((encore_actor_t*)a); + encore_actor_t *actor = (encore_actor_t*) a; + actor->lock = &fut->lock; + actor_block(actor); } // =============================================================== diff --git a/src/runtime/pony/src/actor/actor.c b/src/runtime/pony/src/actor/actor.c index 80041c993..f5844fbd8 100644 --- a/src/runtime/pony/src/actor/actor.c +++ b/src/runtime/pony/src/actor/actor.c @@ -134,8 +134,8 @@ bool actor_run(pony_actor_t* actor) this_actor = actor; if (!has_flag(actor, FLAG_SYSTEM)) { - if (!encore_actor_run_hook((encore_actor_t *)actor)) { - return false; + if (encore_actor_run_hook((encore_actor_t *)actor)) { + return !has_flag((pony_actor_t *)actor, FLAG_UNSCHEDULED); } } @@ -256,16 +256,6 @@ void actor_setnext(pony_actor_t* actor, pony_actor_t* next) actor->next = next; } -pony_actor_t* actor_dormant_next(pony_actor_t* actor) -{ - return actor->dormant_next; -} - -void actor_set_dormant_next(pony_actor_t* actor, pony_actor_t* dormant_next) -{ - actor->dormant_next = dormant_next; -} - void actor_inc_rc() { this_actor->gc.rc++; diff --git a/src/runtime/pony/src/sched/scheduler.c b/src/runtime/pony/src/sched/scheduler.c index 324cc102e..c3e03fb8b 100644 --- a/src/runtime/pony/src/sched/scheduler.c +++ b/src/runtime/pony/src/sched/scheduler.c @@ -1,14 +1,16 @@ +#define _XOPEN_SOURCE 800 + #include "scheduler.h" #include "cpu.h" #include "mpmcq.h" #include "../actor/actor.h" #include "../gc/cycle.h" +#include "encore.h" #include #include +#include #include -// #define USE_DORMANT_LIST - typedef struct scheduler_t scheduler_t; __pony_spec_align__( @@ -22,9 +24,6 @@ __pony_spec_align__( pony_actor_t* head; pony_actor_t* tail; - pony_actor_t* dormant_head; - pony_actor_t* dormant_tail; - struct scheduler_t* victim; // the following are accessed by other scheduler threads @@ -38,6 +37,8 @@ static DECLARE_THREAD_FN(run_thread); // scheduler global data static uint32_t scheduler_count; static uint32_t scheduler_waiting; //schedulers waiting, global +static uint32_t context_waiting = 0; +static uint32_t thread_exit = 0; static scheduler_t* scheduler; static bool detect_quiescence; static bool shutdown_on_stop; @@ -49,8 +50,6 @@ static __pony_thread_local scheduler_t* this_scheduler; // forward declaration static void push(scheduler_t* sched, pony_actor_t* actor); -static void push_dormant(scheduler_t* sched, pony_actor_t* actor); -static pony_actor_t* pop_dormant(scheduler_t* sched); /** * Takes all actors off the injection queue and puts them on the scheduler list @@ -73,16 +72,6 @@ static pony_actor_t* pop(scheduler_t* sched) handle_inject(sched); pony_actor_t* actor; -#ifdef USE_DORMANT_LIST - counter++; - if (counter == 1024) { - counter = 0; - actor = pop_dormant(sched); - if (!actor) { - return actor; - } - } -#endif actor = sched->tail; @@ -97,10 +86,6 @@ static pony_actor_t* pop(scheduler_t* sched) } actor_setnext(actor, NULL); - } else { -#ifdef USE_DORMANT_LIST - actor = pop_dormant(sched); -#endif } return actor; @@ -140,46 +125,6 @@ static void push_first(scheduler_t* sched, pony_actor_t* actor) } } -static void push_dormant(scheduler_t* sched, pony_actor_t* actor) -{ - if (actor_dormant_next(actor)) { - // this actor exist in dormant list already - return; - } - - pony_actor_t* head = sched->dormant_head; - - if(head != NULL) - { - actor_set_dormant_next(head, actor); - sched->dormant_head = actor; - } else { - sched->dormant_head = actor; - sched->dormant_tail = actor; - } -} - -static pony_actor_t* pop_dormant(scheduler_t* sched) -{ - pony_actor_t* actor = sched->dormant_tail; - - if(actor != NULL) - { - if(actor != sched->dormant_head) - { - sched->dormant_tail = actor_dormant_next(actor); - } else { - sched->dormant_head = NULL; - sched->dormant_tail = NULL; - } - - actor_set_dormant_next(actor, NULL); - } - - return actor; -} - - /** * If we can terminate, return true. If all schedulers are waiting, one of * them will tell the cycle detector to try to terminate. @@ -278,14 +223,21 @@ static pony_actor_t* request(scheduler_t* sched) while(__pony_atomic_load_n(&sched->waiting, PONY_ATOMIC_ACQUIRE, PONY_ATOMIC_NO_TYPE) == 1) { - if(cpu_core_pause(tsc) && quiescent(sched)) + if(cpu_core_pause(tsc) && quiescent(sched)) { + sched->victim = NULL; + __pony_atomic_store_n(&sched->thief, NULL, PONY_ATOMIC_RELAXED, + PONY_ATOMIC_NO_TYPE); return NULL; + } } sched->victim = NULL; } else { - if(cpu_core_pause(tsc) && quiescent(sched)) + if(cpu_core_pause(tsc) && quiescent(sched)) { + __pony_atomic_store_n(&sched->thief, NULL, PONY_ATOMIC_RELAXED, + PONY_ATOMIC_NO_TYPE); return NULL; + } } if((actor = pop(sched)) != NULL) @@ -321,6 +273,8 @@ static void respond(scheduler_t* sched) scheduler_t* thief = (scheduler_t*)__pony_atomic_load_n(&sched->thief, PONY_ATOMIC_RELAXED, PONY_ATOMIC_NO_TYPE); + assert(thief != (void*)1); + if(thief == NULL) return; @@ -374,35 +328,84 @@ static void run(scheduler_t* sched) // if this returns true, reschedule the actor on our queue if(actor_run(actor)) { +#ifdef LAZY_IMPL sched = this_scheduler; -#ifdef USE_DORMANT_LIST - if (actor_emptyqueue(actor)) { - push_dormant(sched, actor); - } else { - push(sched, actor); - } -#else +#endif push(sched, actor); + } else { +#ifndef LAZY_IMPL + actor_unlock((encore_actor_t*)actor); #endif } } } -void public_run() +static void jump_buffer() { + __pony_atomic_fetch_add(&thread_exit, 1, PONY_ATOMIC_RELAXED, uint32_t); + while(__pony_atomic_load_n(&thread_exit, + PONY_ATOMIC_RELAXED, PONY_ATOMIC_NO_TYPE) < scheduler_count) ; +} + +static void __attribute__ ((noreturn)) jump_origin() +{ + static __pony_thread_local char stack[MINSIGSTKSZ]; + ucontext_t ctx; + getcontext(&ctx); + ctx.uc_stack.ss_sp = stack; + ctx.uc_stack.ss_size = MINSIGSTKSZ; + ctx.uc_link = origin; + makecontext(&ctx, (void(*)(void))&jump_buffer, 0); + setcontext(&ctx); + assert(0); +} + +void __attribute__ ((noreturn)) public_run(pthread_mutex_t *lock) +{ + assert(lock); + pthread_mutex_unlock(lock); assert(this_scheduler); run(this_scheduler); + + __pony_atomic_fetch_add(&context_waiting, 1, PONY_ATOMIC_RELAXED, uint32_t); + while(__pony_atomic_load_n(&context_waiting, + PONY_ATOMIC_RELAXED, PONY_ATOMIC_NO_TYPE) < scheduler_count) ; + + jump_origin(); } -static DEFINE_THREAD_FN(run_thread, +static void *run_thread(void *arg) { scheduler_t* sched = (scheduler_t*) arg; this_scheduler = sched; cpu_affinity(sched->cpu); + +#ifdef LAZY_IMPL + context ctx; + this_context = &ctx; + root = &ctx.ctx; + ucontext_t ucontext; + origin = &ucontext; + + getcontext(origin); + if (__pony_atomic_load_n(&context_waiting, + PONY_ATOMIC_RELAXED, PONY_ATOMIC_NO_TYPE) == scheduler_count) { + return NULL; + } +#endif + run(sched); - return 0; -}); +#ifdef LAZY_IMPL + __pony_atomic_fetch_add(&context_waiting, 1, PONY_ATOMIC_RELAXED, uint32_t); + while(__pony_atomic_load_n(&context_waiting, + PONY_ATOMIC_RELAXED, PONY_ATOMIC_NO_TYPE) < scheduler_count) ; + + jump_origin(); +#endif + + return NULL; +} static void scheduler_shutdown() { @@ -413,8 +416,11 @@ static void scheduler_shutdown() else start = 0; - for(uint32_t i = start; i < scheduler_count; i++) + for(uint32_t i = start; i < scheduler_count; i++) { pony_thread_join(scheduler[i].tid); + assert(pop(&scheduler[i]) == NULL); + } + assert(pop(&scheduler[0]) == NULL); __pony_atomic_store_n(&detect_quiescence, false, PONY_ATOMIC_RELAXED, PONY_ATOMIC_NO_TYPE); @@ -446,6 +452,9 @@ void scheduler_init(uint32_t threads, bool forcecd) threads = physical; scheduler_count = threads; +#if defined(SINGLE_THREAD_ON_MACOSX) && defined(PLATFORM_IS_MACOSX) + scheduler_count = 1; +#endif scheduler_waiting = 0; scheduler = (scheduler_t*)calloc(scheduler_count, sizeof(scheduler_t));