Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

timer with coroutine for output plugins - AWS Distro rebase #26

Open
wants to merge 2 commits into
base: 2_31_10_all_cherry_picks
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 222 additions & 1 deletion include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ struct flb_output_instance {
struct mk_list flush_list;
struct mk_list flush_list_destroy;

/* similar to flush coroutine list above, timer coroutine list */
struct mk_list timer_coro_list;
struct mk_list timer_coro_list_destroy;

/* Keep a reference to the original context this instance belongs to */
struct flb_config *config;
};
Expand All @@ -428,6 +432,28 @@ struct flb_output_flush {
struct mk_list _head; /* Link to flb_task->threads */
};

/*
* stores timer coros on the timer_coro_list, if the output uses them
*/
struct flb_output_timer_coro {
struct flb_config *config; /* FLB context */
struct flb_output_instance *o_ins; /* output instance */
struct flb_output_coro_timer_data *timer_data; /* callback info */
struct flb_coro *coro; /* parent coro addr */
struct mk_list _head; /* Link to timer_coro_list */
};

/*
* If the output uses timer coros, then this is used as the callback data
* passed to flb_sched_timer_cb_create
*/
struct flb_output_coro_timer_data {
struct flb_output_instance *ins; /* associate coro with this output instance */
flb_sds_t job_name; /* used on engine shutdown, print pending "custom" jobs */
void (*cb) (struct flb_config *config, void *data); /* call this output callback in the coro */
void *data; /* opaque data to pass to the above cb */
};

