Skip to content

Commit

Permalink
out_kinesis_streams: added function get_time_format_string for time o…
Browse files Browse the repository at this point in the history
…utput can support millisecond and nanosecond by adding %F or %L to time_key_format.

Signed-off-by: Clay Cheng [email protected]
  • Loading branch information
Claych committed Nov 3, 2022
1 parent f597b2f commit d4113ad
Showing 1 changed file with 57 additions and 1 deletion.
58 changes: 57 additions & 1 deletion plugins/out_kinesis_streams/kinesis_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,61 @@ static int init_put_payload(struct flb_kinesis *ctx, struct flush *buf,
return -1;
}

/*
* Count the substring number in string.
*/
static int get_substring_number(char *str1, char *str2){
int x = 0;
char *p;

do{
p = strstr(str1, str2);
if (p != NULL) {
str1 = p + 1;
x = x + 1;
}
}
while(p != NULL);
return x;
}

/*
* Adjust the time_key output format according to time_key_format.
*/
static void get_time_format_string(char *str, size_t maxsize,
const char *time_key_format, const struct tm *timeptr,
size_t len, struct flb_time *tms){
int millisecond_length = ((int)((ceil(log10(abs((uint64_t) tms->tm.tv_nsec / 1000000))) + 1)*sizeof(char)));
int nanosecond_length = ((int)((ceil(log10(abs((uint64_t) tms->tm.tv_nsec)))+1)*sizeof(char)));
char millisecond_string[millisecond_length];
char nanosecond_string[nanosecond_length];
char *milli_string_old = "%F";
char *nano_string_old = "%L";
int bstr_length = strlen(time_key_format)+get_substring_number(time_key_format, milli_string_old)+7*get_substring_number(time_key_format, nano_string_old);
char bstr[bstr_length];
char mem_for_time_key_format[strlen(time_key_format)];

memset(mem_for_time_key_format,0,strlen(mem_for_time_key_format));
strcpy(mem_for_time_key_format, time_key_format);
snprintf(millisecond_string, millisecond_length, "%" PRIu64, (uint64_t) tms->tm.tv_nsec / 1000000);
snprintf(nanosecond_string, nanosecond_length, "%" PRIu64, (uint64_t) tms->tm.tv_nsec);
memset(bstr,0,sizeof(bstr));
for (int i = 0; i < strlen(time_key_format); i++) {
if (!strncmp(time_key_format + i, milli_string_old, strlen(milli_string_old))) {
strcat(bstr, millisecond_string);
i += strlen(milli_string_old) - 1;
} else if (!strncmp(time_key_format + i, nano_string_old, strlen(nano_string_old))) {
strcat(bstr, nanosecond_string);
i += strlen(nano_string_old) - 1;
} else {
strncat(bstr,time_key_format + i,1);
}
}
strcpy(time_key_format, bstr);
len = strftime(str, maxsize, time_key_format, timeptr);
strcpy(time_key_format, mem_for_time_key_format);
}

/*
* Simple and fast hashing algorithm to create random partition keys
*/
Expand Down Expand Up @@ -295,7 +350,8 @@ static int process_event(struct flb_kinesis *ctx, struct flush *buf,
time_key_ptr += 3;
tmp_size = buf->tmp_buf_size - buf->tmp_buf_offset;
tmp_size -= (time_key_ptr - tmp_buf_ptr);
len = strftime(time_key_ptr, tmp_size, ctx->time_key_format, &time_stamp);
get_time_format_string(time_key_ptr, tmp_size, ctx->time_key_format,
&time_stamp, len, tms);
if (len <= 0) {
/* ran out of space - should not happen because of check above */
return 1;
Expand Down

0 comments on commit d4113ad

Please sign in to comment.