Skip to content

Commit

Permalink
plugins: out_s3: updated the code to use the new base stream type
Browse files Browse the repository at this point in the history
Signed-off-by: Leonardo Alminana <[email protected]>
  • Loading branch information
leonardo-albertovich authored and edsiper committed Aug 31, 2022
1 parent 0a80c0b commit 162a0e0
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -919,8 +919,8 @@ static int cb_s3_init(struct flb_output_instance *ins,
}

/* init must use sync mode */
async_flags = ctx->s3_client->upstream->flags;
ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC);
async_flags = flb_stream_get_flags(&ctx->s3_client->upstream->base);
flb_stream_disable_async_mode(&ctx->s3_client->upstream->base);

/* clean up any old buffers found on startup */
if (ctx->has_old_buffers == FLB_TRUE) {
Expand Down Expand Up @@ -962,7 +962,7 @@ static int cb_s3_init(struct flb_output_instance *ins,
* will be sufficient for most users, and long term we can do the work
* to enable async if needed.
*/
ctx->s3_client->upstream->flags = async_flags;
flb_stream_set_flags(&ctx->s3_client->upstream->base, async_flags);
}

/* this is done last since in the previous block we make calls to AWS */
Expand Down Expand Up @@ -1710,8 +1710,8 @@ static void s3_upload_queue(struct flb_config *config, void *out_context)

/* upload timer must use sync mode */
if (ctx->use_put_object == FLB_TRUE) {
async_flags = ctx->s3_client->upstream->flags;
ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC);
async_flags = flb_stream_get_flags(&ctx->s3_client->upstream->base);
flb_stream_disable_async_mode(&ctx->s3_client->upstream->base);
}

/* Iterate through each file in upload queue */
Expand Down Expand Up @@ -1765,7 +1765,7 @@ static void s3_upload_queue(struct flb_config *config, void *out_context)
exit:
/* re-enable async mode */
if (ctx->use_put_object == FLB_TRUE) {
ctx->s3_client->upstream->flags = async_flags;
flb_stream_set_flags(&ctx->s3_client->upstream->base, async_flags);
}
}

Expand All @@ -1788,8 +1788,8 @@ static void cb_s3_upload(struct flb_config *config, void *data)

/* upload timer must use sync mode */
if (ctx->use_put_object == FLB_TRUE) {
async_flags = ctx->s3_client->upstream->flags;
ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC);
async_flags = flb_stream_get_flags(&ctx->s3_client->upstream->base);
flb_stream_disable_async_mode(&ctx->s3_client->upstream->base);
}

now = time(NULL);
Expand Down Expand Up @@ -1870,7 +1870,7 @@ static void cb_s3_upload(struct flb_config *config, void *data)
}

if (ctx->use_put_object == FLB_TRUE) {
ctx->s3_client->upstream->flags = async_flags;
flb_stream_set_flags(&ctx->s3_client->upstream->base, async_flags);
}
}

Expand Down Expand Up @@ -2240,7 +2240,7 @@ static int cb_s3_exit(void *data, struct flb_config *config)
if (s3_store_has_data(ctx) == FLB_TRUE) {
if (ctx->use_put_object == FLB_TRUE) {
/* exit must run in sync mode */
ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC);
flb_stream_disable_async_mode(&ctx->s3_client->upstream->base);
}
flb_plg_info(ctx->ins, "Sending all locally buffered data to S3");
ret = put_all_chunks(ctx);
Expand Down

0 comments on commit 162a0e0

Please sign in to comment.