diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c
index 99366017f84..b7475cbc8c6 100644
--- a/plugins/out_s3/s3.c
+++ b/plugins/out_s3/s3.c
@@ -48,7 +48,7 @@ static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data,
 static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_log_time,
                          char *body, size_t body_size);
 
-static int put_all_chunks(struct flb_s3 *ctx);
+static int put_all_chunks(struct flb_s3 *ctx, int is_startup);
 
 static void cb_s3_upload(struct flb_config *ctx, void *data);
 
@@ -119,6 +119,45 @@ static char *mock_error_response(char *error_env_var)
     return NULL;
 }
 
+static void s3_retry_warn(struct flb_s3 *ctx, const char *tag,
+                          char *input_name, time_t create_time,
+                          int less_than_limit)
+{
+    struct tm now_time;
+    char create_time_str[20];
+    struct tm *tmp;
+
+    tmp = localtime_r(&create_time, &now_time);
+    strftime(create_time_str, 20, "%Y-%m-%d %H:%M:%S", tmp);
+    if (input_name == NULL || strlen(input_name) == 0) {
+        if (less_than_limit == FLB_TRUE) {
+            flb_plg_warn(ctx->ins,
+                         "failed to flush chunk tag=%s, create_time=%s, "
+                         "retry issued: (out_id=%d)",
+                         tag, create_time_str, ctx->ins->id);
+        }
+        else {
+            flb_plg_warn(ctx->ins,
+                         "chunk tag=%s, create_time=%s cannot be retried",
+                         tag, create_time_str);
+        }
+    }
+    else {
+        if (less_than_limit == FLB_TRUE) {
+            flb_plg_warn(ctx->ins,
+                         "failed to flush chunk tag=%s, create_time=%s, "
+                         "retry issued: input=%s > output=%s (out_id=%d)",
+                         tag, create_time_str, input_name, ctx->ins->name, ctx->ins->id);
+        }
+        else {
+            flb_plg_warn(ctx->ins,
+                         "chunk tag=%s, create_time=%s cannot be retried: "
+                         "input=%s > output=%s",
+                         tag, create_time_str, input_name, ctx->ins->name);
+        }
+    }
+}
+
 int s3_plugin_under_test()
 {
     if (getenv("FLB_S3_PLUGIN_UNDER_TEST") != NULL) {
@@ -322,6 +361,18 @@ static int write_seq_index(char *seq_index_file, uint64_t seq_index)
     return 0;
 }
 
+static void s3_decrement_index(struct flb_s3 *ctx)
+{
+    int ret;
+    ctx->seq_index--;
+
+    ret = write_seq_index(ctx->seq_index_file, ctx->seq_index);
+    if (ret < 0) {
+        flb_plg_error(ctx->ins, "Failed to save decremented $INDEX for s3 key to "
+                      "store_dir after request error");
+    }
+}
+
 static int init_seq_index(void *context) {
     int ret;
     const char *tmp;
@@ -439,7 +490,6 @@ static void s3_context_destroy(struct flb_s3 *ctx)
     struct mk_list *tmp;
     struct multipart_upload *m_upload;
     struct upload_queue *upload_contents;
-
     if (!ctx) {
         return;
     }
@@ -527,7 +577,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
     mk_list_init(&ctx->uploads);
     mk_list_init(&ctx->upload_queue);
 
-    ctx->retry_time = 0;
     ctx->upload_queue_success = FLB_FALSE;
 
     /* Export context */
@@ -671,11 +720,12 @@ static int cb_s3_init(struct flb_output_instance *ins,
             return -1;
         }
         if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
-            if(ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_COMPRESS_SIZE) {
+            if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_COMPRESS_SIZE) {
                 flb_plg_error(ctx->ins, "upload_chunk_size in compressed multipart upload cannot exceed 5GB");
                 return -1;
             }
-        } else {
+        }
+        else {
             if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_SIZE) {
                 flb_plg_error(ctx->ins, "Max upload_chunk_size is 50MB");
                 return -1;
@@ -887,7 +937,8 @@ static int cb_s3_init(struct flb_output_instance *ins,
     if (ctx->insecure == FLB_TRUE) {
         ctx->s3_client->upstream = flb_upstream_create(config, ctx->endpoint, ctx->port,
                                                        FLB_IO_TCP, NULL);
-    } else {
+    }
+    else {
         ctx->s3_client->upstream = flb_upstream_create(config, ctx->endpoint, ctx->port,
                                                        FLB_IO_TLS, ctx->client_tls);
     }
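The s3_retry_warn() helper added above derives a human-readable timestamp from the chunk's create_time via localtime_r() and strftime(). A minimal standalone sketch of that pattern (not part of the patch; plain printf stands in for flb_plg_warn(), and the NULL check is an addition):

```c
#include <stdio.h>
#include <time.h>

int main(void)
{
    time_t create_time = time(NULL);
    struct tm broken_down;
    char create_time_str[20];   /* "YYYY-MM-DD HH:MM:SS" + NUL = 20 bytes */

    /* localtime_r() is the thread-safe variant: the result goes into a
     * caller-owned struct tm instead of static storage */
    if (localtime_r(&create_time, &broken_down) == NULL) {
        return 1;
    }
    strftime(create_time_str, sizeof(create_time_str),
             "%Y-%m-%d %H:%M:%S", &broken_down);
    printf("create_time=%s\n", create_time_str);
    return 0;
}
```
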
@@ -928,7 +979,7 @@ static int cb_s3_init(struct flb_output_instance *ins,
                      "executions to S3; buffer=%s",
                      ctx->fs->root_path);
         ctx->has_old_buffers = FLB_FALSE;
-        ret = put_all_chunks(ctx);
+        ret = put_all_chunks(ctx, FLB_TRUE);
         if (ret < 0) {
             ctx->has_old_buffers = FLB_TRUE;
             flb_plg_error(ctx->ins,
@@ -960,8 +1011,10 @@ static int cb_s3_init(struct flb_output_instance *ins,
 }
 
 /*
- * return value is one of FLB_OK, FLB_RETRY, FLB_ERROR
- *
+ * Return value is one of 0, -1, -2:
+ *  0 means the data was uploaded successfully.
+ * -1 means the upload failed but will be retried.
+ * -2 means the upload failed and cannot be retried (retry_limit reached).
  * Chunk is allowed to be NULL
  */
 static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
@@ -995,8 +1048,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size);
         if (ret == -1) {
             flb_plg_error(ctx->ins, "Failed to compress data");
-            return FLB_RETRY;
-        } else {
+            return -1;
+        }
+        else {
             preCompress_size = body_size;
             body = (void *) payload_buf;
             body_size = payload_size;
@@ -1020,7 +1074,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
 
     if (m_upload == NULL) {
         if (chunk != NULL && time(NULL) >
-            (chunk->create_time + ctx->upload_timeout + ctx->retry_time)) {
+            (chunk->create_time + ctx->upload_timeout)) {
             /* timeout already reached, just PutObject */
             goto put_object;
         }
@@ -1028,7 +1082,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
             /* already big enough, just use PutObject API */
             goto put_object;
         }
-        else if(body_size > MIN_CHUNKED_UPLOAD_SIZE) {
+        else if (body_size > MIN_CHUNKED_UPLOAD_SIZE) {
             init_upload = FLB_TRUE;
             goto multipart;
         }
@@ -1062,17 +1116,25 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
     if (ret < 0) {
         /* re-add chunk to list */
         if (chunk) {
-            s3_store_file_unlock(chunk);
             chunk->failures += 1;
+            if (chunk->failures > ctx->ins->retry_limit) {
+                s3_retry_warn(ctx, tag, chunk->input_name, create_time, FLB_FALSE);
+                s3_store_file_delete(ctx, chunk);
+                return -2;
+            }
+            else {
+                s3_retry_warn(ctx, tag, chunk->input_name, create_time, FLB_TRUE);
+                s3_store_file_unlock(chunk);
+                return -1;
+            }
         }
-        return FLB_RETRY;
     }
 
     /* data was sent successfully- delete the local buffer */
     if (chunk) {
         s3_store_file_delete(ctx, chunk);
     }
-    return FLB_OK;
+    return 0;
 
 multipart:
 
@@ -1086,7 +1148,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
             if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
                 flb_free(payload_buf);
             }
-            return FLB_RETRY;
+            return -1;
         }
     }
 
@@ -1100,7 +1162,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
             if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
                 flb_free(payload_buf);
             }
-            return FLB_RETRY;
+            return -1;
         }
         m_upload->upload_state = MULTIPART_UPLOAD_STATE_CREATED;
     }
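upload_data() now reports plain integer codes instead of the engine's FLB_OK/FLB_RETRY/FLB_ERROR, leaving the final engine verdict to its callers. A self-contained sketch of the 0 / -1 / -2 contract described in the comment above (the macro names are illustrative, not from the patch):

```c
#include <stdio.h>

/* illustrative names for the 0 / -1 / -2 contract */
#define UPLOAD_OK       0   /* sent; local buffer deleted */
#define UPLOAD_RETRY   -1   /* failed; chunk kept and retried */
#define UPLOAD_GIVE_UP -2   /* failed; retry_limit exceeded, chunk discarded */

static void describe(int ret)
{
    switch (ret) {
    case UPLOAD_OK:
        printf("uploaded successfully\n");
        break;
    case UPLOAD_RETRY:
        printf("failed, retry issued\n");
        break;
    case UPLOAD_GIVE_UP:
        printf("failed, cannot retry (retry_limit reached)\n");
        break;
    default:
        printf("unexpected code %d\n", ret);
        break;
    }
}

int main(void)
{
    describe(UPLOAD_OK);
    describe(UPLOAD_RETRY);
    describe(UPLOAD_GIVE_UP);
    return 0;
}
```
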
@@ -1110,22 +1172,40 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
             flb_free(payload_buf);
         }
-        m_upload->upload_errors += 1;
+
         /* re-add chunk to list */
         if (chunk) {
-            s3_store_file_unlock(chunk);
             chunk->failures += 1;
-        }
-        if (ctx->key_fmt_has_seq_index) {
-            ctx->seq_index--;
+            if (chunk->failures > ctx->ins->retry_limit) {
+                s3_retry_warn(ctx, (char *) chunk->fsf->meta_buf, m_upload->input_name,
+                              chunk->create_time, FLB_FALSE);
+                s3_store_file_delete(ctx, chunk);
+                /*
+                 * part_number is initialized to 1; if it is still 1, no part
+                 * was uploaded and this upload can be destroyed, otherwise
+                 * mark it complete.
+                 */
+                if (m_upload->part_number == 1) {
+                    mk_list_del(&m_upload->_head);
+                    multipart_upload_destroy(m_upload);
 
-            ret = write_seq_index(ctx->seq_index_file, ctx->seq_index);
-            if (ret < 0) {
-                flb_plg_error(ctx->ins, "Failed to decrement index after request error");
+                    /* Decrement the $INDEX when no data is uploaded */
+                    if (ctx->key_fmt_has_seq_index) {
+                        s3_decrement_index(ctx);
+                    }
+                }
+                else {
+                    m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS;
+                }
+                return -2;
+            }
+            else {
+                s3_retry_warn(ctx, (char *) chunk->fsf->meta_buf, m_upload->input_name,
+                              chunk->create_time, FLB_TRUE);
+                s3_store_file_unlock(chunk);
                 return -1;
             }
         }
-        return FLB_RETRY;
     }
     m_upload->part_number += 1;
     /* data was sent successfully- delete the local buffer */
@@ -1147,7 +1227,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
                      "(the API limit) have been uploaded", m_upload->s3_key);
         }
         if (time(NULL) >
-            (m_upload->init_time + ctx->upload_timeout + ctx->retry_time)) {
+            (m_upload->init_time + ctx->upload_timeout)) {
             timeout_check = FLB_TRUE;
             flb_plg_info(ctx->ins, "Will complete upload for %s because upload_timeout"
                          " has elapsed", m_upload->s3_key);
@@ -1161,16 +1241,15 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS;
     }
 
-    return FLB_OK;
+    return 0;
 }
 
-
 /*
  * Attempts to send all chunks to S3 using PutObject
  * Used on shut down to try to send all buffered data
  * Used on start up to try to send any leftover buffers from previous executions
  */
-static int put_all_chunks(struct flb_s3 *ctx)
+static int put_all_chunks(struct flb_s3 *ctx, int is_startup)
 {
     struct s3_file *chunk;
     struct mk_list *tmp;
@@ -1204,12 +1283,10 @@ static int put_all_chunks(struct flb_s3 *ctx)
                 continue;
             }
 
-            if (chunk->failures >= MAX_UPLOAD_ERRORS) {
-                flb_plg_warn(ctx->ins,
-                             "Chunk for tag %s failed to send %i times, "
-                             "will not retry",
-                             (char *) fsf->meta_buf, MAX_UPLOAD_ERRORS);
-                flb_fstore_file_inactive(ctx->fs, fsf);
+            if (chunk->failures > ctx->ins->retry_limit) {
+                s3_retry_warn(ctx, (char *) fsf->meta_buf,
+                              NULL, chunk->create_time, FLB_FALSE);
+                flb_fstore_file_delete(ctx->fs, fsf);
                 continue;
             }
 
@@ -1224,10 +1301,13 @@ static int put_all_chunks(struct flb_s3 *ctx)
 
             if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
                 /* Map payload */
-                ret = flb_aws_compression_compress(ctx->compression, buffer, buffer_size, &payload_buf, &payload_size);
+                ret = flb_aws_compression_compress(ctx->compression, buffer,
+                                                   buffer_size, &payload_buf,
+                                                   &payload_size);
                 if (ret == -1) {
                     flb_plg_error(ctx->ins, "Failed to compress data, uploading uncompressed data instead to prevent data loss");
-                } else {
+                }
+                else {
                     flb_plg_info(ctx->ins, "Pre-compression chunk size is %d, After compression, chunk is %d bytes", buffer_size, payload_size);
                     buffer = (void *) payload_buf;
                     buffer_size = payload_size;
@@ -1239,8 +1319,18 @@ static int put_all_chunks(struct flb_s3 *ctx)
                                chunk->create_time, buffer, buffer_size);
             flb_free(buffer);
             if (ret < 0) {
-                s3_store_file_unlock(chunk);
                 chunk->failures += 1;
+                if (is_startup == FLB_TRUE) {
+                    s3_retry_warn(ctx, (char *) fsf->meta_buf, NULL,
+                                  chunk->create_time, FLB_TRUE);
+                    s3_store_file_unlock(chunk);
+                }
+                else {
+                    flb_plg_error(ctx->ins, "Failed to flush chunk tag=%s, "
                                  "chunk will remain buffered on the filesystem "
                                  "in the store_dir.",
                                  (char *) fsf->meta_buf);
+                }
                 return -1;
             }
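put_all_chunks() now receives is_startup to distinguish its two call sites: during startup recovery a failed chunk is unlocked so the regular flush path can retry it, while at shutdown it is deliberately left in store_dir for the next run to pick up. A self-contained sketch of that branch using stand-in types (not the plugin's real structs):

```c
#include <stdio.h>

#define FLB_TRUE  1
#define FLB_FALSE 0

struct demo_chunk {
    int failures;
    int locked;
};

/* mirrors the error branch added to put_all_chunks() above */
static int on_put_failure(struct demo_chunk *chunk, const char *tag,
                          int is_startup)
{
    chunk->failures += 1;
    if (is_startup == FLB_TRUE) {
        /* startup: unlock so a later flush can retry the chunk */
        chunk->locked = 0;
        printf("warn: retry issued for tag=%s\n", tag);
    }
    else {
        /* shutdown: leave the chunk on disk; the next Fluent Bit run
         * picks it up from store_dir */
        printf("error: tag=%s stays buffered in store_dir\n", tag);
    }
    return -1;
}

int main(void)
{
    struct demo_chunk c = { 0, 1 };
    on_put_failure(&c, "app.log", FLB_TRUE);    /* cb_s3_init()/flush_init() path */
    on_put_failure(&c, "app.log", FLB_FALSE);   /* cb_s3_exit() path */
    return 0;
}
```
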
@@ -1445,13 +1535,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_log_time,
 
 decrement_index:
     if (ctx->key_fmt_has_seq_index) {
-        ctx->seq_index--;
-
-        ret = write_seq_index(ctx->seq_index_file, ctx->seq_index);
-        if (ret < 0) {
-            flb_plg_error(ctx->ins, "Failed to decrement index after request error");
-            return -1;
-        }
+        s3_decrement_index(ctx);
     }
     return -1;
 }
@@ -1476,8 +1560,7 @@ int get_md5_base64(char *buf, size_t buf_size, char *md5_str, size_t md5_str_size)
     return 0;
 }
 
-static struct multipart_upload *get_upload(struct flb_s3 *ctx,
-                                           const char *tag, int tag_len)
+static struct multipart_upload *get_upload(struct flb_s3 *ctx, const char *tag, int tag_len)
 {
     struct multipart_upload *m_upload = NULL;
     struct multipart_upload *tmp_upload = NULL;
@@ -1490,12 +1573,6 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx,
         if (tmp_upload->upload_state == MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS) {
             continue;
         }
-        if (tmp_upload->upload_errors >= MAX_UPLOAD_ERRORS) {
-            tmp_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS;
-            flb_plg_error(ctx->ins, "Upload for %s has reached max upload errors",
-                          tmp_upload->s3_key);
-            continue;
-        }
         if (strcmp(tmp_upload->tag, tag) == 0) {
             m_upload = tmp_upload;
             break;
@@ -1572,7 +1649,6 @@ static int add_to_queue(struct flb_s3 *ctx, struct s3_file *upload_file,
     upload_contents->upload_file = upload_file;
     upload_contents->m_upload_file = m_upload_file;
     upload_contents->tag_len = tag_len;
-    upload_contents->retry_counter = 0;
     upload_contents->upload_time = -1;
 
     /* Necessary to create separate string for tag to prevent corruption */
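Every place the old code compared a failure counter against the hard-coded MAX_UPLOAD_ERRORS (5) now compares it against ctx->ins->retry_limit, the per-instance user-configured retry limit. The strictly-greater-than comparison means the initial attempt plus retry_limit retries must all fail before a chunk is discarded, as this runnable sketch illustrates (values are illustrative):

```c
#include <stdio.h>

/* pretend every delivery attempt fails */
static int try_send(void)
{
    return -1;
}

int main(void)
{
    int retry_limit = 3;   /* user-configured; replaces the constant 5 */
    int failures = 0;

    while (try_send() < 0) {
        failures += 1;
        if (failures > retry_limit) {
            /* strictly greater-than: the first try plus retry_limit
             * retries have all failed, so the chunk is discarded */
            printf("giving up after %d failed attempts\n", failures);
            break;
        }
        printf("attempt %d failed, retry issued (%d/%d)\n",
               failures, failures, retry_limit);
    }
    return 0;
}
```
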
Will not " - "retry for %d seconds", 2 * upload_contents->retry_counter); break; } } @@ -1773,7 +1835,7 @@ static void cb_s3_upload(struct flb_config *config, void *data) fsf = mk_list_entry(head, struct flb_fstore_file, _head); chunk = fsf->data; - if (now < (chunk->create_time + ctx->upload_timeout + ctx->retry_time)) { + if (now < (chunk->create_time + ctx->upload_timeout)) { continue; /* Only send chunks which have timed out */ } @@ -1795,7 +1857,7 @@ static void cb_s3_upload(struct flb_config *config, void *data) ret = upload_data(ctx, chunk, m_upload, buffer, buffer_size, (const char *) fsf->meta_buf, fsf->meta_size); flb_free(buffer); - if (ret != FLB_OK) { + if (ret != 0) { flb_plg_error(ctx->ins, "Could not send chunk with tag %s", (char *) fsf->meta_buf); } @@ -1806,11 +1868,13 @@ static void cb_s3_upload(struct flb_config *config, void *data) m_upload = mk_list_entry(head, struct multipart_upload, _head); complete = FLB_FALSE; - if (m_upload->complete_errors >= MAX_UPLOAD_ERRORS) { + if (m_upload->complete_errors > ctx->ins->retry_limit) { flb_plg_error(ctx->ins, - "Upload for %s has reached max completion errors, " - "plugin will give up", m_upload->s3_key); + "Multipart Upload for %s has failed " + "s3: CompleteMultipartUpload more than configured retry_limit, " + "output will give up ", m_upload->s3_key); mk_list_del(&m_upload->_head); + multipart_upload_destroy(m_upload); continue; } @@ -1821,7 +1885,7 @@ static void cb_s3_upload(struct flb_config *config, void *data) if (m_upload->upload_state == MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS) { complete = FLB_TRUE; } - if (time(NULL) > (m_upload->init_time + ctx->upload_timeout + ctx->retry_time)) { + if (time(NULL) > (m_upload->init_time + ctx->upload_timeout)) { flb_plg_info(ctx->ins, "Completing upload for %s because upload_timeout" " has passed", m_upload->s3_key); complete = FLB_TRUE; @@ -2031,7 +2095,7 @@ static void flush_init(void *out_context) "executions to S3; buffer=%s", ctx->fs->root_path); ctx->has_old_buffers = FLB_FALSE; - ret = put_all_chunks(ctx); + ret = put_all_chunks(ctx, FLB_TRUE); if (ret < 0) { ctx->has_old_buffers = FLB_TRUE; flb_plg_error(ctx->ins, @@ -2151,11 +2215,11 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, m_upload_file, file_first_log_time); } - /* Discard upload_file if it has failed to upload MAX_UPLOAD_ERRORS times */ - if (upload_file != NULL && upload_file->failures >= MAX_UPLOAD_ERRORS) { - flb_plg_warn(ctx->ins, "File with tag %s failed to send %d times, will not " - "retry", event_chunk->tag, MAX_UPLOAD_ERRORS); - s3_store_file_inactive(ctx, upload_file); + /* Discard upload_file if it has failed to upload ctx->ins->retry_limit times */ + if (upload_file != NULL && upload_file->failures > ctx->ins->retry_limit) { + s3_retry_warn(ctx, event_chunk->tag, out_flush->task->i_ins->name, + upload_file->create_time, FLB_FALSE); + s3_store_file_delete(ctx, upload_file); upload_file = NULL; } @@ -2174,6 +2238,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, (m_upload_file->init_time + ctx->upload_timeout)) { upload_timeout_check = FLB_TRUE; flb_plg_info(ctx->ins, "upload_timeout reached for %s", event_chunk->tag); + m_upload_file->input_name = out_flush->task->i_ins->name; } /* If total_file_size has been reached, upload file */ @@ -2184,6 +2249,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, /* File is ready for upload, upload_file != NULL prevents from segfaulting. 
@@ -2184,6 +2249,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
     /* File is ready for upload, upload_file != NULL prevents from segfaulting. */
     if ((upload_file != NULL) &&
         (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) {
+        upload_file->input_name = out_flush->task->i_ins->name;
         if (ctx->preserve_data_ordering == FLB_TRUE) {
             /* Buffer last chunk in file and lock file to prevent further changes */
             ret = buffer_chunk(ctx, upload_file, chunk, chunk_size,
@@ -2216,9 +2282,9 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
                                  event_chunk->tag,
                                  flb_sds_len(event_chunk->tag));
         if (ret < 0) {
-            FLB_OUTPUT_RETURN(FLB_ERROR);
+            FLB_OUTPUT_RETURN(FLB_RETRY);
         }
-        FLB_OUTPUT_RETURN(ret);
+        FLB_OUTPUT_RETURN(FLB_OK);
     }
 }
 
@@ -2247,7 +2313,7 @@ static int cb_s3_exit(void *data, struct flb_config *config)
 
     if (s3_store_has_data(ctx) == FLB_TRUE) {
         flb_plg_info(ctx->ins, "Sending all locally buffered data to S3");
-        ret = put_all_chunks(ctx);
+        ret = put_all_chunks(ctx, FLB_FALSE);
         if (ret < 0) {
             flb_plg_error(ctx->ins, "Could not send all chunks on exit");
         }
diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h
index 3be84823eee..6b08cc8c589 100644
--- a/plugins/out_s3/s3.h
+++ b/plugins/out_s3/s3.h
@@ -47,24 +47,12 @@
 
 #define DEFAULT_UPLOAD_TIMEOUT 3600
 
-/*
- * If we see repeated errors on an upload/chunk, we will discard it
- * This saves us from scenarios where something goes wrong and an upload can
- * not proceed (may be some other process completed it or deleted the upload)
- * instead of erroring out forever, we eventually discard the upload.
- *
- * The same is done for chunks, just to be safe, even though realistically
- * I can't think of a reason why a chunk could become unsendable.
- */
-#define MAX_UPLOAD_ERRORS 5
-
 struct upload_queue {
     struct s3_file *upload_file;
     struct multipart_upload *m_upload_file;
     char *tag;
     int tag_len;
-    int retry_counter;
     time_t upload_time;
 
     struct mk_list _head;
@@ -84,20 +72,16 @@ struct multipart_upload {
     flb_sds_t etags[10000];
     int part_number;
 
-    /*
-     * we use async http, so we need to check that all part requests have
-     * completed before we complete the upload
-     */
-    int parts_uploaded;
-
     /* ongoing tracker of how much data has been sent for this upload */
     size_t bytes;
 
     struct mk_list _head;
 
-    /* see note for MAX_UPLOAD_ERRORS */
-    int upload_errors;
+    /* checked against the user-configured retry_limit */
     int complete_errors;
+
+    /* input plugin name, used by s3_retry_warn() */
+    char *input_name;
 };
 
@@ -163,7 +147,6 @@ struct flb_s3 {
     size_t file_size;
     size_t upload_chunk_size;
     time_t upload_timeout;
-    time_t retry_time;
 
     int timer_created;
     int timer_ms;
diff --git a/plugins/out_s3/s3_store.c b/plugins/out_s3/s3_store.c
index 8a96406335a..6f7c8717827 100644
--- a/plugins/out_s3/s3_store.c
+++ b/plugins/out_s3/s3_store.c
@@ -401,18 +401,6 @@ int s3_store_has_uploads(struct flb_s3 *ctx)
     return FLB_FALSE;
 }
 
-int s3_store_file_inactive(struct flb_s3 *ctx, struct s3_file *s3_file)
-{
-    int ret;
-    struct flb_fstore_file *fsf;
-
-    fsf = s3_file->fsf;
-    flb_free(s3_file);
-    ret = flb_fstore_file_inactive(ctx->fs, fsf);
-
-    return ret;
-}
-
 int s3_store_file_delete(struct flb_s3 *ctx, struct s3_file *s3_file)
 {
     struct flb_fstore_file *fsf;
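s3_store_file_inactive() is deleted together with its last callers: a chunk that exhausts its retries is now removed outright instead of being parked. Sketched with stand-in code (not the flb_fstore API), the practical difference is that an inactive file stayed on disk but was never sent again, while delete reclaims the storage:

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct demo_file {
    char *path;
    int active;
};

/* old behavior: file kept on disk, just no longer processed */
static void demo_inactive(struct demo_file *f)
{
    f->active = 0;
    printf("%s parked on disk, never sent\n", f->path);
}

/* new behavior: handle freed, file removed from store_dir */
static void demo_delete(struct demo_file *f)
{
    printf("%s removed from store_dir\n", f->path);
    free(f->path);
    f->path = NULL;
}

int main(void)
{
    struct demo_file f;

    f.path = strdup("store_dir/chunk-0001");
    f.active = 1;
    if (f.path == NULL) {
        return 1;
    }
    demo_inactive(&f);   /* pre-patch handling after max errors */
    demo_delete(&f);     /* post-patch handling past retry_limit */
    return 0;
}
```
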
diff --git a/plugins/out_s3/s3_store.h b/plugins/out_s3/s3_store.h
index 9caa7bdf482..ff1be7bb78c 100644
--- a/plugins/out_s3/s3_store.h
+++ b/plugins/out_s3/s3_store.h
@@ -26,6 +26,7 @@ struct s3_file {
     int locked;                      /* locked chunk is busy, cannot write to it */
     int failures;                    /* delivery failures */
+    char *input_name;                /* input plugin name for s3_retry_warn() messages */
     size_t size;                     /* file size */
     time_t create_time;              /* creation time */
     time_t first_log_time;           /* first log time */
@@ -44,7 +45,6 @@ int s3_store_exit(struct flb_s3 *ctx);
 int s3_store_has_data(struct flb_s3 *ctx);
 int s3_store_has_uploads(struct flb_s3 *ctx);
 
-int s3_store_file_inactive(struct flb_s3 *ctx, struct s3_file *s3_file);
 struct s3_file *s3_store_file_get(struct flb_s3 *ctx, const char *tag,
                                   int tag_len);
 int s3_store_file_delete(struct flb_s3 *ctx, struct s3_file *s3_file);
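The new input_name fields exist purely for diagnostics: cb_s3_flush() stamps the buffered file (and the multipart upload) with out_flush->task->i_ins->name so that a later retry warning can name the input that produced the chunk; chunks recovered from disk after a restart carry no input name. A self-contained sketch with a stand-in struct (the plugin names in the output are illustrative):

```c
#include <stdio.h>

struct demo_chunk {
    const char *tag;
    const char *input_name;   /* NULL for chunks recovered from disk */
};

static void warn_cannot_retry(const struct demo_chunk *c)
{
    if (c->input_name == NULL) {
        printf("chunk tag=%s cannot be retried\n", c->tag);
    }
    else {
        printf("chunk tag=%s cannot be retried: input=%s > output=%s\n",
               c->tag, c->input_name, "s3.0");
    }
}

int main(void)
{
    struct demo_chunk recovered = { "app.log", NULL };    /* from store_dir */
    struct demo_chunk live = { "app.log", "tail.0" };     /* stamped at flush */

    warn_cannot_retry(&recovered);
    warn_cannot_retry(&live);
    return 0;
}
```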