From 6c38848fba371987bf34199b718ab6f1117cd3b6 Mon Sep 17 00:00:00 2001
From: EC2 Default User
Date: Fri, 16 Sep 2022 20:38:50 +0000
Subject: [PATCH] out_s3: honor fluent-bit's retry_limit instead of
 MAX_UPLOAD_ERRORS and emit retry warnings via s3_retry_warn()

Signed-off-by: Clay Cheng
---
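Notes (kept below the "---" marker, so git-am ignores them): retries are now
governed by the output's generic retry_limit (ctx->ins->retry_limit), which
is set with the standard Retry_Limit output property, instead of the
hard-coded MAX_UPLOAD_ERRORS. A minimal, illustrative pipeline follows; the
bucket, tag, and limit values are made up for the example and are not part
of this patch:

    [INPUT]
        Name         dummy
        Tag          app.logs

    [OUTPUT]
        Name         s3
        Match        app.logs
        bucket       my-example-bucket
        region       us-east-1
        Retry_Limit  5

With Retry_Limit 5, a chunk whose failure count exceeds 5 is logged by
s3_retry_warn() as "cannot be retried" and discarded rather than re-queued.
One caveat worth verifying: if Retry_Limit is configured as
no_limits/no_retries, the engine stores a sentinel value in retry_limit,
and the plain "failures > retry_limit" comparisons here may need to
special-case it.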
 plugins/out_s3/s3.c       | 149 +++++++++++++++++++++++++++++----------
 plugins/out_s3/s3.h       |   3 +
 plugins/out_s3/s3_store.h |   1 +
 3 files changed, 117 insertions(+), 36 deletions(-)

diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c
index 5f4c1027852..244dca01fce 100644
--- a/plugins/out_s3/s3.c
+++ b/plugins/out_s3/s3.c
@@ -118,6 +118,44 @@ static char *mock_error_response(char *error_env_var)
     return NULL;
 }
 
+static void s3_retry_warn(struct flb_s3 *ctx, const char *tag,
+                          char *input_name, time_t create_time,
+                          int less_than_limit)
+{
+    struct tm now_time;
+    char create_time_str[20];
+    struct tm *tmp;
+
+    tmp = localtime_r(&create_time, &now_time);
+    strftime(create_time_str, 20, "%Y-%m-%d %H:%M:%S", tmp);
+    if (input_name == NULL || strlen(input_name) == 0) {
+        if (less_than_limit == FLB_TRUE) {
+            flb_plg_warn(ctx->ins,
+                         "failed to flush chunk tag=%s, create_time=%s, "
+                         "retry issued (out_id=%d)",
+                         tag, create_time_str, ctx->ins->id);
+        }
+        else {
+            flb_plg_warn(ctx->ins,
+                         "chunk tag=%s, create_time=%s cannot be retried",
+                         tag, create_time_str);
+        }
+    }
+    else {
+        if (less_than_limit == FLB_TRUE) {
+            flb_plg_warn(ctx->ins,
+                         "failed to flush chunk tag=%s, create_time=%s, "
+                         "retry issued: input=%s > output=%s (out_id=%d)",
+                         tag, create_time_str, input_name, ctx->ins->name, ctx->ins->id);
+        }
+        else {
+            flb_plg_warn(ctx->ins,
+                         "chunk tag=%s, create_time=%s cannot be retried: input=%s > output=%s",
+                         tag, create_time_str, input_name, ctx->ins->name);
+        }
+    }
+}
+
 int s3_plugin_under_test()
 {
     if (getenv("FLB_S3_PLUGIN_UNDER_TEST") != NULL) {
@@ -438,7 +476,6 @@ static void s3_context_destroy(struct flb_s3 *ctx)
     struct mk_list *tmp;
     struct multipart_upload *m_upload;
     struct upload_queue *upload_contents;
-
     if (!ctx) {
         return;
     }
@@ -671,11 +708,12 @@ static int cb_s3_init(struct flb_output_instance *ins,
         return -1;
     }
     if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
-        if(ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_COMPRESS_SIZE) {
+        if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_COMPRESS_SIZE) {
             flb_plg_error(ctx->ins, "upload_chunk_size in compressed multipart upload cannot exceed 5GB");
             return -1;
         }
-    } else {
+    }
+    else {
         if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_SIZE) {
             flb_plg_error(ctx->ins, "Max upload_chunk_size is 50MB");
             return -1;
@@ -887,7 +925,8 @@ static int cb_s3_init(struct flb_output_instance *ins,
     if (ctx->insecure == FLB_TRUE) {
         ctx->s3_client->upstream = flb_upstream_create(config, ctx->endpoint, ctx->port,
                                                        FLB_IO_TCP, NULL);
-    } else {
+    }
+    else {
         ctx->s3_client->upstream = flb_upstream_create(config, ctx->endpoint, ctx->port,
                                                        FLB_IO_TLS, ctx->client_tls);
     }
@@ -986,14 +1025,17 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
     void *payload_buf = NULL;
     size_t payload_size = 0;
     size_t preCompress_size = 0;
+    int less_than_limit = FLB_TRUE;
 
     if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
         /* Map payload */
-        ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size);
+        ret = flb_aws_compression_compress(ctx->compression, body, body_size,
+                                           &payload_buf, &payload_size);
         if (ret == -1) {
             flb_plg_error(ctx->ins, "Failed to compress data");
             return FLB_RETRY;
-        } else {
+        }
+        else {
             preCompress_size = body_size;
             body = (void *) payload_buf;
             body_size = payload_size;
@@ -1025,7 +1067,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
             /* already big enough, just use PutObject API */
             goto put_object;
         }
-        else if(body_size > MIN_CHUNKED_UPLOAD_SIZE) {
+        else if (body_size > MIN_CHUNKED_UPLOAD_SIZE) {
             init_upload = FLB_TRUE;
             goto multipart;
         }
@@ -1066,10 +1108,22 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
     if (ret < 0) {
         /* re-add chunk to list */
         if (chunk) {
-            s3_store_file_unlock(chunk);
             chunk->failures += 1;
+            if (chunk->failures > ctx->ins->retry_limit) {
+                less_than_limit = FLB_FALSE;
+            }
+            s3_retry_warn(ctx, tag, chunk->input_name, create_time,
+                          less_than_limit);
+            if (less_than_limit == FLB_TRUE) {
+                s3_store_file_unlock(chunk);
+                return FLB_RETRY;
+            }
+            else {
+                s3_store_file_delete(ctx, chunk);
+                return FLB_ERROR;
+            }
         }
         return FLB_RETRY;
     }
 
     /* data was sent successfully- delete the local buffer */
@@ -1117,8 +1171,13 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         m_upload->upload_errors += 1;
         /* re-add chunk to list */
         if (chunk) {
-            s3_store_file_unlock(chunk);
             chunk->failures += 1;
+            if (chunk->failures > ctx->ins->retry_limit) {
+                less_than_limit = FLB_FALSE;
+            }
+            s3_retry_warn(ctx, (char *) chunk->fsf->meta_buf,
+                          m_upload->input_name, chunk->create_time,
+                          less_than_limit);
         }
         if (ctx->key_fmt_has_seq_index) {
             ctx->seq_index--;
@@ -1129,7 +1188,15 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
                 return -1;
             }
         }
+        if (chunk) {
+            if (less_than_limit == FLB_TRUE) {
+                s3_store_file_unlock(chunk);
+                return FLB_RETRY;
+            }
+            s3_store_file_delete(ctx, chunk);
+            return FLB_ERROR;
+        }
         return FLB_RETRY;
     }
     m_upload->part_number += 1;
     /* data was sent successfully- delete the local buffer */
@@ -1168,7 +1235,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
     return FLB_OK;
 }
 
-
 /*
  * Attempts to send all chunks to S3 using PutObject
  * Used on shut down to try to send all buffered data */
@@ -1187,6 +1253,7 @@ static int put_all_chunks(struct flb_s3 *ctx)
     char *buffer = NULL;
     size_t buffer_size;
     int ret;
+    int less_than_limit = FLB_TRUE;
 
     mk_list_foreach(head, &ctx->fs->streams) {
         /* skip multi upload stream */
@@ -1208,11 +1275,9 @@ static int put_all_chunks(struct flb_s3 *ctx)
                 continue;
             }
 
-            if (chunk->failures >= MAX_UPLOAD_ERRORS) {
-                flb_plg_warn(ctx->ins,
-                             "Chunk for tag %s failed to send %i times, "
-                             "will not retry",
-                             (char *) fsf->meta_buf, MAX_UPLOAD_ERRORS);
+            if (chunk->failures > ctx->ins->retry_limit) {
+                s3_retry_warn(ctx, (char *) fsf->meta_buf,
+                              NULL, chunk->create_time, FLB_FALSE);
                 flb_fstore_file_inactive(ctx->fs, fsf);
                 continue;
             }
@@ -1228,10 +1293,13 @@ static int put_all_chunks(struct flb_s3 *ctx)
 
             if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
                 /* Map payload */
-                ret = flb_aws_compression_compress(ctx->compression, buffer, buffer_size, &payload_buf, &payload_size);
+                ret = flb_aws_compression_compress(ctx->compression, buffer,
+                                                   buffer_size, &payload_buf,
+                                                   &payload_size);
                 if (ret == -1) {
                     flb_plg_error(ctx->ins, "Failed to compress data, uploading uncompressed data instead to prevent data loss");
-                } else {
+                }
+                else {
                     flb_plg_info(ctx->ins, "Pre-compression chunk size is %d, After compression, chunk is %d bytes", buffer_size, payload_size);
                     buffer = (void *) payload_buf;
                     buffer_size = payload_size;
@@ -1243,13 +1311,23 @@ static int put_all_chunks(struct flb_s3 *ctx)
                                  chunk->create_time, buffer, buffer_size);
             flb_free(buffer);
             if (ret < 0) {
-                s3_store_file_unlock(chunk);
                 chunk->failures += 1;
+                if (chunk->failures > ctx->ins->retry_limit) {
+                    less_than_limit = FLB_FALSE;
+                    s3_retry_warn(ctx, (char *) fsf->meta_buf, NULL,
+                                  chunk->create_time, less_than_limit);
+                    s3_store_file_delete(ctx, chunk);
+                }
+                else {
+                    s3_retry_warn(ctx, (char *) fsf->meta_buf, NULL,
+                                  chunk->create_time, less_than_limit);
+                    s3_store_file_unlock(chunk);
+                }
                 return -1;
             }
 
             /* data was sent successfully- delete the local buffer */
-            s3_store_file_delete(ctx, chunk);
+            s3_store_file_delete(ctx, chunk);
         }
     }
 
@@ -1480,8 +1558,7 @@ int get_md5_base64(char *buf, size_t buf_size, char *md5_str, size_t md5_str_size)
     return 0;
 }
 
-static struct multipart_upload *get_upload(struct flb_s3 *ctx,
-                                           const char *tag, int tag_len)
+static struct multipart_upload *get_upload(struct flb_s3 *ctx, const char *tag, int tag_len)
 {
     struct multipart_upload *m_upload = NULL;
     struct multipart_upload *tmp_upload = NULL;
@@ -1490,14 +1567,13 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx,
 
     mk_list_foreach_safe(head, tmp, &ctx->uploads) {
         tmp_upload = mk_list_entry(head, struct multipart_upload, _head);
-
         if (tmp_upload->upload_state == MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS) {
             continue;
         }
-        if (tmp_upload->upload_errors >= MAX_UPLOAD_ERRORS) {
+        if (tmp_upload->upload_errors > ctx->ins->retry_limit) {
             tmp_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS;
-            flb_plg_error(ctx->ins, "Upload for %s has reached max upload errors",
-                          tmp_upload->s3_key);
+            s3_retry_warn(ctx, tmp_upload->tag, tmp_upload->input_name,
+                          tmp_upload->init_time, FLB_FALSE);
             continue;
         }
         if (strcmp(tmp_upload->tag, tag) == 0) {
@@ -1736,9 +1812,10 @@ static void s3_upload_queue(struct flb_config *config, void *out_context)
 
         /* If retry limit was reached, discard file and remove file from queue */
         upload_contents->retry_counter++;
-        if (upload_contents->retry_counter >= MAX_UPLOAD_ERRORS) {
-            flb_plg_warn(ctx->ins, "Chunk file failed to send %d times, will not "
-                         "retry", upload_contents->retry_counter);
+        if (upload_contents->retry_counter > ctx->ins->retry_limit) {
+            s3_retry_warn(ctx, upload_contents->tag,
+                          upload_contents->m_upload_file->input_name,
+                          upload_contents->upload_time, FLB_FALSE);
             s3_store_file_inactive(ctx, upload_contents->upload_file);
             multipart_upload_destroy(upload_contents->m_upload_file);
             remove_from_queue(upload_contents);
@@ -1824,10 +1901,8 @@ static void cb_s3_upload(struct flb_config *config, void *data)
         m_upload = mk_list_entry(head, struct multipart_upload, _head);
         complete = FLB_FALSE;
 
-        if (m_upload->complete_errors >= MAX_UPLOAD_ERRORS) {
-            flb_plg_error(ctx->ins,
-                          "Upload for %s has reached max completion errors, "
-                          "plugin will give up", m_upload->s3_key);
+        if (m_upload->complete_errors > ctx->ins->retry_limit) {
+            s3_retry_warn(ctx, m_upload->tag, NULL, m_upload->init_time, FLB_FALSE);
             mk_list_del(&m_upload->_head);
             continue;
         }
@@ -2139,10 +2214,10 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
                                    chunk, chunk_size, m_upload_file);
     }
 
-    /* Discard upload_file if it has failed to upload MAX_UPLOAD_ERRORS times */
-    if (upload_file != NULL && upload_file->failures >= MAX_UPLOAD_ERRORS) {
-        flb_plg_warn(ctx->ins, "File with tag %s failed to send %d times, will not "
-                     "retry", event_chunk->tag, MAX_UPLOAD_ERRORS);
+    /* discard upload_file if it has failed more than retry_limit times */
+    if (upload_file != NULL && upload_file->failures > ctx->ins->retry_limit) {
+        s3_retry_warn(ctx, event_chunk->tag, out_flush->task->i_ins->name,
+                      upload_file->create_time, FLB_FALSE);
         s3_store_file_inactive(ctx, upload_file);
         upload_file = NULL;
     }
@@ -2162,6 +2237,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
         (m_upload_file->init_time + ctx->upload_timeout)) {
         upload_timeout_check = FLB_TRUE;
         flb_plg_info(ctx->ins, "upload_timeout reached for %s", event_chunk->tag);
+        m_upload_file->input_name = out_flush->task->i_ins->name;
     }
 
     /* If total_file_size has been reached, upload file */
@@ -2172,6 +2248,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
 
     /* File is ready for upload */
     if (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE) {
+        upload_file->input_name = out_flush->task->i_ins->name;
         if (ctx->preserve_data_ordering == FLB_TRUE) {
             /* Buffer last chunk in file and lock file to prevent further changes */
             ret = buffer_chunk(ctx, upload_file, chunk, chunk_size,
diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h
index 3be84823eee..c0b38314d73 100644
--- a/plugins/out_s3/s3.h
+++ b/plugins/out_s3/s3.h
@@ -98,6 +98,9 @@ struct multipart_upload {
     /* see note for MAX_UPLOAD_ERRORS */
     int upload_errors;
    int complete_errors;
+
+    /* input plugin name, used by s3_retry_warn() messages */
+    char *input_name;
 };
 
 struct flb_s3 {
diff --git a/plugins/out_s3/s3_store.h b/plugins/out_s3/s3_store.h
index 242d99ab6fc..9463b91892e 100644
--- a/plugins/out_s3/s3_store.h
+++ b/plugins/out_s3/s3_store.h
@@ -26,6 +26,7 @@ struct s3_file {
     int locked;                      /* locked chunk is busy, cannot write to it */
     int failures;                    /* delivery failures */
+    char *input_name;                /* input plugin name, for s3_retry_warn() */
     size_t size;                     /* file size */
     time_t create_time;              /* creation time */
     flb_sds_t file_path;             /* file path */