out_s3: get the upload time from the first log record's timestamp.
Signed-off-by: Clay Cheng <claychen@amazon.com>
Clay Cheng authored and Claych committed Jan 9, 2023
Parent: 760956f · Commit: a8a34c1
Showing 3 changed files with 78 additions and 30 deletions.
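In short, this patch changes which timestamp is used to render the S3 object key. PutObject requests previously used the buffered chunk's create_time (or time(NULL) in the unit-test path) and multipart uploads used time(NULL); after this change both paths use the timestamp of the first log record in the chunk, falling back to the current time only when no record timestamp is available. For context, here is a typical out_s3 configuration whose key format depends on that timestamp (bucket name, sizes, and the key format are placeholders, not part of the patch); with this change the %Y/%m/%d/%H_%M_%S portion expands from the first record's time rather than from the moment the chunk was flushed:

[OUTPUT]
    name              s3
    match             *
    bucket            my-example-bucket
    region            us-east-1
    total_file_size   50M
    upload_timeout    10m
    s3_key_format     /logs/$TAG/%Y/%m/%d/%H_%M_%S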
99 changes: 71 additions & 28 deletions plugins/out_s3/s3.c
@@ -45,7 +45,7 @@ static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data,
struct s3_file *chunk,
char **out_buf, size_t *out_size);

static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time,
static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_log_time,
char *body, size_t body_size);

static int put_all_chunks(struct flb_s3 *ctx);
@@ -56,7 +56,8 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx,
const char *tag, int tag_len);

static struct multipart_upload *create_upload(struct flb_s3 *ctx,
const char *tag, int tag_len);
const char *tag, int tag_len,
time_t file_first_log_time);

static void remove_from_queue(struct upload_queue *entry);

@@ -981,11 +982,19 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
int size_check = FLB_FALSE;
int part_num_check = FLB_FALSE;
int timeout_check = FLB_FALSE;
time_t create_time;
int ret;
void *payload_buf = NULL;
size_t payload_size = 0;
size_t preCompress_size = 0;
time_t file_first_log_time = 0;

/*
* When chunk does not exist, file_first_log_time will be 0.
* This is only for unit tests.
*/
if (chunk != NULL) {
file_first_log_time = chunk->first_log_time;
}

if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
/* Map payload */
@@ -1052,14 +1061,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
* remove chunk from buffer list- needed for async http so that the
* same chunk won't be sent more than once
*/
if (chunk) {
create_time = chunk->create_time;
}
else {
create_time = time(NULL);
}

ret = s3_put_object(ctx, tag, create_time, body, body_size);
ret = s3_put_object(ctx, tag, file_first_log_time, body, body_size);
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
flb_free(payload_buf);
}
@@ -1081,7 +1083,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
multipart:

if (init_upload == FLB_TRUE) {
m_upload = create_upload(ctx, tag, tag_len);
m_upload = create_upload(ctx, tag, tag_len, file_first_log_time);
if (!m_upload) {
flb_plg_error(ctx->ins, "Could not find or create upload for tag %s", tag);
if (chunk) {
@@ -1320,7 +1322,7 @@ static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data,
return 0;
}

static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time,
static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_log_time,
char *body, size_t body_size)
{
flb_sds_t s3_key = NULL;
@@ -1337,8 +1339,8 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
flb_sds_t tmp;
char final_body_md5[25];

s3_key = flb_get_s3_key(ctx->s3_key_format, create_time, tag, ctx->tag_delimiters,
ctx->seq_index);
s3_key = flb_get_s3_key(ctx->s3_key_format, file_first_log_time, tag,
ctx->tag_delimiters, ctx->seq_index);
if (!s3_key) {
flb_plg_error(ctx->ins, "Failed to construct S3 Object Key for %s", tag);
return -1;
@@ -1509,8 +1511,8 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx,
return m_upload;
}

static struct multipart_upload *create_upload(struct flb_s3 *ctx,
const char *tag, int tag_len)
static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *tag,
int tag_len, time_t file_first_log_time)
{
int ret;
struct multipart_upload *m_upload = NULL;
@@ -1523,8 +1525,8 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx,
flb_errno();
return NULL;
}
s3_key = flb_get_s3_key(ctx->s3_key_format, time(NULL), tag, ctx->tag_delimiters,
ctx->seq_index);
s3_key = flb_get_s3_key(ctx->s3_key_format, file_first_log_time, tag,
ctx->tag_delimiters, ctx->seq_index);
if (!s3_key) {
flb_plg_error(ctx->ins, "Failed to construct S3 Object Key for %s", tag);
flb_free(m_upload);
@@ -1664,13 +1666,16 @@ static int send_upload_request(void *out_context, flb_sds_t chunk,
return ret;
}

static int buffer_chunk(void *out_context, struct s3_file *upload_file, flb_sds_t chunk,
int chunk_size, const char *tag, int tag_len)
static int buffer_chunk(void *out_context, struct s3_file *upload_file,
flb_sds_t chunk, int chunk_size,
const char *tag, int tag_len,
time_t file_first_log_time)
{
int ret;
struct flb_s3 *ctx = out_context;

ret = s3_store_buffer_put(ctx, upload_file, tag, tag_len, chunk, (size_t) chunk_size);
ret = s3_store_buffer_put(ctx, upload_file, tag,
tag_len, chunk, (size_t) chunk_size, file_first_log_time);
flb_sds_destroy(chunk);
if (ret < 0) {
flb_plg_warn(ctx->ins, "Could not buffer chunk. Data order preservation "
@@ -2017,14 +2022,16 @@ static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char

static void unit_test_flush(void *out_context, struct s3_file *upload_file,
const char *tag, int tag_len, flb_sds_t chunk,
int chunk_size, struct multipart_upload *m_upload_file)
int chunk_size, struct multipart_upload *m_upload_file,
time_t file_first_log_time)
{
int ret;
char *buffer;
size_t buffer_size;
struct flb_s3 *ctx = out_context;

s3_store_buffer_put(ctx, upload_file, tag, tag_len, chunk, (size_t) chunk_size);
s3_store_buffer_put(ctx, upload_file, tag, tag_len,
chunk, (size_t) chunk_size, file_first_log_time);
ret = construct_request_buffer(ctx, chunk, upload_file, &buffer, &buffer_size);
if (ret < 0) {
flb_plg_error(ctx->ins, "Could not construct request buffer for %s",
@@ -2104,6 +2111,11 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
struct s3_file *upload_file = NULL;
struct flb_s3 *ctx = out_context;
struct multipart_upload *m_upload_file = NULL;
msgpack_unpacked result;
msgpack_object *obj;
size_t off = 0;
struct flb_time tms;
time_t file_first_log_time = 0;

/* Cleanup old buffers and initialize upload timer */
flush_init(ctx);
@@ -2132,11 +2144,38 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
event_chunk->tag,
flb_sds_len(event_chunk->tag));

if (upload_file == NULL) {
/* unpack msgpack */
msgpack_unpacked_init(&result);

/* Get the first record timestamp */
while (msgpack_unpack_next(&result,
event_chunk->data,
event_chunk->size, &off) == MSGPACK_UNPACK_SUCCESS) {
flb_time_pop_from_msgpack(&tms, &result, &obj);
if (tms.tm.tv_sec != 0) {
file_first_log_time = tms.tm.tv_sec;
break;
}
}

msgpack_unpacked_destroy(&result);
}
else {
/* Get file_first_log_time from upload_file */
file_first_log_time = upload_file->first_log_time;
}

if (file_first_log_time == 0) {
file_first_log_time = time(NULL);
}

/* Specific to unit tests, will not get called normally */
if (s3_plugin_under_test() == FLB_TRUE) {
unit_test_flush(ctx, upload_file,
event_chunk->tag, flb_sds_len(event_chunk->tag),
chunk, chunk_size, m_upload_file);
chunk, chunk_size,
m_upload_file, file_first_log_time);
}

/* Discard upload_file if it has failed to upload MAX_UPLOAD_ERRORS times */
@@ -2171,11 +2210,13 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
}

/* File is ready for upload */
if (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE) {
if ((upload_file != NULL) && (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) {
if (ctx->preserve_data_ordering == FLB_TRUE) {
/* Buffer last chunk in file and lock file to prevent further changes */
ret = buffer_chunk(ctx, upload_file, chunk, chunk_size,
event_chunk->tag, flb_sds_len(event_chunk->tag));
event_chunk->tag, flb_sds_len(event_chunk->tag),
file_first_log_time);

if (ret < 0) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}
@@ -2210,7 +2251,9 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,

/* Buffer current chunk in filesystem and wait for next chunk from engine */
ret = buffer_chunk(ctx, upload_file, chunk, chunk_size,
event_chunk->tag, flb_sds_len(event_chunk->tag));
event_chunk->tag, flb_sds_len(event_chunk->tag),
file_first_log_time);

if (ret < 0) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}
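The block added to cb_s3_flush above walks the msgpack-encoded event chunk and takes the seconds field of the first record's timestamp when there is no existing buffer file to read it from. A condensed sketch of that scan, assuming the Fluent Bit headers (msgpack.h and fluent-bit/flb_time.h) are on the include path; data and size stand in for event_chunk->data and event_chunk->size:

#include <time.h>
#include <msgpack.h>
#include <fluent-bit/flb_time.h>

/* Return the seconds of the first non-zero record timestamp in a
 * msgpack-encoded chunk, or 0 if none is found (the flush path then
 * falls back to time(NULL)). */
static time_t first_record_time(const char *data, size_t size)
{
    size_t off = 0;
    time_t first = 0;
    struct flb_time tms;
    msgpack_object *obj;
    msgpack_unpacked result;

    msgpack_unpacked_init(&result);
    while (msgpack_unpack_next(&result, data, size, &off) == MSGPACK_UNPACK_SUCCESS) {
        /* pop the record timestamp (seconds + nanoseconds) */
        flb_time_pop_from_msgpack(&tms, &result, &obj);
        if (tms.tm.tv_sec != 0) {
            first = tms.tm.tv_sec;
            break;
        }
    }
    msgpack_unpacked_destroy(&result);

    return first;
}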
5 changes: 4 additions & 1 deletion plugins/out_s3/s3_store.c
@@ -125,7 +125,8 @@ struct s3_file *s3_store_file_get(struct flb_s3 *ctx, const char *tag,
/* Append data to a new or existing fstore file */
int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file,
const char *tag, int tag_len,
char *data, size_t bytes)
char *data, size_t bytes,
time_t file_first_log_time)
{
int ret;
flb_sds_t name;
@@ -175,6 +176,7 @@ int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file,
return -1;
}
s3_file->fsf = fsf;
s3_file->first_log_time = file_first_log_time;
s3_file->create_time = time(NULL);

/* Use fstore opaque 'data' reference to keep our context */
@@ -241,6 +243,7 @@ static int set_files_context(struct flb_s3 *ctx)
continue;
}
s3_file->fsf = fsf;
s3_file->first_log_time = time(NULL);
s3_file->create_time = time(NULL);

/* Use fstore opaque 'data' reference to keep our context */
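Together with the flush-path changes in s3.c, the timestamp that ends up in the S3 key follows a simple precedence: the first_log_time recorded on an existing buffer file, otherwise the first record timestamp extracted from the incoming chunk, otherwise the current time. (Buffer files recovered from disk at startup in set_files_context simply use the current time, as shown above.) A compact restatement of that precedence, assuming the record scan has already produced chunk_first_record_time (0 when nothing usable was found); effective_key_time is an illustrative helper, not part of the patch:

#include <time.h>
#include "s3_store.h"   /* struct s3_file, extended below with first_log_time */

static time_t effective_key_time(struct s3_file *buffered,
                                 time_t chunk_first_record_time)
{
    time_t t;

    if (buffered != NULL) {
        /* reuse the time recorded when the buffer file was created */
        t = buffered->first_log_time;
    }
    else {
        /* first delivery for this tag: use the chunk's first record */
        t = chunk_first_record_time;
    }
    /* last resort, e.g. records without timestamps: fall back to now */
    return (t != 0) ? t : time(NULL);
}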
4 changes: 3 additions & 1 deletion plugins/out_s3/s3_store.h
@@ -28,13 +28,15 @@ struct s3_file {
int failures; /* delivery failures */
size_t size; /* file size */
time_t create_time; /* creation time */
time_t first_log_time; /* first log time */
flb_sds_t file_path; /* file path */
struct flb_fstore_file *fsf; /* reference to parent flb_fstore_file */
};

int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file,
const char *tag, int tag_len,
char *data, size_t bytes);
char *data, size_t bytes,
time_t file_first_log_time);

int s3_store_init(struct flb_s3 *ctx);
int s3_store_exit(struct flb_s3 *ctx);
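To see the effect concretely, here is a standalone illustration of how the same strftime-style key format expands differently depending on whether it is fed the upload time or the first record's time; the format string and the sample timestamp are made up for the example and do not come from the patch:

#include <stdio.h>
#include <time.h>

/* Expand a time-based key prefix the way a strftime-style s3_key_format
 * would be expanded (purely illustrative, not the plugin's own helper). */
static void render_key(char *out, size_t len, time_t t)
{
    struct tm tm_utc;

    gmtime_r(&t, &tm_utc);
    strftime(out, len, "/logs/%Y/%m/%d/%H_%M_%S", &tm_utc);
}

int main(void)
{
    char by_upload_time[64];
    char by_record_time[64];

    time_t first_log_time = 1673222400;   /* sample value: 2023-01-09 00:00:00 UTC */
    time_t upload_time    = time(NULL);   /* whenever the flush actually runs */

    render_key(by_upload_time, sizeof(by_upload_time), upload_time);
    render_key(by_record_time, sizeof(by_record_time), first_log_time);

    /* Before the patch the object key follows the upload time;
     * after it, the key follows the data's own first timestamp. */
    printf("key from upload time: %s\n", by_upload_time);
    printf("key from record time: %s\n", by_record_time);
    return 0;
}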
