Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add eager and lazy future support. #114

Merged
merged 1 commit into from
Apr 14, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 127 additions & 107 deletions src/runtime/encore/encore.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,9 @@
#include <stdlib.h>
#include <assert.h>
#include <ucontext.h>
#include <stdio.h>

extern void public_run();

bool has_flag(pony_actor_t* actor, uint8_t flag);

typedef struct context {
ucontext_t ctx;
struct context *next;
} context;
extern void public_run(pthread_mutex_t *lock);

enum
{
Expand All @@ -25,13 +19,24 @@ enum

#define MAX_IN_POOL 4

static __pony_thread_local stack_page *stack_pool = NULL;
static __pony_thread_local unsigned int available_pages = 0;
static __pony_thread_local stack_page *local_page = NULL;
inline static void assert_swap(ucontext_t *old, ucontext_t *new)
{
int ret = swapcontext(old, new);
assert(ret == 0);
}

static __pony_thread_local context *context_pool = NULL;
static __pony_thread_local unsigned int available_context = 0;
static __pony_thread_local context *this_context = NULL;

__pony_thread_local context *this_context;
__pony_thread_local ucontext_t *origin;
__pony_thread_local ucontext_t *root;

#ifndef LAZY_IMPL

static __pony_thread_local stack_page *stack_pool = NULL;
static __pony_thread_local unsigned int available_pages = 0;
static __pony_thread_local stack_page *local_page = NULL;

static stack_page *pop_page()
{
Expand All @@ -49,96 +54,50 @@ static stack_page *pop_page()
return page;
}

void *get_local_page_stack()
{
local_page = local_page ? local_page : pop_page();
assert(local_page);
assert(local_page->stack);
return local_page->stack;
}

static void push_page(stack_page *page)
{
available_pages++;
page->next = stack_pool;
stack_pool = page;
}

void reclaim_page(encore_actor_t *a)
static void reclaim_page(encore_actor_t *a)
{
push_page(a->page);
a->page = NULL;
}

static void clean_pool()
void *get_local_page_stack()
{
#ifndef LAZY_IMPL

static stack_page *page, *next;
while (available_pages > MAX_IN_POOL) {
available_pages--;
page = stack_pool;
next = stack_pool->next;
free(page->stack);
free(page);
stack_pool = next;
}

#else

static context *ctx, *next;
while (available_context > MAX_IN_POOL) {
available_context--;
ctx = context_pool;
next = context_pool->next;
free(ctx->ctx.uc_stack.ss_sp);
free(ctx);
context_pool = next;
}

#endif
local_page = local_page ? local_page : pop_page();
assert(local_page);
assert(local_page->stack);
return local_page->stack;
}

void actor_suspend(encore_actor_t *actor)
void actor_set_run_to_completion(encore_actor_t *actor)
{
// Make a copy of the current context and replace it
ucontext_t ctxp = actor->ctx;

ctx_wrapper d = { .ctx = &ctxp, .uc_link=ctxp.uc_link };
encore_arg_t argv[1] = { { .p = &d } };
actor->page = local_page;
local_page = NULL;

// TODO find out the right way of calling pond_sendv
// pony_sendv(actor, FUT_MSG_SUSPEND, 1, argv);

actor->run_to_completion = false;
int ret = swapcontext(&ctxp, ctxp.uc_link);
assert(ret == 0);
assert(ctxp.uc_link == &actor->home_ctx);
actor->run_to_completion = true;
}

// TODO: suspend and await overlaps heavily
void actor_await(encore_actor_t *actor, void *future)
bool actor_run_to_completion(encore_actor_t *actor)
{
// Make a copy of the current context and replace it
ucontext_t ctxp = actor->ctx;
ctx_wrapper d = { .ctx = &ctxp, .uc_link=ctxp.uc_link };
encore_arg_t argv[2] = { { .p = &d }, { .p = future } };
return actor->run_to_completion;
}

actor->page = local_page;
local_page = NULL;
void actor_unlock(encore_actor_t *actor)
{
if (actor->lock) {
pthread_mutex_unlock(actor->lock);
actor->lock = NULL;
}
}

// TODO find out the right way of calling pond_sendv
// pony_sendv(actor, FUT_MSG_AWAIT, 2, argv);
#endif

actor->run_to_completion = false;
int ret = swapcontext(&ctxp, ctxp.uc_link);
assert(ret == 0);
assert(ctxp.uc_link == &actor->home_ctx);
}
#ifdef LAZY_IMPL

static context *pop_context()
static context *pop_context(encore_actor_t *actor)
{
static context *c;
if (available_context == 0) {
Expand All @@ -149,10 +108,10 @@ static context *pop_context()
assert(ret == 0);
context_pool->ctx.uc_stack.ss_size = Stack_Size;
context_pool->ctx.uc_stack.ss_flags = 0;
makecontext(&context_pool->ctx, (void(*)(void))public_run, 0);
} else {
available_context--;
}
makecontext(&context_pool->ctx, (void(*)(void))public_run, 1, actor->lock);
c = context_pool;
context_pool = c->next;
assert(c->ctx.uc_stack.ss_sp);
Expand All @@ -166,6 +125,37 @@ static void push_context(context *ctx)
context_pool = ctx;
}

