From d4113add803351ea97dc558140b46b4907b47c03 Mon Sep 17 00:00:00 2001 From: Clay Cheng Date: Wed, 2 Nov 2022 20:49:12 +0000 Subject: [PATCH] out_kinesis_streams: added function get_time_format_string for time output can support millisecond and nanosecond by adding %F or %L to time_key_format. Signed-off-by: Clay Cheng chaych@amazon.com --- plugins/out_kinesis_streams/kinesis_api.c | 58 ++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/plugins/out_kinesis_streams/kinesis_api.c b/plugins/out_kinesis_streams/kinesis_api.c index b7e29030383..06e9420fd91 100644 --- a/plugins/out_kinesis_streams/kinesis_api.c +++ b/plugins/out_kinesis_streams/kinesis_api.c @@ -97,6 +97,61 @@ static int init_put_payload(struct flb_kinesis *ctx, struct flush *buf, return -1; } +/* + * Count the substring number in string. + */ +static int get_substring_number(char *str1, char *str2){ + int x = 0; + char *p; + + do{ + p = strstr(str1, str2); + if (p != NULL) { + str1 = p + 1; + x = x + 1; + } + } + while(p != NULL); + return x; +} + +/* + * Adjust the time_key output format according to time_key_format. + */ +static void get_time_format_string(char *str, size_t maxsize, + const char *time_key_format, const struct tm *timeptr, + size_t len, struct flb_time *tms){ + int millisecond_length = ((int)((ceil(log10(abs((uint64_t) tms->tm.tv_nsec / 1000000))) + 1)*sizeof(char))); + int nanosecond_length = ((int)((ceil(log10(abs((uint64_t) tms->tm.tv_nsec)))+1)*sizeof(char))); + char millisecond_string[millisecond_length]; + char nanosecond_string[nanosecond_length]; + char *milli_string_old = "%F"; + char *nano_string_old = "%L"; + int bstr_length = strlen(time_key_format)+get_substring_number(time_key_format, milli_string_old)+7*get_substring_number(time_key_format, nano_string_old); + char bstr[bstr_length]; + char mem_for_time_key_format[strlen(time_key_format)]; + + memset(mem_for_time_key_format,0,strlen(mem_for_time_key_format)); + strcpy(mem_for_time_key_format, time_key_format); + snprintf(millisecond_string, millisecond_length, "%" PRIu64, (uint64_t) tms->tm.tv_nsec / 1000000); + snprintf(nanosecond_string, nanosecond_length, "%" PRIu64, (uint64_t) tms->tm.tv_nsec); + memset(bstr,0,sizeof(bstr)); + for (int i = 0; i < strlen(time_key_format); i++) { + if (!strncmp(time_key_format + i, milli_string_old, strlen(milli_string_old))) { + strcat(bstr, millisecond_string); + i += strlen(milli_string_old) - 1; + } else if (!strncmp(time_key_format + i, nano_string_old, strlen(nano_string_old))) { + strcat(bstr, nanosecond_string); + i += strlen(nano_string_old) - 1; + } else { + strncat(bstr,time_key_format + i,1); + } + } + strcpy(time_key_format, bstr); + len = strftime(str, maxsize, time_key_format, timeptr); + strcpy(time_key_format, mem_for_time_key_format); +} + /* * Simple and fast hashing algorithm to create random partition keys */ @@ -295,7 +350,8 @@ static int process_event(struct flb_kinesis *ctx, struct flush *buf, time_key_ptr += 3; tmp_size = buf->tmp_buf_size - buf->tmp_buf_offset; tmp_size -= (time_key_ptr - tmp_buf_ptr); - len = strftime(time_key_ptr, tmp_size, ctx->time_key_format, &time_stamp); + get_time_format_string(time_key_ptr, tmp_size, ctx->time_key_format, + &time_stamp, len, tms); if (len <= 0) { /* ran out of space - should not happen because of check above */ return 1;