diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 5f4c1027852..fe1926865b5 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -45,7 +45,7 @@ static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, struct s3_file *chunk, char **out_buf, size_t *out_size); -static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time, +static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_log_time, char *body, size_t body_size); static int put_all_chunks(struct flb_s3 *ctx); @@ -56,7 +56,8 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx, const char *tag, int tag_len); static struct multipart_upload *create_upload(struct flb_s3 *ctx, - const char *tag, int tag_len); + const char *tag, int tag_len, + time_t file_first_log_time); static void remove_from_queue(struct upload_queue *entry); @@ -981,11 +982,19 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, int size_check = FLB_FALSE; int part_num_check = FLB_FALSE; int timeout_check = FLB_FALSE; - time_t create_time; int ret; void *payload_buf = NULL; size_t payload_size = 0; size_t preCompress_size = 0; + time_t file_first_log_time = 0; + + /* + * When chunk does not exist, file_first_log_time will be 0. + * This is only for unit tests. + */ + if (chunk != NULL) { + file_first_log_time = chunk->first_log_time; + } if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { /* Map payload */ @@ -1052,14 +1061,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, * remove chunk from buffer list- needed for async http so that the * same chunk won't be sent more than once */ - if (chunk) { - create_time = chunk->create_time; - } - else { - create_time = time(NULL); - } - - ret = s3_put_object(ctx, tag, create_time, body, body_size); + ret = s3_put_object(ctx, tag, file_first_log_time, body, body_size); if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { flb_free(payload_buf); } @@ -1081,7 +1083,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, multipart: if (init_upload == FLB_TRUE) { - m_upload = create_upload(ctx, tag, tag_len); + m_upload = create_upload(ctx, tag, tag_len, file_first_log_time); if (!m_upload) { flb_plg_error(ctx->ins, "Could not find or create upload for tag %s", tag); if (chunk) { @@ -1320,7 +1322,7 @@ static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, return 0; } -static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time, +static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_log_time, char *body, size_t body_size) { flb_sds_t s3_key = NULL; @@ -1337,8 +1339,8 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time flb_sds_t tmp; char final_body_md5[25]; - s3_key = flb_get_s3_key(ctx->s3_key_format, create_time, tag, ctx->tag_delimiters, - ctx->seq_index); + s3_key = flb_get_s3_key(ctx->s3_key_format, file_first_log_time, tag, + ctx->tag_delimiters, ctx->seq_index); if (!s3_key) { flb_plg_error(ctx->ins, "Failed to construct S3 Object Key for %s", tag); return -1; @@ -1509,8 +1511,8 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx, return m_upload; } -static struct multipart_upload *create_upload(struct flb_s3 *ctx, - const char *tag, int tag_len) +static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *tag, + int tag_len, time_t file_first_log_time) { int ret; struct multipart_upload *m_upload = NULL; @@ -1523,8 +1525,8 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx, flb_errno(); return NULL; } - s3_key = flb_get_s3_key(ctx->s3_key_format, time(NULL), tag, ctx->tag_delimiters, - ctx->seq_index); + s3_key = flb_get_s3_key(ctx->s3_key_format, file_first_log_time, tag, + ctx->tag_delimiters, ctx->seq_index); if (!s3_key) { flb_plg_error(ctx->ins, "Failed to construct S3 Object Key for %s", tag); flb_free(m_upload); @@ -1664,13 +1666,16 @@ static int send_upload_request(void *out_context, flb_sds_t chunk, return ret; } -static int buffer_chunk(void *out_context, struct s3_file *upload_file, flb_sds_t chunk, - int chunk_size, const char *tag, int tag_len) +static int buffer_chunk(void *out_context, struct s3_file *upload_file, + flb_sds_t chunk, int chunk_size, + const char *tag, int tag_len, + time_t file_first_log_time) { int ret; struct flb_s3 *ctx = out_context; - ret = s3_store_buffer_put(ctx, upload_file, tag, tag_len, chunk, (size_t) chunk_size); + ret = s3_store_buffer_put(ctx, upload_file, tag, + tag_len, chunk, (size_t) chunk_size, file_first_log_time); flb_sds_destroy(chunk); if (ret < 0) { flb_plg_warn(ctx->ins, "Could not buffer chunk. Data order preservation " @@ -2017,14 +2022,16 @@ static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char static void unit_test_flush(void *out_context, struct s3_file *upload_file, const char *tag, int tag_len, flb_sds_t chunk, - int chunk_size, struct multipart_upload *m_upload_file) + int chunk_size, struct multipart_upload *m_upload_file, + time_t file_first_log_time) { int ret; char *buffer; size_t buffer_size; struct flb_s3 *ctx = out_context; - s3_store_buffer_put(ctx, upload_file, tag, tag_len, chunk, (size_t) chunk_size); + s3_store_buffer_put(ctx, upload_file, tag, tag_len, + chunk, (size_t) chunk_size, file_first_log_time); ret = construct_request_buffer(ctx, chunk, upload_file, &buffer, &buffer_size); if (ret < 0) { flb_plg_error(ctx->ins, "Could not construct request buffer for %s", @@ -2104,6 +2111,11 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, struct s3_file *upload_file = NULL; struct flb_s3 *ctx = out_context; struct multipart_upload *m_upload_file = NULL; + msgpack_unpacked result; + msgpack_object *obj; + size_t off = 0; + struct flb_time tms; + time_t file_first_log_time = 0; /* Cleanup old buffers and initialize upload timer */ flush_init(ctx); @@ -2132,11 +2144,38 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, event_chunk->tag, flb_sds_len(event_chunk->tag)); + if (upload_file == NULL) { + /* unpack msgpack */ + msgpack_unpacked_init(&result); + + /* Get the first record timestamp */ + while (msgpack_unpack_next(&result, + event_chunk->data, + event_chunk->size, &off) == MSGPACK_UNPACK_SUCCESS) { + flb_time_pop_from_msgpack(&tms, &result, &obj); + if (&tms.tm.tv_sec != 0) { + file_first_log_time = tms.tm.tv_sec; + break; + } + } + + msgpack_unpacked_destroy(&result); + } + else { + /* Get file_first_log_time from upload_file */ + file_first_log_time = upload_file->first_log_time; + } + + if (file_first_log_time == 0) { + file_first_log_time = time(NULL); + } + /* Specific to unit tests, will not get called normally */ if (s3_plugin_under_test() == FLB_TRUE) { unit_test_flush(ctx, upload_file, event_chunk->tag, flb_sds_len(event_chunk->tag), - chunk, chunk_size, m_upload_file); + chunk, chunk_size, + m_upload_file, file_first_log_time); } /* Discard upload_file if it has failed to upload MAX_UPLOAD_ERRORS times */ @@ -2171,11 +2210,13 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, } /* File is ready for upload */ - if (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE) { + if ((upload_file != NULL) && (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) { if (ctx->preserve_data_ordering == FLB_TRUE) { /* Buffer last chunk in file and lock file to prevent further changes */ ret = buffer_chunk(ctx, upload_file, chunk, chunk_size, - event_chunk->tag, flb_sds_len(event_chunk->tag)); + event_chunk->tag, flb_sds_len(event_chunk->tag), + file_first_log_time); + if (ret < 0) { FLB_OUTPUT_RETURN(FLB_RETRY); } @@ -2210,7 +2251,9 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, /* Buffer current chunk in filesystem and wait for next chunk from engine */ ret = buffer_chunk(ctx, upload_file, chunk, chunk_size, - event_chunk->tag, flb_sds_len(event_chunk->tag)); + event_chunk->tag, flb_sds_len(event_chunk->tag), + file_first_log_time); + if (ret < 0) { FLB_OUTPUT_RETURN(FLB_RETRY); } diff --git a/plugins/out_s3/s3_store.c b/plugins/out_s3/s3_store.c index 050734a05f0..8a96406335a 100644 --- a/plugins/out_s3/s3_store.c +++ b/plugins/out_s3/s3_store.c @@ -125,7 +125,8 @@ struct s3_file *s3_store_file_get(struct flb_s3 *ctx, const char *tag, /* Append data to a new or existing fstore file */ int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file, const char *tag, int tag_len, - char *data, size_t bytes) + char *data, size_t bytes, + time_t file_first_log_time) { int ret; flb_sds_t name; @@ -175,6 +176,7 @@ int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file, return -1; } s3_file->fsf = fsf; + s3_file->first_log_time = file_first_log_time; s3_file->create_time = time(NULL); /* Use fstore opaque 'data' reference to keep our context */ @@ -241,6 +243,7 @@ static int set_files_context(struct flb_s3 *ctx) continue; } s3_file->fsf = fsf; + s3_file->first_log_time = time(NULL); s3_file->create_time = time(NULL); /* Use fstore opaque 'data' reference to keep our context */ diff --git a/plugins/out_s3/s3_store.h b/plugins/out_s3/s3_store.h index 242d99ab6fc..9caa7bdf482 100644 --- a/plugins/out_s3/s3_store.h +++ b/plugins/out_s3/s3_store.h @@ -28,13 +28,15 @@ struct s3_file { int failures; /* delivery failures */ size_t size; /* file size */ time_t create_time; /* creation time */ + time_t first_log_time; /* first log time */ flb_sds_t file_path; /* file path */ struct flb_fstore_file *fsf; /* reference to parent flb_fstore_file */ }; int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file, const char *tag, int tag_len, - char *data, size_t bytes); + char *data, size_t bytes, + time_t file_first_log_time); int s3_store_init(struct flb_s3 *ctx); int s3_store_exit(struct flb_s3 *ctx);