out_s3: use retry_limit in fluent-bit to replace MAX_UPLOAD_ERRORS and update s3 warn output messages with function s3_retry_warn()

Signed-off-by: Clay Cheng <[email protected]>
EC2 Default User authored and Claych committed Dec 1, 2022
1 parent 760956f commit ce0f890
Showing 3 changed files with 114 additions and 38 deletions.
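
At a glance: every site that compared a chunk's failure count against the hard-coded MAX_UPLOAD_ERRORS now compares it against the per-output retry_limit tracked on the output instance, and reports the outcome through the new s3_retry_warn() helper. A condensed sketch of the gate applied at each failure site (illustrative, not the literal patch; the names follow s3.c):

    int less_than_limit = FLB_TRUE;

    chunk->failures += 1;
    if (chunk->failures > ctx->ins->retry_limit) {
        less_than_limit = FLB_FALSE;
    }
    s3_retry_warn(ctx, tag, chunk->input_name, create_time, less_than_limit);
    if (less_than_limit == FLB_TRUE) {
        s3_store_file_unlock(chunk);  /* keep the chunk; the engine will retry */
        return FLB_RETRY;
    }
    s3_store_file_delete(ctx, chunk); /* limit exceeded; drop the buffered chunk */
    return FLB_ERROR;
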
148 changes: 110 additions & 38 deletions plugins/out_s3/s3.c
@@ -118,6 +118,44 @@ static char *mock_error_response(char *error_env_var)
return NULL;
}

/*
 * Warn about a chunk that failed to flush: while the chunk is still under
 * the output's retry_limit, a retry notice is logged; once the limit is
 * exceeded, the chunk is reported as not retriable. input_name may be NULL
 * when the originating input is not known.
 */
static void s3_retry_warn(struct flb_s3 *ctx, char *tag,
char *input_name, time_t create_time,
int less_than_limit)
{
struct tm now_time;
char create_time_str[20];
struct tm *tmp;

tmp = localtime_r(&create_time, &now_time);
strftime(create_time_str, 20, "%Y-%m-%d %H:%M:%S", tmp);
if (input_name == NULL) {
if (less_than_limit == FLB_TRUE) {
flb_plg_warn(ctx->ins,
"failed to flush chunk tag=%s, create_time=%s "
"(out_id=%d)",
tag, create_time_str, ctx->ins->id);
}
else {
flb_plg_warn(ctx->ins,
"chunk tag=%s, create_time=%s cannot be retried",
tag, create_time_str);
}
}
else if (strlen(input_name) > 0) {
if (less_than_limit == FLB_TRUE) {
flb_plg_warn(ctx->ins,
"failed to flush chunk tag=%s, create_time=%s, "
"retry issued: input=%s > output=%s (out_id=%d)",
tag, create_time_str, input_name, ctx->ins->name, ctx->ins->id);
}
else {
flb_plg_warn(ctx->ins,
"chunk tag=%s, create_time=%s cannot be retried: input=%s > output=%s",
tag, create_time_str, input_name, ctx->ins->name);
}
}
}
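
For reference, a sketch of how the helper is meant to be called and what it logs; the tag, input name, timestamp, and out_id values here are illustrative:

    /* still under the retry limit: announce that a retry was issued */
    s3_retry_warn(ctx, "app.logs", "tail.0", chunk->create_time, FLB_TRUE);
    /* [warn] failed to flush chunk tag=app.logs, create_time=2022-12-01
     * 00:00:00, retry issued: input=tail.0 > output=s3 (out_id=0) */

    /* over the limit, input unknown: announce that the chunk is dropped */
    s3_retry_warn(ctx, "app.logs", NULL, chunk->create_time, FLB_FALSE);
    /* [warn] chunk tag=app.logs, create_time=2022-12-01 00:00:00 cannot be retried */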

int s3_plugin_under_test()
{
if (getenv("FLB_S3_PLUGIN_UNDER_TEST") != NULL) {
@@ -438,7 +476,6 @@ static void s3_context_destroy(struct flb_s3 *ctx)
struct mk_list *tmp;
struct multipart_upload *m_upload;
struct upload_queue *upload_contents;

if (!ctx) {
return;
}
@@ -671,11 +708,12 @@ static int cb_s3_init(struct flb_output_instance *ins,
return -1;
}
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_COMPRESS_SIZE) {
flb_plg_error(ctx->ins, "upload_chunk_size in compressed multipart upload cannot exceed 5GB");
return -1;
}
}
else {
if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_SIZE) {
flb_plg_error(ctx->ins, "Max upload_chunk_size is 50MB");
return -1;
@@ -887,7 +925,8 @@ static int cb_s3_init(struct flb_output_instance *ins,
if (ctx->insecure == FLB_TRUE) {
ctx->s3_client->upstream = flb_upstream_create(config, ctx->endpoint, ctx->port,
FLB_IO_TCP, NULL);
}
else {
ctx->s3_client->upstream = flb_upstream_create(config, ctx->endpoint, ctx->port,
FLB_IO_TLS, ctx->client_tls);
}
@@ -986,14 +1025,17 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
void *payload_buf = NULL;
size_t payload_size = 0;
size_t preCompress_size = 0;
int less_than_limit = FLB_TRUE;

if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
/* Map payload */
ret = flb_aws_compression_compress(ctx->compression, body, body_size,
&payload_buf, &payload_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to compress data");
return FLB_RETRY;
}
else {
preCompress_size = body_size;
body = (void *) payload_buf;
body_size = payload_size;
@@ -1025,7 +1067,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
/* already big enough, just use PutObject API */
goto put_object;
}
else if (body_size > MIN_CHUNKED_UPLOAD_SIZE) {
init_upload = FLB_TRUE;
goto multipart;
}
@@ -1066,10 +1108,20 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
if (ret < 0) {
/* re-add chunk to list */
if (chunk) {
chunk->failures += 1;
if (chunk->failures > ctx->ins->retry_limit) {
less_than_limit = FLB_FALSE;
}
s3_retry_warn(ctx, tag, chunk->input_name, create_time, less_than_limit);
if (less_than_limit == FLB_TRUE) {
s3_store_file_unlock(chunk);
return FLB_RETRY;
}
else {
s3_store_file_delete(ctx, chunk);
return FLB_ERROR;
}
}
return FLB_RETRY;
}

/* data was sent successfully - delete the local buffer */
@@ -1117,8 +1169,20 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
m_upload->upload_errors += 1;
/* re-add chunk to list */
if (chunk) {
chunk->failures += 1;
if (chunk->failures > ctx->ins->retry_limit) {
less_than_limit = FLB_FALSE;
}
s3_retry_warn(ctx, (char *) chunk->fsf->meta_buf, m_upload->input_name,
chunk->create_time, less_than_limit);
if (less_than_limit == FLB_TRUE) {
s3_store_file_unlock(chunk);
return FLB_RETRY;
}
else {
s3_store_file_delete(ctx, chunk);
return FLB_ERROR;
}
}
if (ctx->key_fmt_has_seq_index) {
ctx->seq_index--;
@@ -1129,7 +1193,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
return -1;
}
}
return FLB_RETRY;
}
m_upload->part_number += 1;
/* data was sent successfully - delete the local buffer */
@@ -1168,7 +1231,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
return FLB_OK;
}


/*
* Attempts to send all chunks to S3 using PutObject
* Used on shut down to try to send all buffered data
@@ -1187,6 +1249,7 @@ static int put_all_chunks(struct flb_s3 *ctx)
char *buffer = NULL;
size_t buffer_size;
int ret;
int less_than_limit = FLB_TRUE;

mk_list_foreach(head, &ctx->fs->streams) {
/* skip multi upload stream */
@@ -1208,11 +1271,9 @@ static int put_all_chunks(struct flb_s3 *ctx)
continue;
}

if (chunk->failures > ctx->ins->retry_limit) {
s3_retry_warn(ctx, (char *) fsf->meta_buf,
NULL, chunk->create_time, FLB_FALSE);
flb_fstore_file_inactive(ctx->fs, fsf);
continue;
}
@@ -1228,10 +1289,13 @@

if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
/* Map payload */
ret = flb_aws_compression_compress(ctx->compression, buffer,
buffer_size, &payload_buf,
&payload_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to compress data, uploading uncompressed data instead to prevent data loss");
}
else {
flb_plg_info(ctx->ins, "Pre-compression chunk size is %zu, after compression, chunk is %zu bytes", buffer_size, payload_size);
buffer = (void *) payload_buf;
buffer_size = payload_size;
@@ -1243,13 +1307,23 @@ static int put_all_chunks(struct flb_s3 *ctx)
chunk->create_time, buffer, buffer_size);
flb_free(buffer);
if (ret < 0) {
chunk->failures += 1;
if (chunk->failures > ctx->ins->retry_limit) {
less_than_limit = FLB_FALSE;
s3_retry_warn(ctx, (char *) fsf->meta_buf, NULL,
chunk->create_time, less_than_limit);
s3_store_file_delete(ctx, chunk);
}
else {
s3_retry_warn(ctx, (char *) fsf->meta_buf, NULL,
chunk->create_time, less_than_limit);
s3_store_file_unlock(chunk);
}
return -1;
}

/* data was sent successfully - delete the local buffer */
s3_store_file_delete(ctx, chunk);
}
}

@@ -1480,8 +1554,7 @@ int get_md5_base64(char *buf, size_t buf_size, char *md5_str, size_t md5_str_siz
return 0;
}

static struct multipart_upload *get_upload(struct flb_s3 *ctx, const char *tag, int tag_len)
{
struct multipart_upload *m_upload = NULL;
struct multipart_upload *tmp_upload = NULL;
@@ -1490,14 +1563,13 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx,

mk_list_foreach_safe(head, tmp, &ctx->uploads) {
tmp_upload = mk_list_entry(head, struct multipart_upload, _head);

if (tmp_upload->upload_state == MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS) {
continue;
}
if (tmp_upload->upload_errors > ctx->ins->retry_limit) {
tmp_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS;
s3_retry_warn(ctx, tmp_upload->tag, tmp_upload->input_name,
tmp_upload->init_time, FLB_FALSE);
continue;
}
if (strcmp(tmp_upload->tag, tag) == 0) {
@@ -1736,9 +1808,10 @@ static void s3_upload_queue(struct flb_config *config, void *out_context)

/* If retry limit was reached, discard file and remove file from queue */
upload_contents->retry_counter++;
if (upload_contents->retry_counter > ctx->ins->retry_limit) {
s3_retry_warn(ctx, upload_contents->tag,
upload_contents->m_upload_file->input_name,
upload_contents->upload_time, FLB_FALSE);
s3_store_file_inactive(ctx, upload_contents->upload_file);
multipart_upload_destroy(upload_contents->m_upload_file);
remove_from_queue(upload_contents);
@@ -1824,10 +1897,8 @@ static void cb_s3_upload(struct flb_config *config, void *data)
m_upload = mk_list_entry(head, struct multipart_upload, _head);
complete = FLB_FALSE;

if (m_upload->complete_errors > ctx->ins->retry_limit) {
s3_retry_warn(ctx, m_upload->tag, NULL, m_upload->init_time, FLB_FALSE);
mk_list_del(&m_upload->_head);
continue;
}
@@ -2139,10 +2210,9 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
chunk, chunk_size, m_upload_file);
}

/* Discard upload_file if it has failed to upload ctx->ins->retry_limit times */
if (upload_file != NULL && upload_file->failures > ctx->ins->retry_limit) {
s3_retry_warn(ctx, event_chunk->tag, out_flush->task->i_ins->name,
upload_file->create_time, FLB_FALSE);
s3_store_file_inactive(ctx, upload_file);
upload_file = NULL;
}
@@ -2162,6 +2232,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
(m_upload_file->init_time + ctx->upload_timeout)) {
upload_timeout_check = FLB_TRUE;
flb_plg_info(ctx->ins, "upload_timeout reached for %s", event_chunk->tag);
m_upload_file->input_name = out_flush->task->i_ins->name;
}

/* If total_file_size has been reached, upload file */
@@ -2172,6 +2243,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,

/* File is ready for upload */
if (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE) {
upload_file->input_name = out_flush->task->i_ins->name;
if (ctx->preserve_data_ordering == FLB_TRUE) {
/* Buffer last chunk in file and lock file to prevent further changes */
ret = buffer_chunk(ctx, upload_file, chunk, chunk_size,
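
Since the threshold now comes from the output instance, the generic Retry_Limit option decides when out_s3 gives up on a chunk. An illustrative configuration (the bucket, region, and limit values are placeholders):

    [OUTPUT]
        Name         s3
        Match        *
        bucket       my-bucket
        region       us-east-1
        # chunks that fail more than this many times are dropped with a
        # "cannot be retried" warning from s3_retry_warn()
        Retry_Limit  3
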
3 changes: 3 additions & 0 deletions plugins/out_s3/s3.h
@@ -98,6 +98,9 @@ struct multipart_upload {
/* see note for MAX_UPLOAD_ERRORS */
int upload_errors;
int complete_errors;

/* input plugin name, used by s3_retry_warn() messages */
char *input_name;
};

struct flb_s3 {
1 change: 1 addition & 0 deletions plugins/out_s3/s3_store.h
@@ -26,6 +26,7 @@
struct s3_file {
int locked; /* locked chunk is busy, cannot write to it */
int failures; /* delivery failures */
char *input_name; /* input plugin name, used by s3_retry_warn() messages */
size_t size; /* file size */
time_t create_time; /* creation time */
flb_sds_t file_path; /* file path */
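
The new input_name members are filled in from the flush context so the warnings can name the input that produced the data; a simplified view of the assignments made in cb_s3_flush() (as in the patch above, where out_flush is the flush context passed to the callback):

    m_upload_file->input_name = out_flush->task->i_ins->name;
    upload_file->input_name = out_flush->task->i_ins->name;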
