diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 65321112759..99366017f84 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -506,7 +506,6 @@ static int cb_s3_init(struct flb_output_instance *ins, { int ret; flb_sds_t tmp_sds; - int async_flags; char *role_arn = NULL; char *session_name; const char *tmp; @@ -914,8 +913,12 @@ static int cb_s3_init(struct flb_output_instance *ins, ctx->timer_ms = UPLOAD_TIMER_MIN_WAIT; } - /* init must use sync mode */ - async_flags = ctx->s3_client->upstream->flags; + /* + * S3 must ALWAYS use sync mode + * In the timer thread we do a mk_list_foreach_safe on the queue of uplaods and chunks + * Iterating over those lists is not concurrent safe. If a flush call ran at the same time + * And deleted an item from the list, this could cause a crash/corruption. + */ ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC); /* clean up any old buffers found on startup */ @@ -950,17 +953,6 @@ static int cb_s3_init(struct flb_output_instance *ins, cb_s3_upload(config, ctx); } - if (ctx->use_put_object == FLB_TRUE) { - /* - * Run S3 in async mode. - * Multipart uploads don't work with async mode right now in high throughput - * cases. Its not clear why. Realistically, the performance of sync mode - * will be sufficient for most users, and long term we can do the work - * to enable async if needed. - */ - ctx->s3_client->upstream->flags = async_flags; - } - /* this is done last since in the previous block we make calls to AWS */ ctx->provider->provider_vtable->upstream_set(ctx->provider, ctx->ins); @@ -1691,7 +1683,6 @@ static int buffer_chunk(void *out_context, struct s3_file *upload_file, static void s3_upload_queue(struct flb_config *config, void *out_context) { int ret; - int async_flags; time_t now; struct upload_queue *upload_contents; struct flb_s3 *ctx = out_context; @@ -1707,12 +1698,6 @@ static void s3_upload_queue(struct flb_config *config, void *out_context) cb_s3_upload(config, out_context); } - /* upload timer must use sync mode */ - if (ctx->use_put_object == FLB_TRUE) { - async_flags = ctx->s3_client->upstream->flags; - ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC); - } - /* Iterate through each file in upload queue */ mk_list_foreach_safe(head, tmp, &ctx->upload_queue) { upload_contents = mk_list_entry(head, struct upload_queue, _head); @@ -1762,10 +1747,7 @@ static void s3_upload_queue(struct flb_config *config, void *out_context) } exit: - /* re-enable async mode */ - if (ctx->use_put_object == FLB_TRUE) { - ctx->s3_client->upstream->flags = async_flags; - } + return; } static void cb_s3_upload(struct flb_config *config, void *data) @@ -1781,16 +1763,9 @@ static void cb_s3_upload(struct flb_config *config, void *data) int complete; int ret; time_t now; - int async_flags; flb_plg_debug(ctx->ins, "Running upload timer callback (cb_s3_upload).."); - /* upload timer must use sync mode */ - if (ctx->use_put_object == FLB_TRUE) { - async_flags = ctx->s3_client->upstream->flags; - ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC); - } - now = time(NULL); /* Check all chunks and see if any have timed out */ @@ -1867,10 +1842,6 @@ static void cb_s3_upload(struct flb_config *config, void *data) } } } - - if (ctx->use_put_object == FLB_TRUE) { - ctx->s3_client->upstream->flags = async_flags; - } } static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char *data, @@ -2275,10 +2246,6 @@ static int cb_s3_exit(void *data, struct flb_config *config) } if (s3_store_has_data(ctx) == FLB_TRUE) { - if (ctx->use_put_object == FLB_TRUE) { - /* exit must run in sync mode */ - ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC); - } flb_plg_info(ctx->ins, "Sending all locally buffered data to S3"); ret = put_all_chunks(ctx); if (ret < 0) {