#endif

static void clean_pool()
{
#ifndef LAZY_IMPL

static stack_page *page, *next;
while (available_pages > MAX_IN_POOL) {
available_pages--;
page = stack_pool;
next = stack_pool->next;
free(page->stack);
free(page);
stack_pool = next;
}

#else

static context *ctx, *next;
while (available_context > MAX_IN_POOL) {
available_context--;
ctx = context_pool;
next = context_pool->next;
free(ctx->ctx.uc_stack.ss_sp);
free(ctx);
context_pool = next;
}

#endif
}

void actor_block(encore_actor_t *actor)
{
#ifndef LAZY_IMPL
Expand All @@ -178,18 +168,18 @@ void actor_block(encore_actor_t *actor)
assert(actor->page);
assert(actor->page->stack);
actor->run_to_completion = false;
int ret = swapcontext(&actor->ctx, actor->ctx.uc_link);
assert(ret == 0);
assert_swap(&actor->ctx, actor->ctx.uc_link);

#else

ucontext_t ctx;
actor->saved = &ctx;
context *previous = this_context;
this_context = pop_context();
int ret = swapcontext(&ctx, &this_context->ctx);
assert(ret == 0);
this_context = previous;
actor->saved = &this_context->ctx;

context *old = this_context;

this_context = pop_context(actor);
assert_swap(actor->saved, &this_context->ctx);

this_context = old;

#endif
}
Expand All @@ -199,24 +189,13 @@ void actor_set_resume(encore_actor_t *actor)
actor->resume = true;
}

void actor_set_run_to_completion(encore_actor_t *actor)
{
actor->run_to_completion = true;
}

bool actor_run_to_completion(encore_actor_t *actor)
{
return actor->run_to_completion;
}

void actor_resume(encore_actor_t *actor)
{
#ifndef LAZY_IMPL

actor->resume = false;
actor->run_to_completion = true;
int ret = swapcontext(actor->ctx.uc_link, &actor->ctx);
assert(ret == 0);
assert_swap(actor->ctx.uc_link, &actor->ctx);

if (actor->run_to_completion) {
reclaim_page(actor);
Expand All @@ -225,15 +204,58 @@ void actor_resume(encore_actor_t *actor)
#else

actor->resume = false;
if (this_context) {
if (&this_context->ctx != root) {
push_context(this_context);
}
setcontext(actor->saved);
exit(1);
assert(0);

#endif
}

void actor_suspend(encore_actor_t *actor)
{
#ifndef LAZY_IMPL
// Make a copy of the current context and replace it
ucontext_t ctxp = actor->ctx;

ctx_wrapper d = { .ctx = &ctxp, .uc_link=ctxp.uc_link };
encore_arg_t argv[1] = { { .p = &d } };
actor->page = local_page;
local_page = NULL;

// TODO find out the right way of calling pond_sendv
// pony_sendv(actor, FUT_MSG_SUSPEND, 1, argv);

actor->run_to_completion = false;
int ret = swapcontext(&ctxp, ctxp.uc_link);
assert(ret == 0);
assert(ctxp.uc_link == &actor->home_ctx);
#endif
}

// TODO: suspend and await overlaps heavily
void actor_await(encore_actor_t *actor, void *future)
{
#ifndef LAZY_IMPL
// Make a copy of the current context and replace it
ucontext_t ctxp = actor->ctx;
ctx_wrapper d = { .ctx = &ctxp, .uc_link=ctxp.uc_link };
encore_arg_t argv[2] = { { .p = &d }, { .p = future } };

actor->page = local_page;
local_page = NULL;

// TODO find out the right way of calling pond_sendv
// pony_sendv(actor, FUT_MSG_AWAIT, 2, argv);

actor->run_to_completion = false;
int ret = swapcontext(&ctxp, ctxp.uc_link);
assert(ret == 0);
assert(ctxp.uc_link == &actor->home_ctx);
#endif
}

encore_actor_t *encore_create(pony_type_t *type)
{
return (encore_actor_t *)pony_create(type);
Expand Down Expand Up @@ -272,12 +294,10 @@ bool encore_actor_run_hook(encore_actor_t *actor)
if(actor->resume)
{
actor_resume(actor);
#ifndef LAZY_IMPL
return !has_flag((pony_actor_t *)actor, FLAG_UNSCHEDULED);
#endif
return true;
}

return true;
return false;
}

bool encore_actor_handle_message_hook(encore_actor_t *actor, pony_msg_t* msg)
Expand Down
Loading