Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_s3: always use sync IO mode #6573

Merged
merged 1 commit into from
Feb 13, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 7 additions & 40 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
{
int ret;
flb_sds_t tmp_sds;
int async_flags;
char *role_arn = NULL;
char *session_name;
const char *tmp;
Expand Down Expand Up @@ -913,8 +912,12 @@ static int cb_s3_init(struct flb_output_instance *ins,
ctx->timer_ms = UPLOAD_TIMER_MIN_WAIT;
}

/* init must use sync mode */
async_flags = ctx->s3_client->upstream->flags;
/*
* S3 must ALWAYS use sync mode
* In the timer thread we do a mk_list_foreach_safe on the queue of uplaods and chunks
* Iterating over those lists is not concurrent safe. If a flush call ran at the same time
* And deleted an item from the list, this could cause a crash/corruption.
*/
ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC);

/* clean up any old buffers found on startup */
Expand Down Expand Up @@ -949,17 +952,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
cb_s3_upload(config, ctx);
}

if (ctx->use_put_object == FLB_TRUE) {
/*
* Run S3 in async mode.
* Multipart uploads don't work with async mode right now in high throughput
* cases. Its not clear why. Realistically, the performance of sync mode
* will be sufficient for most users, and long term we can do the work
* to enable async if needed.
*/
ctx->s3_client->upstream->flags = async_flags;
}

/* this is done last since in the previous block we make calls to AWS */
ctx->provider->provider_vtable->upstream_set(ctx->provider, ctx->ins);

Expand Down Expand Up @@ -1684,7 +1676,6 @@ static int buffer_chunk(void *out_context, struct s3_file *upload_file, flb_sds_
static void s3_upload_queue(struct flb_config *config, void *out_context)
{
int ret;
int async_flags;
time_t now;
struct upload_queue *upload_contents;
struct flb_s3 *ctx = out_context;
Expand All @@ -1700,12 +1691,6 @@ static void s3_upload_queue(struct flb_config *config, void *out_context)
cb_s3_upload(config, out_context);
}

/* upload timer must use sync mode */
if (ctx->use_put_object == FLB_TRUE) {
async_flags = ctx->s3_client->upstream->flags;
ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC);
}

/* Iterate through each file in upload queue */
mk_list_foreach_safe(head, tmp, &ctx->upload_queue) {
upload_contents = mk_list_entry(head, struct upload_queue, _head);
Expand Down Expand Up @@ -1755,10 +1740,7 @@ static void s3_upload_queue(struct flb_config *config, void *out_context)
}

exit:
/* re-enable async mode */
if (ctx->use_put_object == FLB_TRUE) {
ctx->s3_client->upstream->flags = async_flags;
}
return;
}

static void cb_s3_upload(struct flb_config *config, void *data)
Expand All @@ -1774,16 +1756,9 @@ static void cb_s3_upload(struct flb_config *config, void *data)
int complete;
int ret;
time_t now;
int async_flags;

flb_plg_debug(ctx->ins, "Running upload timer callback (cb_s3_upload)..");

/* upload timer must use sync mode */
if (ctx->use_put_object == FLB_TRUE) {
async_flags = ctx->s3_client->upstream->flags;
ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC);
}

now = time(NULL);

/* Check all chunks and see if any have timed out */
Expand Down Expand Up @@ -1860,10 +1835,6 @@ static void cb_s3_upload(struct flb_config *config, void *data)
}
}
}

if (ctx->use_put_object == FLB_TRUE) {
ctx->s3_client->upstream->flags = async_flags;
}
}

static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char *data,
Expand Down Expand Up @@ -2230,10 +2201,6 @@ static int cb_s3_exit(void *data, struct flb_config *config)
}

if (s3_store_has_data(ctx) == FLB_TRUE) {
if (ctx->use_put_object == FLB_TRUE) {
/* exit must run in sync mode */
ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC);
}
flb_plg_info(ctx->ins, "Sending all locally buffered data to S3");
ret = put_all_chunks(ctx);
if (ret < 0) {
Expand Down