Skip to content

Commit

Permalink
out_s3: always use sync IO mode
Browse files Browse the repository at this point in the history
Signed-off-by: Wesley Pettit <[email protected]>
  • Loading branch information
PettitWesley authored and edsiper committed Feb 13, 2023
1 parent 7a20fea commit 8063397
Showing 1 changed file with 7 additions and 40 deletions.
47 changes: 7 additions & 40 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
{
int ret;
flb_sds_t tmp_sds;
int async_flags;
char *role_arn = NULL;
char *session_name;
const char *tmp;
Expand Down Expand Up @@ -914,8 +913,12 @@ static int cb_s3_init(struct flb_output_instance *ins,
ctx->timer_ms = UPLOAD_TIMER_MIN_WAIT;
}

/* init must use sync mode */
async_flags = ctx->s3_client->upstream->flags;
/*
* S3 must ALWAYS use sync mode
* In the timer thread we do a mk_list_foreach_safe on the queue of uplaods and chunks
* Iterating over those lists is not concurrent safe. If a flush call ran at the same time
* And deleted an item from the list, this could cause a crash/corruption.
*/
ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC);

/* clean up any old buffers found on startup */
Expand Down Expand Up @@ -950,17 +953,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
cb_s3_upload(config, ctx);
}

if (ctx->use_put_object == FLB_TRUE) {
/*
* Run S3 in async mode.
* Multipart uploads don't work with async mode right now in high throughput
* cases. Its not clear why. Realistically, the performance of sync mode
* will be sufficient for most users, and long term we can do the work
* to enable async if needed.
*/
ctx->s3_client->upstream->flags = async_flags;
}

/* this is done last since in the previous block we make calls to AWS */
ctx->provider->provider_vtable->upstream_set(ctx->provider, ctx->ins);

Expand Down Expand Up @@ -1691,7 +1683,6 @@ static int buffer_chunk(void *out_context, struct s3_file *upload_file,
static void s3_upload_queue(struct flb_config *config, void *out_context)
{
int ret;
int async_flags;
time_t now;
struct upload_queue *upload_contents;
struct flb_s3 *ctx = out_context;
Expand All @@ -1707,12 +1698,6 @@ static void s3_upload_queue(struct flb_config *config, void *out_context)
cb_s3_upload(config, out_context);
}

/* upload timer must use sync mode */
if (ctx->use_put_object == FLB_TRUE) {
async_flags = ctx->s3_client->upstream->flags;
ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC);
}

/* Iterate through each file in upload queue */
mk_list_foreach_safe(head, tmp, &ctx->upload_queue) {
upload_contents = mk_list_entry(head, struct upload_queue, _head);
Expand Down Expand Up @@ -1762,10 +1747,7 @@ static void s3_upload_queue(struct flb_config *config, void *out_context)
}

exit:
/* re-enable async mode */
if (ctx->use_put_object == FLB_TRUE) {
ctx->s3_client->upstream->flags = async_flags;
}
return;
}

static void cb_s3_upload(struct flb_config *config, void *data)
Expand All @@ -1781,16 +1763,9 @@ static void cb_s3_upload(struct flb_config *config, void *data)
int complete;
int ret;
time_t now;
int async_flags;

flb_plg_debug(ctx->ins, "Running upload timer callback (cb_s3_upload)..");

/* upload timer must use sync mode */
if (ctx->use_put_object == FLB_TRUE) {
async_flags = ctx->s3_client->upstream->flags;
ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC);
}

now = time(NULL);

/* Check all chunks and see if any have timed out */
Expand Down Expand Up @@ -1867,10 +1842,6 @@ static void cb_s3_upload(struct flb_config *config, void *data)
}
}
}

if (ctx->use_put_object == FLB_TRUE) {
ctx->s3_client->upstream->flags = async_flags;
}
}

static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char *data,
Expand Down Expand Up @@ -2275,10 +2246,6 @@ static int cb_s3_exit(void *data, struct flb_config *config)
}

if (s3_store_has_data(ctx) == FLB_TRUE) {
if (ctx->use_put_object == FLB_TRUE) {
/* exit must run in sync mode */
ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC);
}
flb_plg_info(ctx->ins, "Sending all locally buffered data to S3");
ret = put_all_chunks(ctx);
if (ret < 0) {
Expand Down

0 comments on commit 8063397

Please sign in to comment.