diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 36db7a4a4ec..7d47410fdd2 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -18,9 +18,12 @@ */ #include +#include +#include #include #include #include +#include #include #include #include @@ -45,10 +48,10 @@ static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, struct s3_file *chunk, char **out_buf, size_t *out_size); -static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time, +static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_log_time, char *body, size_t body_size); -static int put_all_chunks(struct flb_s3 *ctx); +static int put_all_chunks(struct flb_s3 *ctx, int is_startup); static void cb_s3_upload(struct flb_config *ctx, void *data); @@ -56,9 +59,10 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx, const char *tag, int tag_len); static struct multipart_upload *create_upload(struct flb_s3 *ctx, - const char *tag, int tag_len); + const char *tag, int tag_len, + time_t file_first_log_time, + char *input_name); -static void remove_from_queue(struct upload_queue *entry); static struct flb_aws_header content_encoding_header = { .key = "Content-Encoding", @@ -95,6 +99,45 @@ static struct flb_aws_header storage_class_header = { .val_len = 0, }; +static void s3_retry_warn(struct flb_s3 *ctx, const char *tag, + char *input_name, time_t create_time, + int less_than_limit) +{ + struct tm now_time; + char create_time_str[20]; + struct tm *tmp; + + tmp = localtime_r(&create_time, &now_time); + strftime(create_time_str, 20, "%Y-%m-%d %H:%M:%S", tmp); + if (input_name == NULL || strlen(input_name) == 0) { + if (less_than_limit == FLB_TRUE) { + flb_plg_warn(ctx->ins, + "failed to flush chunk tag=%s, create_time=%s, " + "retry issued: (out_id=%d)", + tag, create_time_str, ctx->ins->id); + } + else { + flb_plg_warn(ctx->ins, + "chunk tag=%s, create_time=%s cannot be retried", + tag, create_time_str); + } + } + else { + 
if (less_than_limit == FLB_TRUE) { + flb_plg_warn(ctx->ins, + "failed to flush chunk tag=%s, create_time=%s, " + "retry issued: input=%s > output=%s (out_id=%d)", + tag, create_time_str, input_name, ctx->ins->name, ctx->ins->id); + } + else { + flb_plg_warn(ctx->ins, + "chunk tag=%s, create_time=%s cannot be retried: " + "input=%s > output=%s", + tag, create_time_str, input_name, ctx->ins->name); + } + } +} + static char *mock_error_response(char *error_env_var) { char *err_val = NULL; @@ -321,6 +364,18 @@ static int write_seq_index(char *seq_index_file, uint64_t seq_index) return 0; } +static void s3_decrement_index(struct flb_s3 *ctx) +{ + int ret; + ctx->seq_index--; + + ret = write_seq_index(ctx->seq_index_file, ctx->seq_index); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to save decremented $INDEX for s3 key to " + "store_dir after request error"); + } +} + static int init_seq_index(void *context) { int ret; const char *tmp; @@ -437,8 +492,6 @@ static void s3_context_destroy(struct flb_s3 *ctx) struct mk_list *head; struct mk_list *tmp; struct multipart_upload *m_upload; - struct upload_queue *upload_contents; - if (!ctx) { return; } @@ -490,13 +543,6 @@ static void s3_context_destroy(struct flb_s3 *ctx) multipart_upload_destroy(m_upload); } - mk_list_foreach_safe(head, tmp, &ctx->upload_queue) { - upload_contents = mk_list_entry(head, struct upload_queue, _head); - s3_store_file_delete(ctx, upload_contents->upload_file); - multipart_upload_destroy(upload_contents->m_upload_file); - remove_from_queue(upload_contents); - } - flb_free(ctx); } @@ -527,8 +573,6 @@ static int cb_s3_init(struct flb_output_instance *ins, mk_list_init(&ctx->uploads); mk_list_init(&ctx->upload_queue); - ctx->retry_time = 0; - ctx->upload_queue_success = FLB_FALSE; /* Export context */ flb_output_set_context(ins, ctx); @@ -924,7 +968,7 @@ static int cb_s3_init(struct flb_output_instance *ins, "executions to S3; buffer=%s", ctx->fs->root_path); ctx->has_old_buffers = FLB_FALSE; - 
ret = put_all_chunks(ctx); + ret = put_all_chunks(ctx, FLB_TRUE); if (ret < 0) { ctx->has_old_buffers = FLB_TRUE; flb_plg_error(ctx->ins, @@ -949,16 +993,9 @@ static int cb_s3_init(struct flb_output_instance *ins, cb_s3_upload(config, ctx); } - if (ctx->use_put_object == FLB_TRUE) { - /* - * Run S3 in async mode. - * Multipart uploads don't work with async mode right now in high throughput - * cases. Its not clear why. Realistically, the performance of sync mode - * will be sufficient for most users, and long term we can do the work - * to enable async if needed. - */ - ctx->s3_client->upstream->flags = async_flags; - } + + /* S3 can run in async mode with daemon coro */ + ctx->s3_client->upstream->flags = async_flags; /* this is done last since in the previous block we make calls to AWS */ ctx->provider->provider_vtable->upstream_set(ctx->provider, ctx->ins); @@ -981,11 +1018,23 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, int size_check = FLB_FALSE; int part_num_check = FLB_FALSE; int timeout_check = FLB_FALSE; - time_t create_time; int ret; void *payload_buf = NULL; size_t payload_size = 0; size_t preCompress_size = 0; + time_t file_first_log_time = time(NULL); + char* input_name = NULL; + + /* + * When chunk does not exist, file_first_log_time will be the current time. + * This is only for unit tests and prevents unit tests from segfaulting when chunk is + * NULL because if so chunk->first_log_time will be NULl either and will cause + * segfault during the process of put_object upload or mutipart upload. 
+ */ + if (chunk != NULL) { + file_first_log_time = chunk->first_log_time; + input_name = chunk->input_name; + } if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { /* Map payload */ @@ -1048,45 +1097,34 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, put_object: - /* - * remove chunk from buffer list- needed for async http so that the - * same chunk won't be sent more than once - */ - if (chunk) { - create_time = chunk->create_time; - } - else { - create_time = time(NULL); - } - - ret = s3_put_object(ctx, tag, create_time, body, body_size); + ret = s3_put_object(ctx, tag, file_first_log_time, body, body_size); if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { flb_free(payload_buf); } if (ret < 0) { - /* re-add chunk to list */ if (chunk) { s3_store_file_unlock(chunk); chunk->failures += 1; + if (ctx->ins->retry_limit >= 0 && chunk->failures > ctx->ins->retry_limit){ + s3_retry_warn(ctx, tag, input_name, file_first_log_time, FLB_FALSE); + return -2; + } + else { + s3_retry_warn(ctx, tag, input_name, file_first_log_time, FLB_TRUE); + return -1; + } } return FLB_RETRY; } - /* data was sent successfully- delete the local buffer */ - if (chunk) { - s3_store_file_delete(ctx, chunk); - } - return FLB_OK; + return 0; multipart: if (init_upload == FLB_TRUE) { - m_upload = create_upload(ctx, tag, tag_len); + m_upload = create_upload(ctx, tag, tag_len, file_first_log_time, input_name); if (!m_upload) { flb_plg_error(ctx->ins, "Could not find or create upload for tag %s", tag); - if (chunk) { - s3_store_file_unlock(chunk); - } if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { flb_free(payload_buf); } @@ -1098,9 +1136,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, ret = create_multipart_upload(ctx, m_upload); if (ret < 0) { flb_plg_error(ctx->ins, "Could not initiate multipart upload"); - if (chunk) { - s3_store_file_unlock(chunk); - } if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { flb_free(payload_buf); } @@ -1114,29 +1149,41 @@ 
static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { flb_free(payload_buf); } - m_upload->upload_errors += 1; - /* re-add chunk to list */ if (chunk) { s3_store_file_unlock(chunk); chunk->failures += 1; - } - if (ctx->key_fmt_has_seq_index) { - ctx->seq_index--; + if (ctx->ins->retry_limit >= 0 && chunk->failures > ctx->ins->retry_limit) { + s3_retry_warn(ctx, (char *) chunk->fsf->meta_buf, m_upload->input_name, + chunk->create_time, FLB_FALSE); + /* + * part_number initializes with 1, if the number still is 1 which means + * no data is uploaded and this upload file can be deleted , else set + * as complete. + */ + if (m_upload->part_number == 1) { + mk_list_del(&m_upload->_head); + multipart_upload_destroy(m_upload); - ret = write_seq_index(ctx->seq_index_file, ctx->seq_index); - if (ret < 0) { - flb_plg_error(ctx->ins, "Failed to decrement index after request error"); + /* Decrement the $INDEX when no data is uploaded */ + if (ctx->key_fmt_has_seq_index) { + s3_decrement_index(ctx); + } + } + else { + m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS; + } + return -2; + } + else { + s3_retry_warn(ctx, (char *) chunk->fsf->meta_buf, m_upload->input_name, + chunk->create_time, FLB_TRUE); return -1; } } return FLB_RETRY; } + m_upload->part_number += 1; - /* data was sent successfully- delete the local buffer */ - if (chunk) { - s3_store_file_delete(ctx, chunk); - chunk = NULL; - } if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { flb_free(payload_buf); } @@ -1174,7 +1221,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, * Used on shut down to try to send all buffered data * Used on start up to try to send any leftover buffers from previous executions */ -static int put_all_chunks(struct flb_s3 *ctx) +static int put_all_chunks(struct flb_s3 *ctx, int is_startup) { struct s3_file *chunk; struct mk_list *tmp; @@ -1194,29 +1241,22 @@ static int put_all_chunks(struct 
flb_s3 *ctx) if (fs_stream == ctx->stream_upload) { continue; } + /* skip metadata stream */ if (fs_stream == ctx->stream_metadata) { continue; } + /* on startup, we only send old chunks in this routine */ + if (is_startup == FLB_TRUE && fs_stream == ctx->stream_active) { + flb_info("put_all_chunks: stream_active has %d chunks", mk_list_size(&fs_stream->files)); + continue; + } + mk_list_foreach_safe(f_head, tmp, &fs_stream->files) { fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); chunk = fsf->data; - /* Locked chunks are being processed, skip */ - if (chunk->locked == FLB_TRUE) { - continue; - } - - if (chunk->failures >= MAX_UPLOAD_ERRORS) { - flb_plg_warn(ctx->ins, - "Chunk for tag %s failed to send %i times, " - "will not retry", - (char *) fsf->meta_buf, MAX_UPLOAD_ERRORS); - flb_fstore_file_inactive(ctx->fs, fsf); - continue; - } - ret = construct_request_buffer(ctx, NULL, chunk, &buffer, &buffer_size); if (ret < 0) { @@ -1245,10 +1285,41 @@ static int put_all_chunks(struct flb_s3 *ctx) if (ret < 0) { s3_store_file_unlock(chunk); chunk->failures += 1; + if (is_startup == FLB_TRUE) { + if (ctx->ins->retry_limit >= 0 && chunk->failures > ctx->ins->retry_limit){ + s3_retry_warn(ctx, (char *) fsf->meta_buf, NULL, + chunk->create_time, FLB_FALSE); + if (chunk->locked == FLB_TRUE) { + /* remove from upload_queue */ + if (chunk->_head.next != NULL && chunk->_head.prev != NULL) { + mk_list_del(&chunk->_head); + } + } + s3_store_file_delete(ctx, chunk); + return -1; + } + else { + s3_retry_warn(ctx, (char *) fsf->meta_buf, NULL, + chunk->create_time, FLB_TRUE); + return -1; + } + } + else { + flb_plg_error(ctx->ins, "Failed to flush chunk tag=%s, " + "chunk will remain buffered on the filesystem " + "in the store_dir.", + (char *) fsf->meta_buf); + } return -1; } /* data was sent successfully- delete the local buffer */ + if (chunk->locked == FLB_TRUE) { + /* remove from upload_queue */ + if (chunk->_head.next != NULL && chunk->_head.prev != NULL) { + 
mk_list_del(&chunk->_head); + } + } s3_store_file_delete(ctx, chunk); } } @@ -1284,11 +1355,6 @@ static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, return -1; } - /* - * lock the chunk from buffer list- needed for async http so that the - * same chunk won't be sent more than once. - */ - s3_store_file_lock(chunk); body = buffered_data; body_size = buffer_size; } @@ -1304,9 +1370,6 @@ static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, if (!tmp) { flb_errno(); flb_free(buffered_data); - if (chunk) { - s3_store_file_unlock(chunk); - } return -1; } body = buffered_data = tmp; @@ -1320,7 +1383,7 @@ static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, return 0; } -static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time, +static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_log_time, char *body, size_t body_size) { flb_sds_t s3_key = NULL; @@ -1337,8 +1400,8 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time flb_sds_t tmp; char final_body_md5[25]; - s3_key = flb_get_s3_key(ctx->s3_key_format, create_time, tag, ctx->tag_delimiters, - ctx->seq_index); + s3_key = flb_get_s3_key(ctx->s3_key_format, file_first_log_time, tag, + ctx->tag_delimiters, ctx->seq_index); if (!s3_key) { flb_plg_error(ctx->ins, "Failed to construct S3 Object Key for %s", tag); return -1; @@ -1449,13 +1512,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time decrement_index: if (ctx->key_fmt_has_seq_index) { - ctx->seq_index--; - - ret = write_seq_index(ctx->seq_index_file, ctx->seq_index); - if (ret < 0) { - flb_plg_error(ctx->ins, "Failed to decrement index after request error"); - return -1; - } + s3_decrement_index(ctx); } return -1; } @@ -1494,12 +1551,6 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx, if (tmp_upload->upload_state == MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS) { continue; } - 
if (tmp_upload->upload_errors >= MAX_UPLOAD_ERRORS) { - tmp_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS; - flb_plg_error(ctx->ins, "Upload for %s has reached max upload errors", - tmp_upload->s3_key); - continue; - } if (strcmp(tmp_upload->tag, tag) == 0) { m_upload = tmp_upload; break; @@ -1509,8 +1560,9 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx, return m_upload; } -static struct multipart_upload *create_upload(struct flb_s3 *ctx, - const char *tag, int tag_len) +static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *tag, + int tag_len, time_t file_first_log_time, + char *input_name) { int ret; struct multipart_upload *m_upload = NULL; @@ -1523,8 +1575,8 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx, flb_errno(); return NULL; } - s3_key = flb_get_s3_key(ctx->s3_key_format, time(NULL), tag, ctx->tag_delimiters, - ctx->seq_index); + s3_key = flb_get_s3_key(ctx->s3_key_format, file_first_log_time, tag, + ctx->tag_delimiters, ctx->seq_index); if (!s3_key) { flb_plg_error(ctx->ins, "Failed to construct S3 Object Key for %s", tag); flb_free(m_upload); @@ -1541,6 +1593,9 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx, m_upload->upload_state = MULTIPART_UPLOAD_STATE_NOT_CREATED; m_upload->part_number = 1; m_upload->init_time = time(NULL); + if (input_name != NULL) { + m_upload->input_name = input_name; + } mk_list_add(&m_upload->_head, &ctx->uploads); /* Update file and increment index value right before request */ @@ -1559,117 +1614,17 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx, return m_upload; } -/* Adds an entry to upload queue */ -static int add_to_queue(struct flb_s3 *ctx, struct s3_file *upload_file, - struct multipart_upload *m_upload_file, const char *tag, int tag_len) -{ - struct upload_queue *upload_contents; - flb_sds_t tag_cpy; - - /* Create upload contents object and add to upload queue */ - upload_contents = 
flb_malloc(sizeof(struct upload_queue)); - if (upload_contents == NULL) { - flb_plg_error(ctx->ins, "Error allocating memory for upload_queue entry"); - flb_errno(); - return -1; - } - upload_contents->upload_file = upload_file; - upload_contents->m_upload_file = m_upload_file; - upload_contents->tag_len = tag_len; - upload_contents->retry_counter = 0; - upload_contents->upload_time = -1; - - /* Necessary to create separate string for tag to prevent corruption */ - tag_cpy = flb_sds_create_len(tag, tag_len); - if (!tag_cpy) { - flb_errno(); - flb_free(upload_contents); - return -1; - } - upload_contents->tag = tag_cpy; - - - /* Add entry to upload queue */ - mk_list_add(&upload_contents->_head, &ctx->upload_queue); - return 0; -} - -/* Removes an entry from upload_queue */ -void remove_from_queue(struct upload_queue *entry) -{ - mk_list_del(&entry->_head); - flb_sds_destroy(entry->tag); - flb_free(entry); - return; -} - -/* Validity check for upload queue object */ -static int upload_queue_valid(struct upload_queue *upload_contents, time_t now, - void *out_context) -{ - struct flb_s3 *ctx = out_context; - - if (upload_contents == NULL) { - flb_plg_error(ctx->ins, "Error getting entry from upload_queue"); - return -1; - } - if (upload_contents->_head.next == NULL || upload_contents->_head.prev == NULL) { - flb_plg_debug(ctx->ins, "Encountered previously deleted entry in " - "upload_queue. Deleting invalid entry"); - mk_list_del(&upload_contents->_head); - return -1; - } - if (upload_contents->upload_file->locked == FLB_FALSE) { - flb_plg_debug(ctx->ins, "Encountered unlocked file in upload_queue. " - "Exiting"); - return -1; - } - if (upload_contents->upload_file->size <= 0) { - flb_plg_debug(ctx->ins, "Encountered empty chunk file in upload_queue. 
" - "Deleting empty chunk file"); - remove_from_queue(upload_contents); - return -1; - } - if (now < upload_contents->upload_time) { - flb_plg_debug(ctx->ins, "Found valid chunk file but not ready to upload"); - return -1; - } - return 0; -} - -static int send_upload_request(void *out_context, flb_sds_t chunk, - struct s3_file *upload_file, - struct multipart_upload *m_upload_file, - const char *tag, int tag_len) +static int buffer_chunk(void *out_context, struct s3_file *upload_file, + flb_sds_t chunk, int chunk_size, + const char *tag, int tag_len, + time_t file_first_log_time, + char *input_name) { int ret; - char *buffer; - size_t buffer_size; struct flb_s3 *ctx = out_context; - /* Create buffer to upload to S3 */ - ret = construct_request_buffer(ctx, chunk, upload_file, &buffer, &buffer_size); - flb_sds_destroy(chunk); - if (ret < 0) { - flb_plg_error(ctx->ins, "Could not construct request buffer for %s", - upload_file->file_path); - return -1; - } - - /* Upload to S3 */ - ret = upload_data(ctx, upload_file, m_upload_file, buffer, buffer_size, tag, tag_len); - flb_free(buffer); - - return ret; -} - -static int buffer_chunk(void *out_context, struct s3_file *upload_file, flb_sds_t chunk, - int chunk_size, const char *tag, int tag_len) -{ - int ret; - struct flb_s3 *ctx = out_context; - - ret = s3_store_buffer_put(ctx, upload_file, tag, tag_len, chunk, (size_t) chunk_size); + ret = s3_store_buffer_put(ctx, upload_file, tag, + tag_len, chunk, (size_t) chunk_size, file_first_log_time, input_name); flb_sds_destroy(chunk); if (ret < 0) { flb_plg_warn(ctx->ins, "Could not buffer chunk. 
Data order preservation " @@ -1679,86 +1634,6 @@ static int buffer_chunk(void *out_context, struct s3_file *upload_file, flb_sds_ return 0; } -/* Uploads all chunk files in queue synchronously */ -static void s3_upload_queue(struct flb_config *config, void *out_context) -{ - int ret; - int async_flags; - time_t now; - struct upload_queue *upload_contents; - struct flb_s3 *ctx = out_context; - struct mk_list *tmp; - struct mk_list *head; - - flb_plg_debug(ctx->ins, "Running upload timer callback (upload_queue).."); - - /* No chunks in upload queue. Scan for timed out chunks. */ - if (mk_list_size(&ctx->upload_queue) == 0) { - flb_plg_debug(ctx->ins, "No files found in upload_queue. Scanning for timed " - "out chunks"); - cb_s3_upload(config, out_context); - } - - /* upload timer must use sync mode */ - if (ctx->use_put_object == FLB_TRUE) { - async_flags = ctx->s3_client->upstream->flags; - ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC); - } - - /* Iterate through each file in upload queue */ - mk_list_foreach_safe(head, tmp, &ctx->upload_queue) { - upload_contents = mk_list_entry(head, struct upload_queue, _head); - - now = time(NULL); - - /* Checks if upload_contents is valid */ - ret = upload_queue_valid(upload_contents, now, ctx); - if (ret < 0) { - goto exit; - } - - /* Try to upload file. Return value can be -1, FLB_OK, FLB_ERROR, FLB_RETRY. 
*/ - ret = send_upload_request(ctx, NULL, upload_contents->upload_file, - upload_contents->m_upload_file, - upload_contents->tag, upload_contents->tag_len); - if (ret < 0) { - goto exit; - } - else if (ret == FLB_OK) { - remove_from_queue(upload_contents); - ctx->retry_time = 0; - ctx->upload_queue_success = FLB_TRUE; - } - else { - s3_store_file_lock(upload_contents->upload_file); - ctx->upload_queue_success = FLB_FALSE; - - /* If retry limit was reached, discard file and remove file from queue */ - upload_contents->retry_counter++; - if (upload_contents->retry_counter >= MAX_UPLOAD_ERRORS) { - flb_plg_warn(ctx->ins, "Chunk file failed to send %d times, will not " - "retry", upload_contents->retry_counter); - s3_store_file_inactive(ctx, upload_contents->upload_file); - multipart_upload_destroy(upload_contents->m_upload_file); - remove_from_queue(upload_contents); - continue; - } - - /* Retry in N seconds */ - upload_contents->upload_time = now + 2 * upload_contents->retry_counter; - ctx->retry_time += 2 * upload_contents->retry_counter; - flb_plg_debug(ctx->ins, "Failed to upload file in upload_queue. 
Will not " - "retry for %d seconds", 2 * upload_contents->retry_counter); - break; - } - } - -exit: - /* re-enable async mode */ - if (ctx->use_put_object == FLB_TRUE) { - ctx->s3_client->upstream->flags = async_flags; - } -} static void cb_s3_upload(struct flb_config *config, void *data) { @@ -1773,48 +1648,63 @@ static void cb_s3_upload(struct flb_config *config, void *data) int complete; int ret; time_t now; - int async_flags; - - flb_plg_debug(ctx->ins, "Running upload timer callback (cb_s3_upload).."); - - /* upload timer must use sync mode */ - if (ctx->use_put_object == FLB_TRUE) { - async_flags = ctx->s3_client->upstream->flags; - ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC); - } now = time(NULL); + + flb_plg_debug(ctx->ins, "Running upload daemon coro uploader (cb_s3_upload).."); - /* Check all chunks and see if any have timed out */ + /* check chunks in active stream not marked as ready to be sent and see if any are timed out */ mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) { fsf = mk_list_entry(head, struct flb_fstore_file, _head); chunk = fsf->data; - if (now < (chunk->create_time + ctx->upload_timeout + ctx->retry_time)) { - continue; /* Only send chunks which have timed out */ - } - - /* Locked chunks are being processed, skip */ + /* Locked chunks are already in the queue, skip */ if (chunk->locked == FLB_TRUE) { continue; } + if (now > (chunk->create_time + ctx->upload_timeout)) { + /* add to upload queue */ + if (chunk->input_name) { + flb_plg_info(ctx->ins, "upload_timeout reached for chunk from %s", + chunk->input_name); + } + s3_store_file_lock(chunk); + mk_list_add(&chunk->_head, &ctx->upload_queue); + } + } + + /* send any chunks that are ready */ + mk_list_foreach_safe(head, tmp, &ctx->upload_queue) { + chunk = mk_list_entry(head, struct s3_file, _head); + fsf = chunk->fsf; + m_upload = get_upload(ctx, (const char *) fsf->meta_buf, fsf->meta_size); ret = construct_request_buffer(ctx, NULL, chunk, &buffer, &buffer_size); if 
(ret < 0) { flb_plg_error(ctx->ins, "Could not construct request buffer for %s", chunk->file_path); - continue; + if (ctx->preserve_data_ordering == FLB_TRUE) { + break; /* if preserve_data_ordering send in the queue order, do not skip over chunks */ + } else { + continue; + } } - /* FYI: if construct_request_buffer() succeedeed, the s3_file is locked */ ret = upload_data(ctx, chunk, m_upload, buffer, buffer_size, (const char *) fsf->meta_buf, fsf->meta_size); flb_free(buffer); - if (ret != FLB_OK) { - flb_plg_error(ctx->ins, "Could not send chunk with tag %s", - (char *) fsf->meta_buf); + if (ret == -2 || ret == 0) { + /* if we succeeded or retries expired, delete chunk file and remove from queue */ + mk_list_del(&chunk->_head); + s3_store_file_delete(ctx, chunk); + } + + if (ret < 0) { + if (ctx->preserve_data_ordering == FLB_TRUE) { + break; /* if preserve_data_ordering send in the queue order, do not skip over chunks */ + } } } @@ -1823,10 +1713,11 @@ static void cb_s3_upload(struct flb_config *config, void *data) m_upload = mk_list_entry(head, struct multipart_upload, _head); complete = FLB_FALSE; - if (m_upload->complete_errors >= MAX_UPLOAD_ERRORS) { + if (ctx->ins->retry_limit >= 0 && m_upload->complete_errors > ctx->ins->retry_limit) { flb_plg_error(ctx->ins, - "Upload for %s has reached max completion errors, " - "plugin will give up", m_upload->s3_key); + "Multipart Upload for %s has failed " + "s3:CompleteMultipartUpload more than configured retry_limit, " + "output will give up ", m_upload->s3_key); mk_list_del(&m_upload->_head); continue; } @@ -1859,10 +1750,6 @@ static void cb_s3_upload(struct flb_config *config, void *data) } } } - - if (ctx->use_put_object == FLB_TRUE) { - ctx->s3_client->upstream->flags = async_flags; - } } static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char *data, @@ -2016,14 +1903,17 @@ static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char static void unit_test_flush(void 
*out_context, struct s3_file *upload_file, const char *tag, int tag_len, flb_sds_t chunk, - int chunk_size, struct multipart_upload *m_upload_file) + int chunk_size, struct multipart_upload *m_upload_file, + time_t file_first_log_time, + char *input_name) { int ret; char *buffer; size_t buffer_size; struct flb_s3 *ctx = out_context; - s3_store_buffer_put(ctx, upload_file, tag, tag_len, chunk, (size_t) chunk_size); + s3_store_buffer_put(ctx, upload_file, tag, tag_len, + chunk, (size_t) chunk_size, file_first_log_time, input_name); ret = construct_request_buffer(ctx, chunk, upload_file, &buffer, &buffer_size); if (ret < 0) { flb_plg_error(ctx->ins, "Could not construct request buffer for %s", @@ -2037,11 +1927,9 @@ static void unit_test_flush(void *out_context, struct s3_file *upload_file, FLB_OUTPUT_RETURN(ret); } -static void flush_init(void *out_context) +static void flush_startup_chunks(struct flb_s3 *ctx) { int ret; - struct flb_s3 *ctx = out_context; - struct flb_sched *sched; /* clean up any old buffers found on startup */ if (ctx->has_old_buffers == FLB_TRUE) { @@ -2050,42 +1938,132 @@ static void flush_init(void *out_context) "executions to S3; buffer=%s", ctx->fs->root_path); ctx->has_old_buffers = FLB_FALSE; - ret = put_all_chunks(ctx); + ret = put_all_chunks(ctx, FLB_TRUE); if (ret < 0) { ctx->has_old_buffers = FLB_TRUE; flb_plg_error(ctx->ins, "Failed to send locally buffered data left over " "from previous executions; will retry. Buffer=%s", ctx->fs->root_path); - FLB_OUTPUT_RETURN(FLB_RETRY); + } else { + flb_plg_info(ctx->ins, + "Successfully sent all locally buffered data left over " + "from previous executions. Buffer=%s", + ctx->fs->root_path); } } +} + + +/* + * Same as flb_output_return_do() but no coro prepare destroy + * and no coro yield. + * Using for S3 daemon thread so we can clean up the task + * But keep the coroutine. 
+ * This is the best way to do it because task clean up is + * handled by the control thread in the engine AFAICT + * But writing to the output pipe is done from coro. + */ +static inline void flb_output_return_no_destroy(int ret) +{ + struct flb_coro *coro; + int n; + int pipe_fd; + uint32_t set; + uint64_t val; + struct flb_task *task; + struct flb_output_flush *out_flush; + struct flb_output_instance *o_ins; + struct flb_out_thread_instance *th_ins = NULL; + + coro = flb_coro_get(); + + out_flush = (struct flb_output_flush *) coro->data; + o_ins = out_flush->o_ins; + task = out_flush->task; + /* - * create a timer that will run periodically and check if uploads - * are ready for completion - * this is created once on the first flush + * To compose the signal event the relevant info is: + * + * - Unique Task events id: 2 in this case + * - Return value: FLB_OK (0), FLB_ERROR (1) or FLB_RETRY (2) + * - Task ID + * - Output Instance ID (struct flb_output_instance)->id + * + * We put together the return value with the task_id on the 32 bits at right */ - if (ctx->timer_created == FLB_FALSE) { - flb_plg_debug(ctx->ins, - "Creating upload timer with frequency %ds", - ctx->timer_ms / 1000); + set = FLB_TASK_SET(ret, task->id, o_ins->id); + val = FLB_BITS_U64_SET(2 /* FLB_ENGINE_TASK */, set); - sched = flb_sched_ctx_get(); + /* + * Set the target pipe channel: if this return code is running inside a + * thread pool worker, use the specific worker pipe/event loop to handle + * the return status, otherwise use the channel connected to the parent + * event loop. 
+ */ + if (flb_output_is_threaded(o_ins) == FLB_TRUE) { + /* Retrieve the thread instance and prepare pipe channel */ + th_ins = flb_output_thread_instance_get(); + pipe_fd = th_ins->ch_thread_events[1]; + } + else { + pipe_fd = out_flush->o_ins->ch_events[1]; + } - if (ctx->preserve_data_ordering) { - ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, - ctx->timer_ms, s3_upload_queue, ctx, NULL); - } - else { - ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, - ctx->timer_ms, cb_s3_upload, ctx, NULL); - } - if (ret == -1) { - flb_plg_error(ctx->ins, "Failed to create upload timer"); - FLB_OUTPUT_RETURN(FLB_RETRY); + /* Notify the event loop about our return status */ + n = flb_pipe_w(pipe_fd, (void *) &val, sizeof(val)); + if (n == -1) { + flb_errno(); + } +} + +/* + * We need to use async IO for S3 because it is more stable. + * However, S3 has unique needs. The chunk list, multipart code, etc. + * are all not concurrency safe. + * Additionally, timer callbacks don't run in coroutines and + * can't use async IO. + * Solution: daemon coroutine + * The first coroutine that flushes to S3 never ends, + * and just uploads and sleeps. + * + * We increment the metrics counters for the chunk originally + * associated with the coroutine and decrement the task users + * in release_chunk_upstream() + */ +static void daemon_coroutine(struct flb_config *config, struct flb_s3 *ctx) +{ + flb_plg_info(ctx->ins, "May 2nd: daemon coroutine starting..."); + + ctx->daemon_coro_started = FLB_TRUE; + + /* tell engine that this task did complete successfully */ + flb_output_return_no_destroy(FLB_OK); + + /* + * FLB engine uses a graceful cooperative shutdown model. + * If coroutines never end, the system won't stop. + * So the daemon coroutine must exit itself when the engine is in shutdown mode.
+ */ + while (config->is_running == FLB_TRUE) { + /* Cleanup old buffers found on startup */ + flush_startup_chunks(ctx); + + /* upload any ready chunks */ + cb_s3_upload(config, ctx); + + if (config->is_running == FLB_FALSE) { + break; } - ctx->timer_created = FLB_TRUE; + /* + * special coroutine sleep + * Doesn't block any thread + * Puts an event on the event + * loop which will wake this coro back up + */ + flb_time_sleep(ctx->timer_ms); + flb_plg_info(ctx->ins, "May 2nd: daemon coroutine resumed..."); } } @@ -2103,9 +2081,11 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, struct s3_file *upload_file = NULL; struct flb_s3 *ctx = out_context; struct multipart_upload *m_upload_file = NULL; - - /* Cleanup old buffers and initialize upload timer */ - flush_init(ctx); + msgpack_unpacked result; + msgpack_object *obj; + size_t off = 0; + struct flb_time tms; + time_t file_first_log_time = 0; /* Process chunk */ if (ctx->log_key) { @@ -2131,88 +2111,80 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, event_chunk->tag, flb_sds_len(event_chunk->tag)); + if (upload_file == NULL) { + /* unpack msgpack */ + msgpack_unpacked_init(&result); + + /* Get the first non-zero record timestamp (compare the value, not its address) */ + while (msgpack_unpack_next(&result, + event_chunk->data, + event_chunk->size, &off) == MSGPACK_UNPACK_SUCCESS) { + flb_time_pop_from_msgpack(&tms, &result, &obj); + if (tms.tm.tv_sec != 0) { + file_first_log_time = tms.tm.tv_sec; + break; + } + } + + msgpack_unpacked_destroy(&result); + } + else { + /* Get file_first_log_time from upload_file */ + file_first_log_time = upload_file->first_log_time; + } + + if (file_first_log_time == 0) { + file_first_log_time = time(NULL); + } + /* Specific to unit tests, will not get called normally */ if (s3_plugin_under_test() == FLB_TRUE) { unit_test_flush(ctx, upload_file, event_chunk->tag, flb_sds_len(event_chunk->tag), - chunk, chunk_size, m_upload_file); + chunk, chunk_size, + m_upload_file, file_first_log_time,
i_ins->name); } - /* Discard upload_file if it has failed to upload MAX_UPLOAD_ERRORS times */ - if (upload_file != NULL && upload_file->failures >= MAX_UPLOAD_ERRORS) { - flb_plg_warn(ctx->ins, "File with tag %s failed to send %d times, will not " - "retry", event_chunk->tag, MAX_UPLOAD_ERRORS); - s3_store_file_inactive(ctx, upload_file); - upload_file = NULL; + /* + * Buffer new data in chunk in filesystem and wait for next data from engine + * If successful, data ordering is preserved to be the same as the engine sent us + */ + ret = buffer_chunk(ctx, upload_file, chunk, chunk_size, + event_chunk->tag, flb_sds_len(event_chunk->tag), + file_first_log_time, i_ins->name); + + if (ret < 0) { + FLB_OUTPUT_RETURN(FLB_RETRY); } /* If upload_timeout has elapsed, upload file */ if (upload_file != NULL && time(NULL) > (upload_file->create_time + ctx->upload_timeout)) { upload_timeout_check = FLB_TRUE; - flb_plg_info(ctx->ins, "upload_timeout reached for %s", - event_chunk->tag); + flb_plg_info(ctx->ins, "upload_timeout reached for chunk from %s, tag=%s", + i_ins->name, event_chunk->tag); } m_upload_file = get_upload(ctx, event_chunk->tag, flb_sds_len(event_chunk->tag)); - if (m_upload_file != NULL && time(NULL) > - (m_upload_file->init_time + ctx->upload_timeout)) { - upload_timeout_check = FLB_TRUE; - flb_plg_info(ctx->ins, "upload_timeout reached for %s", event_chunk->tag); - } - /* If total_file_size has been reached, upload file */ if ((upload_file && upload_file->size + chunk_size > ctx->upload_chunk_size) || (m_upload_file && m_upload_file->bytes + chunk_size > ctx->file_size)) { total_file_size_check = FLB_TRUE; } - /* File is ready for upload */ - if (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE) { - if (ctx->preserve_data_ordering == FLB_TRUE) { - /* Buffer last chunk in file and lock file to prevent further changes */ - ret = buffer_chunk(ctx, upload_file, chunk, chunk_size, - event_chunk->tag, flb_sds_len(event_chunk->tag)); - if (ret <
0) { - FLB_OUTPUT_RETURN(FLB_RETRY); - } - s3_store_file_lock(upload_file); - - /* Add chunk file to upload queue */ - ret = add_to_queue(ctx, upload_file, m_upload_file, - event_chunk->tag, flb_sds_len(event_chunk->tag)); - if (ret < 0) { - FLB_OUTPUT_RETURN(FLB_ERROR); - } - - /* Go through upload queue and return error if something went wrong */ - s3_upload_queue(config, ctx); - if (ctx->upload_queue_success == FLB_FALSE) { - ctx->upload_queue_success = FLB_TRUE; - FLB_OUTPUT_RETURN(FLB_ERROR); - } - FLB_OUTPUT_RETURN(FLB_OK); - } - else { - /* Send upload directly without upload queue */ - ret = send_upload_request(ctx, chunk, upload_file, m_upload_file, - event_chunk->tag, - flb_sds_len(event_chunk->tag)); - if (ret < 0) { - FLB_OUTPUT_RETURN(FLB_ERROR); - } - FLB_OUTPUT_RETURN(ret); - } + /* lock chunk file so no new appends happen. It's ready to be sent. */ + if ((upload_file != NULL) && (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) { + s3_store_file_lock(upload_file); + /* sends only happen from upload daemon coroutine which iterates over queue */ + mk_list_add(&upload_file->_head, &ctx->upload_queue); } - /* Buffer current chunk in filesystem and wait for next chunk from engine */ - ret = buffer_chunk(ctx, upload_file, chunk, chunk_size, - event_chunk->tag, flb_sds_len(event_chunk->tag)); - if (ret < 0) { - FLB_OUTPUT_RETURN(FLB_RETRY); + if (ctx->daemon_coro_started == FLB_FALSE) { + daemon_coroutine(config, ctx); } + FLB_OUTPUT_RETURN(FLB_OK); } @@ -2224,6 +2196,8 @@ static int cb_s3_exit(void *data, struct flb_config *config) struct mk_list *tmp; struct mk_list *head; + if (ctx) { flb_plg_info(ctx->ins, "cb_exit"); } + + if (!ctx) { return 0; } @@ -2234,7 +2208,7 @@ static int cb_s3_exit(void *data, struct flb_config *config) ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC); } flb_plg_info(ctx->ins, "Sending all locally buffered data to S3"); - ret = put_all_chunks(ctx); + ret = put_all_chunks(ctx, FLB_FALSE); if (ret < 0) {
flb_plg_error(ctx->ins, "Could not send all chunks on exit"); } diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 0f60aa90f0e..8d8beb8e63f 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -47,29 +47,6 @@ #define DEFAULT_UPLOAD_TIMEOUT 3600 -/* - * If we see repeated errors on an upload/chunk, we will discard it - * This saves us from scenarios where something goes wrong and an upload can - * not proceed (may be some other process completed it or deleted the upload) - * instead of erroring out forever, we eventually discard the upload. - * - * The same is done for chunks, just to be safe, even though realistically - * I can't think of a reason why a chunk could become unsendable. - */ -#define MAX_UPLOAD_ERRORS 5 - -struct upload_queue { - struct s3_file *upload_file; - struct multipart_upload *m_upload_file; - flb_sds_t tag; - int tag_len; - - int retry_counter; - time_t upload_time; - - struct mk_list _head; -}; - struct multipart_upload { flb_sds_t s3_key; flb_sds_t tag; @@ -93,6 +70,9 @@ struct multipart_upload { /* ongoing tracker of how much data has been sent for this upload */ size_t bytes; + /* for s3 retry warn message */ + char *input_name; + struct mk_list _head; /* see note for MAX_UPLOAD_ERRORS */ @@ -156,10 +136,11 @@ struct flb_s3 { struct mk_list uploads; - int preserve_data_ordering; - int upload_queue_success; + /* list of locked chunks that are ready to send */ struct mk_list upload_queue; + int preserve_data_ordering; + size_t file_size; size_t upload_chunk_size; time_t upload_timeout; @@ -174,6 +155,8 @@ struct flb_s3 { flb_sds_t metadata_dir; flb_sds_t seq_index_file; + int daemon_coro_started; + struct flb_output_instance *ins; }; diff --git a/plugins/out_s3/s3_store.c b/plugins/out_s3/s3_store.c index 050734a05f0..a7aac084900 100644 --- a/plugins/out_s3/s3_store.c +++ b/plugins/out_s3/s3_store.c @@ -125,7 +125,9 @@ struct s3_file *s3_store_file_get(struct flb_s3 *ctx, const char *tag, /* Append data to a new or
existing fstore file */ int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file, const char *tag, int tag_len, - char *data, size_t bytes) + char *data, size_t bytes, + time_t file_first_log_time, + char *input_name) { int ret; flb_sds_t name; @@ -175,7 +177,9 @@ int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file, return -1; } s3_file->fsf = fsf; + s3_file->first_log_time = file_first_log_time; s3_file->create_time = time(NULL); + s3_file->input_name = input_name; /* Use fstore opaque 'data' reference to keep our context */ fsf->data = s3_file; @@ -241,6 +245,7 @@ static int set_files_context(struct flb_s3 *ctx) continue; } s3_file->fsf = fsf; + s3_file->first_log_time = time(NULL); s3_file->create_time = time(NULL); /* Use fstore opaque 'data' reference to keep our context */ diff --git a/plugins/out_s3/s3_store.h b/plugins/out_s3/s3_store.h index 242d99ab6fc..04539fb50e9 100644 --- a/plugins/out_s3/s3_store.h +++ b/plugins/out_s3/s3_store.h @@ -24,17 +24,22 @@ #include struct s3_file { - int locked; /* locked chunk is busy, cannot write to it */ + int locked; /* locked = no appends to this chunk */ int failures; /* delivery failures */ size_t size; /* file size */ time_t create_time; /* creation time */ + time_t first_log_time; /* first log time */ + char *input_name; /* for s3_retry_warn output message */ flb_sds_t file_path; /* file path */ struct flb_fstore_file *fsf; /* reference to parent flb_fstore_file */ + struct mk_list _head; }; int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file, const char *tag, int tag_len, - char *data, size_t bytes); + char *data, size_t bytes, + time_t file_first_log_time, + char *input_name); int s3_store_init(struct flb_s3 *ctx); int s3_store_exit(struct flb_s3 *ctx);