From dfeff9de13ba7f1bbebe08fef24ec993dee7e392 Mon Sep 17 00:00:00 2001 From: Clay Cheng Date: Mon, 28 Nov 2022 23:25:54 +0000 Subject: [PATCH 1/3] flb_aws_util: added function flb_aws_strftime_precision for time output. Signed-off-by: Clay Cheng --- include/fluent-bit/flb_aws_util.h | 9 +++ src/aws/flb_aws_util.c | 97 +++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) diff --git a/include/fluent-bit/flb_aws_util.h b/include/fluent-bit/flb_aws_util.h index bafe429c5ad..ce38864e136 100644 --- a/include/fluent-bit/flb_aws_util.h +++ b/include/fluent-bit/flb_aws_util.h @@ -22,6 +22,7 @@ #ifndef FLB_AWS_UTIL_H #include +#include #define FLB_AWS_UTIL_H @@ -177,5 +178,13 @@ int flb_read_file(const char *path, char **out_buf, size_t *out_size); flb_sds_t flb_get_s3_key(const char *format, time_t time, const char *tag, char *tag_delimiter, uint64_t seq_index); +/* + * This function is an extension to strftime which can support milliseconds with %3N, + * support nanoseconds with %9N or %L. The return value is the length of formatted + * time string. + */ +size_t flb_aws_strftime_precision(char **out_buf, const char *time_format, + struct flb_time *tms); + #endif #endif /* FLB_HAVE_AWS */ diff --git a/src/aws/flb_aws_util.c b/src/aws/flb_aws_util.c index 3ca93bd9a8f..2bcc00f4e4f 100644 --- a/src/aws/flb_aws_util.c +++ b/src/aws/flb_aws_util.c @@ -57,6 +57,12 @@ #define FLB_AWS_BASE_USER_AGENT_LEN 21 #endif +#define FLB_AWS_MILLISECOND_FORMATTER_LENGTH 3 +#define FLB_AWS_NANOSECOND_FORMATTER_LENGTH 9 +#define FLB_AWS_MILLISECOND_FORMATTER "%3N" +#define FLB_AWS_NANOSECOND_FORMATTER_N "%9N" +#define FLB_AWS_NANOSECOND_FORMATTER_L "%L" + struct flb_http_client *request_do(struct flb_aws_client *aws_client, int method, const char *uri, const char *body, size_t body_len, @@ -942,3 +948,94 @@ flb_sds_t flb_get_s3_key(const char *format, time_t time, const char *tag, } return NULL; } + +/* + * This function is an extension to strftime which can support milliseconds with %3N, + * support nanoseconds with %9N or %L. The return value is the length of formatted + * time string. + */ +size_t flb_aws_strftime_precision(char **out_buf, const char *time_format, + struct flb_time *tms) +{ + char millisecond_str[FLB_AWS_MILLISECOND_FORMATTER_LENGTH+1]; + char nanosecond_str[FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1]; + char *tmp_parsed_time_str; + char *buf; + size_t out_size; + size_t tmp_parsed_time_str_len; + size_t time_format_len; + struct tm timestamp; + struct tm *tmp; + int i; + + /* + * Guess the max length needed for tmp_parsed_time_str and tmp_out_buf. The + * upper bound is 12*strlen(time_format) because the worst scenario will be only + * %c in time_format, and %c will be transfer to 24 chars long by function strftime(). + */ + time_format_len = strlen(time_format); + tmp_parsed_time_str_len = 12*time_format_len; + + /* + * Use tmp_parsed_time_str to buffer when replace %3N with milliseconds, replace + * %9N and %L with nanoseconds in time_format. + */ + tmp_parsed_time_str = (char *)flb_calloc(1, tmp_parsed_time_str_len*sizeof(char)); + if (!tmp_parsed_time_str) { + flb_errno(); + return 0; + } + + buf = (char *)flb_calloc(1, tmp_parsed_time_str_len*sizeof(char)); + if (!buf) { + flb_errno(); + flb_free(tmp_parsed_time_str); + return 0; + } + + /* Replace %3N to millisecond, %9N and %L to nanosecond in time_format. */ + snprintf(millisecond_str, FLB_AWS_MILLISECOND_FORMATTER_LENGTH+1, + "%" PRIu64, (uint64_t) tms->tm.tv_nsec / 1000000); + snprintf(nanosecond_str, FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1, + "%" PRIu64, (uint64_t) tms->tm.tv_nsec); + for (i = 0; i < time_format_len; i++) { + if (strncmp(time_format+i, FLB_AWS_MILLISECOND_FORMATTER, 3) == 0) { + strncat(tmp_parsed_time_str, millisecond_str, + FLB_AWS_MILLISECOND_FORMATTER_LENGTH+1); + i += 2; + } + else if (strncmp(time_format+i, FLB_AWS_NANOSECOND_FORMATTER_N, 3) == 0) { + strncat(tmp_parsed_time_str, nanosecond_str, + FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1); + i += 2; + } + else if (strncmp(time_format+i, FLB_AWS_NANOSECOND_FORMATTER_L, 2) == 0) { + strncat(tmp_parsed_time_str, nanosecond_str, + FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1); + i += 1; + } + else { + strncat(tmp_parsed_time_str,time_format+i,1); + } + } + + tmp = gmtime_r(&tms->tm.tv_sec, ×tamp); + if (!tmp) { + return 0; + } + + out_size = strftime(buf, tmp_parsed_time_str_len, + tmp_parsed_time_str, ×tamp); + + /* Check whether tmp_parsed_time_str_len is enough for tmp_out_buff */ + if (out_size == 0) { + flb_free(tmp_parsed_time_str); + flb_free(buf); + return 0; + } + + *out_buf = buf; + flb_free(tmp_parsed_time_str); + + return out_size; +} From 66e85a11590f0045294a3d1b104baf73569a1dd0 Mon Sep 17 00:00:00 2001 From: Clay Cheng Date: Wed, 28 Dec 2022 01:01:54 +0000 Subject: [PATCH 2/3] out_kinesis_streams: used function flb_aws_strftime_precision for time output. Signed-off-by: Clay Cheng --- plugins/out_kinesis_streams/kinesis_api.c | 59 ++++++++++++++--------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/plugins/out_kinesis_streams/kinesis_api.c b/plugins/out_kinesis_streams/kinesis_api.c index b7e29030383..73434187a4d 100644 --- a/plugins/out_kinesis_streams/kinesis_api.c +++ b/plugins/out_kinesis_streams/kinesis_api.c @@ -225,6 +225,7 @@ static int process_event(struct flb_kinesis *ctx, struct flush *buf, struct tm *tmp; size_t len; size_t tmp_size; + char *out_buf; tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; ret = flb_msgpack_to_json(tmp_buf_ptr, @@ -276,34 +277,48 @@ static int process_event(struct flb_kinesis *ctx, struct flush *buf, ctx->stream_name); return 2; } - /* guess space needed to write time_key */ - len = 6 + strlen(ctx->time_key) + 6 * strlen(ctx->time_key_format); + + /* format time output and return the length */ + len = flb_aws_strftime_precision(&out_buf, ctx->time_key_format, tms); + /* how much space do we have left */ tmp_size = (buf->tmp_buf_size - buf->tmp_buf_offset) - written; if (len > tmp_size) { - /* not enough space- tell caller to retry */ + /* not enough space - tell caller to retry */ + flb_free(out_buf); return 1; } - time_key_ptr = tmp_buf_ptr + written - 1; - memcpy(time_key_ptr, ",", 1); - time_key_ptr++; - memcpy(time_key_ptr, "\"", 1); - time_key_ptr++; - memcpy(time_key_ptr, ctx->time_key, strlen(ctx->time_key)); - time_key_ptr += strlen(ctx->time_key); - memcpy(time_key_ptr, "\":\"", 3); - time_key_ptr += 3; - tmp_size = buf->tmp_buf_size - buf->tmp_buf_offset; - tmp_size -= (time_key_ptr - tmp_buf_ptr); - len = strftime(time_key_ptr, tmp_size, ctx->time_key_format, &time_stamp); - if (len <= 0) { - /* ran out of space - should not happen because of check above */ - return 1; + + if (len == 0) { + /* + * when the length of out_buf is not enough for time_key_format, + * time_key will not be added to record. + */ + flb_plg_error(ctx->ins, "Failed to add time_key %s to record, %s", + ctx->time_key, ctx->stream_name); + flb_free(out_buf); + } + else { + time_key_ptr = tmp_buf_ptr + written - 1; + memcpy(time_key_ptr, ",", 1); + time_key_ptr++; + memcpy(time_key_ptr, "\"", 1); + time_key_ptr++; + memcpy(time_key_ptr, ctx->time_key, strlen(ctx->time_key)); + time_key_ptr += strlen(ctx->time_key); + memcpy(time_key_ptr, "\":\"", 3); + time_key_ptr += 3; + tmp_size = buf->tmp_buf_size - buf->tmp_buf_offset; + tmp_size -= (time_key_ptr - tmp_buf_ptr); + + /* merge out_buf to time_key_ptr */ + memcpy(time_key_ptr, out_buf, len); + flb_free(out_buf); + time_key_ptr += len; + memcpy(time_key_ptr, "\"}", 2); + time_key_ptr += 2; + written = (time_key_ptr - tmp_buf_ptr); } - time_key_ptr += len; - memcpy(time_key_ptr, "\"}", 2); - time_key_ptr += 2; - written = (time_key_ptr - tmp_buf_ptr); } /* is (written + 1) because we still have to append newline */ From e225ff7374fbc5e290b0090652158e3d2db5d8a6 Mon Sep 17 00:00:00 2001 From: Clay Cheng Date: Wed, 28 Dec 2022 01:03:22 +0000 Subject: [PATCH 3/3] out_kinesis_firehose: used function flb_aws_strftime_precision for time output. Signed-off-by: Clay Cheng --- plugins/out_kinesis_firehose/firehose_api.c | 59 +++++++++++++-------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/plugins/out_kinesis_firehose/firehose_api.c b/plugins/out_kinesis_firehose/firehose_api.c index b9529408d6b..0516578c4f3 100644 --- a/plugins/out_kinesis_firehose/firehose_api.c +++ b/plugins/out_kinesis_firehose/firehose_api.c @@ -165,6 +165,7 @@ static int process_event(struct flb_firehose *ctx, struct flush *buf, size_t len; size_t tmp_size; void *compressed_tmp_buf; + char *out_buf; tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; ret = flb_msgpack_to_json(tmp_buf_ptr, @@ -216,34 +217,48 @@ static int process_event(struct flb_firehose *ctx, struct flush *buf, ctx->delivery_stream); return 2; } - /* guess space needed to write time_key */ - len = 6 + strlen(ctx->time_key) + 6 * strlen(ctx->time_key_format); + + /* format time output and return the length */ + len = flb_aws_strftime_precision(&out_buf, ctx->time_key_format, tms); + /* how much space do we have left */ tmp_size = (buf->tmp_buf_size - buf->tmp_buf_offset) - written; if (len > tmp_size) { - /* not enough space- tell caller to retry */ + /* not enough space - tell caller to retry */ + flb_free(out_buf); return 1; } - time_key_ptr = tmp_buf_ptr + written - 1; - memcpy(time_key_ptr, ",", 1); - time_key_ptr++; - memcpy(time_key_ptr, "\"", 1); - time_key_ptr++; - memcpy(time_key_ptr, ctx->time_key, strlen(ctx->time_key)); - time_key_ptr += strlen(ctx->time_key); - memcpy(time_key_ptr, "\":\"", 3); - time_key_ptr += 3; - tmp_size = buf->tmp_buf_size - buf->tmp_buf_offset; - tmp_size -= (time_key_ptr - tmp_buf_ptr); - len = strftime(time_key_ptr, tmp_size, ctx->time_key_format, &time_stamp); - if (len <= 0) { - /* ran out of space - should not happen because of check above */ - return 1; + + if (len == 0) { + /* + * when the length of out_buf is not enough for time_key_format, + * time_key will not be added to record. + */ + flb_plg_error(ctx->ins, "Failed to add time_key %s to record, %s", + ctx->time_key, ctx->delivery_stream); + flb_free(out_buf); + } + else { + time_key_ptr = tmp_buf_ptr + written - 1; + memcpy(time_key_ptr, ",", 1); + time_key_ptr++; + memcpy(time_key_ptr, "\"", 1); + time_key_ptr++; + memcpy(time_key_ptr, ctx->time_key, strlen(ctx->time_key)); + time_key_ptr += strlen(ctx->time_key); + memcpy(time_key_ptr, "\":\"", 3); + time_key_ptr += 3; + tmp_size = buf->tmp_buf_size - buf->tmp_buf_offset; + tmp_size -= (time_key_ptr - tmp_buf_ptr); + + /* merge out_buf to time_key_ptr */ + memcpy(time_key_ptr, out_buf, len); + flb_free(out_buf); + time_key_ptr += len; + memcpy(time_key_ptr, "\"}", 2); + time_key_ptr += 2; + written = (time_key_ptr - tmp_buf_ptr); } - time_key_ptr += len; - memcpy(time_key_ptr, "\"}", 2); - time_key_ptr += 2; - written = (time_key_ptr - tmp_buf_ptr); } /* is (written + 1) because we still have to append newline */