out_s3: always use sync IO mode #6573

Merged
merged 1 commit into from
Feb 13, 2023

Conversation

PettitWesley
Contributor

Signed-off-by: Wesley Pettit [email protected]


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Signed-off-by: Wesley Pettit <[email protected]>
@PettitWesley PettitWesley temporarily deployed to pr December 17, 2022 05:27 — with GitHub Actions Inactive
@PettitWesley PettitWesley temporarily deployed to pr December 17, 2022 05:42 — with GitHub Actions Inactive
Contributor

@matthewfala matthewfala left a comment

Looks good! Thanks.
I'm wondering whether a long-term solution is to have a callback timer emit signal chunks that are ignored as log data and exist only to trigger a special upload coroutine. If flush is triggered and the cb function is called from there, then it can use the async network stack. It can also be governed by the FLB_SYNC_TASK option.

@matthewfala
Contributor

The other option may be to have a pseudo event loop used only for the callback timer. It would have only one registered event and would essentially be an "epoll" wait. However, it would allow us to opt into the async network stack. It is also compatible with the FLB_SYNC_TASK option.
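
As a rough illustration of the "one registered event" idea, here is a minimal sketch of a loop whose only event is a periodic timer. It assumes Linux and uses plain `epoll` and `timerfd` rather than Fluent Bit's `mk_event` API; `run_timer_only_loop`, `timer_cb`, and `count_tick` are hypothetical names that do not exist in Fluent Bit.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <time.h>
#include <unistd.h>

/* Hypothetical callback type; stands in for the S3 upload timer callback. */
typedef void (*timer_cb)(void *data);

/* Example callback: counts how many times the timer fired. */
void count_tick(void *data)
{
    (*(int *) data)++;
}

/*
 * Run a loop with exactly one registered event: a periodic timer.
 * Returns the number of callback invocations, or -1 on error.
 */
int run_timer_only_loop(long interval_ms, int max_ticks, timer_cb cb, void *data)
{
    int efd;
    int tfd;
    int n;
    int ticks = 0;
    int ret = -1;
    uint64_t expirations;
    struct epoll_event ev = {0};
    struct epoll_event fired;
    struct itimerspec spec = {0};

    assert(cb != NULL);
    assert(interval_ms > 0);

    efd = epoll_create1(0);
    if (efd == -1) {
        return -1;
    }
    tfd = timerfd_create(CLOCK_MONOTONIC, 0);
    if (tfd == -1) {
        close(efd);
        return -1;
    }

    /* Fire first after interval_ms, then every interval_ms. */
    spec.it_value.tv_sec = interval_ms / 1000;
    spec.it_value.tv_nsec = (interval_ms % 1000) * 1000000L;
    spec.it_interval = spec.it_value;
    if (timerfd_settime(tfd, 0, &spec, NULL) == -1) {
        goto cleanup;
    }

    /* The timer is the only event this loop ever waits on. */
    ev.events = EPOLLIN;
    ev.data.fd = tfd;
    if (epoll_ctl(efd, EPOLL_CTL_ADD, tfd, &ev) == -1) {
        goto cleanup;
    }

    while (ticks < max_ticks) {
        n = epoll_wait(efd, &fired, 1, -1);
        if (n == -1 && errno == EINTR) {
            continue;
        }
        if (n != 1) {
            goto cleanup;
        }
        /* Drain the expiration counter so the fd stops being readable. */
        if (read(tfd, &expirations, sizeof(expirations)) != sizeof(expirations)) {
            goto cleanup;
        }
        cb(data);
        ticks++;
    }
    ret = ticks;

cleanup:
    close(tfd);
    close(efd);
    return ret;
}
```

In the plugin, the callback would presumably start or resume the upload coroutine instead of bumping a counter, and the loop would run until shutdown rather than for a fixed number of ticks.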

The pseudo event loop recipe relies on overriding the u_conn event loop, evl:

struct mk_event_loop *evl;

Then the callback function needs to be split into two parts:

  1. A coroutine invoker
  2. The actual callback function
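
To make the split concrete, here is a small self-contained sketch of an invoker and a callback. It deliberately swaps in POSIX `ucontext` (assuming glibc on Linux) for Fluent Bit's `flb_coro`/libco machinery, so it shows only the control flow, not the real API. All names (`upload_coro`, `upload_coro_invoke`, `upload_cb`, and so on) are hypothetical.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <stdlib.h>
#include <ucontext.h>

#define SKETCH_STACK_SIZE (64 * 1024)

/* Hypothetical coroutine state; not a Fluent Bit structure. */
struct upload_coro {
    ucontext_t caller;   /* context to return to on yield or finish */
    ucontext_t callee;   /* context that runs the callback */
    void *stack;
    int steps;           /* how far the callback has progressed */
    int finished;
};

/* makecontext() entry points take only int arguments, so the active
 * coroutine is handed over through a file-scope pointer. */
static struct upload_coro *current;

/* Switch into the coroutine until it yields or returns. */
void upload_coro_resume(struct upload_coro *co)
{
    assert(!co->finished);
    current = co;
    swapcontext(&co->caller, &co->callee);
}

/* Give control back to whoever resumed us, e.g. while waiting on IO. */
void upload_coro_yield(struct upload_coro *co)
{
    swapcontext(&co->callee, &co->caller);
}

/* Part 2: the actual callback, running inside the coroutine. */
static void upload_cb(void)
{
    struct upload_coro *co = current;

    co->steps++;             /* e.g. the request has been sent */
    upload_coro_yield(co);   /* an async network stack would yield here */
    co->steps++;             /* e.g. the response has been handled */
    co->finished = 1;
    /* Returning switches to uc_link, i.e. co->caller. */
}

/* Part 1: the coroutine invoker. Allocates the stack and starts the callback. */
struct upload_coro *upload_coro_invoke(void)
{
    struct upload_coro *co = calloc(1, sizeof(*co));

    if (!co) {
        return NULL;
    }
    co->stack = malloc(SKETCH_STACK_SIZE);
    if (!co->stack || getcontext(&co->callee) == -1) {
        free(co->stack);
        free(co);
        return NULL;
    }
    co->callee.uc_stack.ss_sp = co->stack;
    co->callee.uc_stack.ss_size = SKETCH_STACK_SIZE;
    co->callee.uc_link = &co->caller;
    makecontext(&co->callee, upload_cb, 0);

    upload_coro_resume(co);  /* run until the first yield */
    return co;
}

void upload_coro_destroy(struct upload_coro *co)
{
    free(co->stack);
    free(co);
}
```

A caller would run `upload_coro_invoke()` from the timer, then call `upload_coro_resume()` once the awaited event fires. In Fluent Bit that resume would come from the event loop rather than from user code.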

The coroutine invoker can be based on the following code:

struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
                                                 struct flb_input_instance *i_ins,
                                                 struct flb_output_instance *o_ins,
                                                 struct flb_config *config)
{
    size_t stack_size;
    struct flb_coro *coro;
    struct flb_output_flush *out_flush;
    struct flb_out_thread_instance *th_ins;

    /* Custom output coroutine info */
    out_flush = (struct flb_output_flush *) flb_calloc(1, sizeof(struct flb_output_flush));
    if (!out_flush) {
        flb_errno();
        return NULL;
    }

    /* Create a new co-routine */
    coro = flb_coro_create(out_flush);
    if (!coro) {
        flb_free(out_flush);
        return NULL;
    }

    /*
     * Each co-routine receives an 'id', the value is always incremental up to
     * 16383.
     */
    out_flush->id = flb_output_flush_id_get(o_ins);
    out_flush->o_ins = o_ins;
    out_flush->task = task;
    out_flush->buffer = task->event_chunk->data;
    out_flush->config = config;
    out_flush->coro = coro;

    coro->caller = co_active();
    coro->callee = co_create(config->coro_stack_size,
                             output_pre_cb_flush, &stack_size);
    if (coro->callee == NULL) {
        flb_coro_destroy(coro);
        flb_free(out_flush);
        return NULL;
    }

#ifdef FLB_HAVE_VALGRIND
    coro->valgrind_stack_id = \
        VALGRIND_STACK_REGISTER(coro->callee, ((char *) coro->callee) + stack_size);
#endif

    if (o_ins->is_threaded == FLB_TRUE) {
        th_ins = flb_output_thread_instance_get();
        pthread_mutex_lock(&th_ins->flush_mutex);
        mk_list_add(&out_flush->_head, &th_ins->flush_list);
        pthread_mutex_unlock(&th_ins->flush_mutex);
    }
    else {
        mk_list_add(&out_flush->_head, &o_ins->flush_list);
    }

    /* Workaround for makecontext() */
    output_params_set(out_flush, coro, task, o_ins->p, o_ins->context, config);
    return out_flush;
}

Instead of output_pre_cb_flush, we use our S3 upload callback.

The coroutine can then be registered with the event loop:

struct flb_upstream_conn {
    struct mk_event event;
    struct flb_coro *coro;
