From 232367a705c3788aecdda827cc99a69f2b7a01ef Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Fri, 16 Dec 2022 21:26:37 -0800 Subject: [PATCH] out_s3: always use sync IO mode Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 47 +++++++-------------------------------------- 1 file changed, 7 insertions(+), 40 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 5f4c1027852..39831f454f6 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -505,7 +505,6 @@ static int cb_s3_init(struct flb_output_instance *ins, { int ret; flb_sds_t tmp_sds; - int async_flags; char *role_arn = NULL; char *session_name; const char *tmp; @@ -913,8 +912,12 @@ static int cb_s3_init(struct flb_output_instance *ins, ctx->timer_ms = UPLOAD_TIMER_MIN_WAIT; } - /* init must use sync mode */ - async_flags = ctx->s3_client->upstream->flags; + /* + * S3 must ALWAYS use sync mode + * In the timer thread we do a mk_list_foreach_safe on the queue of uplaods and chunks + * Iterating over those lists is not concurrent safe. If a flush call ran at the same time + * And deleted an item from the list, this could cause a crash/corruption. + */ ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC); /* clean up any old buffers found on startup */ @@ -949,17 +952,6 @@ static int cb_s3_init(struct flb_output_instance *ins, cb_s3_upload(config, ctx); } - if (ctx->use_put_object == FLB_TRUE) { - /* - * Run S3 in async mode. - * Multipart uploads don't work with async mode right now in high throughput - * cases. Its not clear why. Realistically, the performance of sync mode - * will be sufficient for most users, and long term we can do the work - * to enable async if needed. - */ - ctx->s3_client->upstream->flags = async_flags; - } - /* this is done last since in the previous block we make calls to AWS */ ctx->provider->provider_vtable->upstream_set(ctx->provider, ctx->ins); @@ -1684,7 +1676,6 @@ static int buffer_chunk(void *out_context, struct s3_file *upload_file, flb_sds_ static void s3_upload_queue(struct flb_config *config, void *out_context) { int ret; - int async_flags; time_t now; struct upload_queue *upload_contents; struct flb_s3 *ctx = out_context; @@ -1700,12 +1691,6 @@ static void s3_upload_queue(struct flb_config *config, void *out_context) cb_s3_upload(config, out_context); } - /* upload timer must use sync mode */ - if (ctx->use_put_object == FLB_TRUE) { - async_flags = ctx->s3_client->upstream->flags; - ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC); - } - /* Iterate through each file in upload queue */ mk_list_foreach_safe(head, tmp, &ctx->upload_queue) { upload_contents = mk_list_entry(head, struct upload_queue, _head); @@ -1755,10 +1740,7 @@ static void s3_upload_queue(struct flb_config *config, void *out_context) } exit: - /* re-enable async mode */ - if (ctx->use_put_object == FLB_TRUE) { - ctx->s3_client->upstream->flags = async_flags; - } + return; } static void cb_s3_upload(struct flb_config *config, void *data) @@ -1774,16 +1756,9 @@ static void cb_s3_upload(struct flb_config *config, void *data) int complete; int ret; time_t now; - int async_flags; flb_plg_debug(ctx->ins, "Running upload timer callback (cb_s3_upload).."); - /* upload timer must use sync mode */ - if (ctx->use_put_object == FLB_TRUE) { - async_flags = ctx->s3_client->upstream->flags; - ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC); - } - now = time(NULL); /* Check all chunks and see if any have timed out */ @@ -1860,10 +1835,6 @@ static void cb_s3_upload(struct flb_config *config, void *data) } } } - - if (ctx->use_put_object == FLB_TRUE) { - ctx->s3_client->upstream->flags = async_flags; - } } static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char *data, @@ -2230,10 +2201,6 @@ static int cb_s3_exit(void *data, struct flb_config *config) } if (s3_store_has_data(ctx) == FLB_TRUE) { - if (ctx->use_put_object == FLB_TRUE) { - /* exit must run in sync mode */ - ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC); - } flb_plg_info(ctx->ins, "Sending all locally buffered data to S3"); ret = put_all_chunks(ctx); if (ret < 0) {