static FLB_INLINE int flb_output_is_threaded(struct flb_output_instance *ins)
{
return ins->is_threaded;
Expand All @@ -443,6 +469,19 @@ static FLB_INLINE void flb_output_flush_destroy(struct flb_output_flush *out_flu
flb_free(out_flush);
}

/*
* See below note for flb_out_flush_params
* this is equivalent for timer coroutines
*/
struct flb_out_timer_coro_params {
struct flb_output_timer_coro *output_timer; /* output flush */
struct flb_output_coro_timer_data *timer_data; /* callback info */
struct flb_config *config; /* Fluent Bit context */
struct flb_coro *coro; /* coroutine context */
};

extern FLB_TLS_DEFINE(struct flb_out_timer_coro_params, timer_coro_params);

/*
* libco do not support parameters in the entrypoint function due to the
* complexity of implementation in terms of architecture and compiler, but
Expand Down Expand Up @@ -526,6 +565,135 @@ static FLB_INLINE void output_pre_cb_flush(void)
persisted_params.config);
}

/* same as above but for timer coros */
static FLB_INLINE void output_pre_timer_cb(void)
{
struct flb_coro *coro;
struct flb_out_timer_coro_params *params;
struct flb_out_timer_coro_params persisted_params;
struct flb_output_coro_timer_data *timer_data;
struct flb_output_instance *o_ins;
struct flb_out_thread_instance *th_ins;
struct flb_output_timer_coro *timer_coro;


params = (struct flb_out_timer_coro_params *) FLB_TLS_GET(timer_coro_params);
if (!params) {
flb_error("[output] no timer coro params defined, unexpected");
return;
}

/*
* flush coros are actually started in engine after they are
* written down a pipe. This seems unnecessary here. So we can start it right away.
* If this works, I can remove the persisted params.
*/
coro = params->coro;
persisted_params = *params;
timer_coro = params->output_timer;
o_ins = params->output_timer->o_ins;
// co_switch(coro->caller);

timer_data = persisted_params.timer_data;
timer_data->cb(persisted_params.config, timer_data->data);
// after output callback is done, just do clean up here?
// similar to: flb_output_flush_prepare_destroy
/* Move timer coroutine context from active list to the destroy one */
if (flb_output_is_threaded(o_ins) == FLB_TRUE) {
th_ins = flb_output_thread_instance_get();
pthread_mutex_lock(&th_ins->timer_mutex);
mk_list_del(&timer_coro->_head);
mk_list_add(&timer_coro->_head, &th_ins->timer_coro_list_destroy);
pthread_mutex_unlock(&th_ins->timer_mutex);
}
else {
mk_list_del(&timer_coro->_head);
mk_list_add(&timer_coro->_head, &o_ins->timer_coro_list_destroy);
}

/* yield back to caller/control code */
flb_coro_yield(coro, FLB_TRUE);
}

/*
* If the output uses scheduled timers with coroutines,
* this function is used as the callback for flb_sched_timer_cb_create
*/
static FLB_INLINE
void flb_output_coro_timer_cb(struct flb_config *config, void *data)
{
size_t stack_size;
struct flb_coro *coro;
struct flb_output_timer_coro *timer_coro;
struct flb_out_thread_instance *th_ins;
struct flb_output_coro_timer_data *ctx = (struct flb_output_coro_timer_data *) data;
struct flb_out_timer_coro_params *params;
struct flb_output_instance *o_ins;

/* Custom output coroutine info */
timer_coro = (struct flb_output_timer_coro *) flb_calloc(1, sizeof(struct flb_output_timer_coro));
if (!timer_coro) {
flb_errno();
return;
}

/* Create a new co-routine */
coro = flb_coro_create(timer_coro);
if (!coro) {
flb_free(timer_coro);
return;
}

o_ins = ctx->ins;
timer_coro->o_ins = o_ins;
timer_coro->config = config;
timer_coro->coro = coro;

coro->caller = co_active();
coro->callee = co_create(config->coro_stack_size,
output_pre_timer_cb, &stack_size);

if (coro->callee == NULL) {
flb_coro_destroy(coro);
flb_free(timer_coro);
return;
}

#ifdef FLB_HAVE_VALGRIND
coro->valgrind_stack_id = \
VALGRIND_STACK_REGISTER(coro->callee, ((char *) coro->callee) + stack_size);
#endif

if (o_ins->is_threaded == FLB_TRUE) {
th_ins = flb_output_thread_instance_get();
pthread_mutex_lock(&th_ins->timer_mutex);
mk_list_add(&timer_coro->_head, &th_ins->timer_coro_list);
pthread_mutex_unlock(&th_ins->timer_mutex);
}
else {
mk_list_add(&timer_coro->_head, &o_ins->timer_coro_list);
}

params = (struct flb_out_timer_coro_params *) FLB_TLS_GET(timer_coro_params);
if (!params) {
params = (struct flb_out_timer_coro_params *) flb_calloc(1, sizeof(struct flb_out_flush_params));
if (!params) {
flb_errno();
return;
}
}

/* Callback parameters in order */
params->output_timer = timer_coro;
params->timer_data = ctx;
params->config = config;
params->coro = coro;

FLB_TLS_SET(timer_coro_params, params);
co_switch(coro->callee);
return;
}

void flb_output_flush_prepare_destroy(struct flb_output_flush *out_flush);
int flb_output_flush_id_get(struct flb_output_instance *ins);

Expand Down Expand Up @@ -659,7 +827,11 @@ static inline void flb_output_return(int ret, struct flb_coro *co) {
flb_output_flush_prepare_destroy(out_flush);
}

/* return the number of co-routines running in the instance */
/*
* return the number of flush co-routines running in the instance
* Currently, this function is only used for FLB_OUTPUT_NO_MULTIPLEX
* and does not count timer_coros, used by S3 output
*/
static inline int flb_output_coros_size(struct flb_output_instance *ins)
{
int size = 0;
Expand All @@ -678,6 +850,55 @@ static inline int flb_output_coros_size(struct flb_output_instance *ins)
return size;
}

/* Used in engine flb_running_count */
static inline int flb_output_timer_coros_size(struct flb_output_instance *ins)
{
int size = 0;

if (flb_output_is_threaded(ins) == FLB_TRUE) {
/*
* On threaded mode, we need to count the active co-routines of
* every running thread of the thread pool.
*/
size = flb_output_thread_pool_timer_coros_size(ins);
}
else {
size = mk_list_size(&ins->timer_coro_list);
}

return size;
}

static inline void flb_timer_coros_print(struct mk_list *timer_coro_list)
{
struct flb_output_timer_coro *timer_coro;
struct mk_list *tmp;
struct mk_list *head;
int n = mk_list_size(timer_coro_list);
if (n != 0) {
/* get one coro for the job_name */
mk_list_foreach_safe(head, tmp, timer_coro_list) {
timer_coro = mk_list_entry(head, struct flb_output_timer_coro, _head);
if (timer_coro != NULL) {
flb_info("[task] output=%s still running %d %s(s)",
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plan is this will say "output=s3.1 still running 1 Upload(s)"

It won't actually be accurate tho it looks nice. A timer coroutine is not the same as an upload, each one could send many uploads.

Each timer coro is just a thread/helper to peform uploads.

I can't think of a better word to use here tho.

timer_coro->o_ins->alias, n, timer_coro->timer_data->job_name);
break;
}
}
}
}

/* Used in engine flb_running_print */
static inline void flb_output_timer_coros_print(struct flb_output_instance *ins)
{
if (flb_output_is_threaded(ins) == FLB_TRUE) {
flb_output_thread_pool_timer_coros_print(ins);
}
else {
flb_timer_coros_print(&ins->timer_coro_list);
}
}

static inline void flb_output_return_do(int x)
{
struct flb_coro *coro;
Expand Down
10 changes: 8 additions & 2 deletions include/fluent-bit/flb_output_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ struct flb_out_thread_instance {
* 'flushes' running by a threaded instance, then the access to the 'flush_list'
* must be protected: we use 'flush_mutex for that purpose.
*/
pthread_mutex_t flush_mutex; /* mutex for 'flush_list' */
pthread_mutex_t flush_mutex; /* mutex for 'flush_list' */

/* Same as flush_mutex but for timer coros */
struct mk_list timer_coro_list; /* flush context list */
struct mk_list timer_coro_list_destroy; /* flust context destroy list */
pthread_mutex_t timer_mutex; /* mutex for 'flush_list' */

/* List of mapped 'upstream' contexts */
struct mk_list upstreams;
Expand All @@ -100,7 +105,8 @@ int flb_output_thread_pool_start(struct flb_output_instance *ins);
int flb_output_thread_pool_flush(struct flb_task *task,
struct flb_output_instance *out_ins,
struct flb_config *config);

int flb_output_thread_pool_timer_coros_size(struct flb_output_instance *ins);
void flb_output_thread_pool_timer_coros_print(struct flb_output_instance *ins);

void flb_output_thread_instance_init();
struct flb_out_thread_instance *flb_output_thread_instance_get();
Expand Down
44 changes: 38 additions & 6 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,37 @@ static inline int handle_output_event(flb_pipefd_t fd, uint64_t ts,
return 0;
}

static int flb_running_count(struct flb_config *config)
{
int tasks, timers, n = 0;
struct mk_list *head;
struct mk_list *tmp;
struct flb_output_instance *o_ins;

mk_list_foreach_safe(head, tmp, &config->outputs) {
o_ins = mk_list_entry(head, struct flb_output_instance, _head);
n = flb_output_timer_coros_size(o_ins);
timers += n;
}

tasks = flb_task_running_count(config);
return tasks + timers;
}

static void flb_running_print(struct flb_config *config)
{
struct mk_list *head ;
struct mk_list *tmp;
struct flb_output_instance *o_ins;

flb_task_running_print(config);

mk_list_foreach_safe(head, tmp, &config->outputs) {
o_ins = mk_list_entry(head, struct flb_output_instance, _head);
flb_output_timer_coros_print(o_ins);
}
}

static inline int flb_engine_manager(flb_pipefd_t fd, struct flb_config *config)
{
int bytes;
Expand Down Expand Up @@ -543,6 +574,7 @@ int sb_segregate_chunks(struct flb_config *config)
int flb_engine_start(struct flb_config *config)
{
int ret;
int count;
uint64_t ts;
char tmp[16];
struct flb_time t_flush;
Expand Down Expand Up @@ -813,19 +845,19 @@ int flb_engine_start(struct flb_config *config)
* resources allocated by that co-routine, the best thing is to
* wait again for the grace period and re-check again.
*/
ret = flb_task_running_count(config);
if (ret > 0 && config->grace_count < config->grace) {
count = flb_running_count(config);
if (count > 0 && config->grace_count < config->grace) {
if (config->grace_count == 1) {
flb_task_running_print(config);
flb_running_print(config);
}
flb_engine_exit(config);
}
else {
if (ret > 0) {
flb_task_running_print(config);
if (count > 0) {
flb_running_print(config);
}
flb_info("[engine] service has stopped (%i pending tasks)",
ret);
count);
ret = config->exit_status_code;
flb_engine_shutdown(config);
config = NULL;
Expand Down
7 changes: 7 additions & 0 deletions src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
#include <fluent-bit/flb_pack.h>

FLB_TLS_DEFINE(struct flb_out_flush_params, out_flush_params);
FLB_TLS_DEFINE(struct flb_out_timer_coro_params, timer_coro_params);

void flb_output_prepare()
{
FLB_TLS_INIT(out_flush_params);
FLB_TLS_INIT(timer_coro_params);
}

/* Validate the the output address protocol */
Expand Down Expand Up @@ -478,6 +480,11 @@ void flb_output_exit(struct flb_config *config)
if (params) {
flb_free(params);
}
params = FLB_TLS_GET(timer_coro_params);
if (params) {
flb_free(params);
}

}

static inline int instance_id(struct flb_config *config)
Expand Down
Loading