Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flb_aws_util: added function flb_aws_strftime_precision for time output #6472

Merged
merged 3 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions include/fluent-bit/flb_aws_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#ifndef FLB_AWS_UTIL_H

#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_time.h>

#define FLB_AWS_UTIL_H

Expand Down Expand Up @@ -177,5 +178,13 @@ int flb_read_file(const char *path, char **out_buf, size_t *out_size);
flb_sds_t flb_get_s3_key(const char *format, time_t time, const char *tag,
char *tag_delimiter, uint64_t seq_index);

/*
* This function is an extension to strftime which can support milliseconds with %3N,
* support nanoseconds with %9N or %L. The return value is the length of formatted
* time string.
*/
size_t flb_aws_strftime_precision(char **out_buf, const char *time_format,
struct flb_time *tms);

#endif
#endif /* FLB_HAVE_AWS */
59 changes: 37 additions & 22 deletions plugins/out_kinesis_firehose/firehose_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ static int process_event(struct flb_firehose *ctx, struct flush *buf,
size_t len;
size_t tmp_size;
void *compressed_tmp_buf;
char *out_buf;

tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset;
ret = flb_msgpack_to_json(tmp_buf_ptr,
Expand Down Expand Up @@ -216,34 +217,48 @@ static int process_event(struct flb_firehose *ctx, struct flush *buf,
ctx->delivery_stream);
return 2;
}
/* guess space needed to write time_key */
len = 6 + strlen(ctx->time_key) + 6 * strlen(ctx->time_key_format);

/* format time output and return the length */
len = flb_aws_strftime_precision(&out_buf, ctx->time_key_format, tms);

/* how much space do we have left */
tmp_size = (buf->tmp_buf_size - buf->tmp_buf_offset) - written;
if (len > tmp_size) {
/* not enough space- tell caller to retry */
/* not enough space - tell caller to retry */
flb_free(out_buf);
return 1;
}
time_key_ptr = tmp_buf_ptr + written - 1;
memcpy(time_key_ptr, ",", 1);
time_key_ptr++;
memcpy(time_key_ptr, "\"", 1);
time_key_ptr++;
memcpy(time_key_ptr, ctx->time_key, strlen(ctx->time_key));
time_key_ptr += strlen(ctx->time_key);
memcpy(time_key_ptr, "\":\"", 3);
time_key_ptr += 3;
tmp_size = buf->tmp_buf_size - buf->tmp_buf_offset;
tmp_size -= (time_key_ptr - tmp_buf_ptr);
len = strftime(time_key_ptr, tmp_size, ctx->time_key_format, &time_stamp);
if (len <= 0) {
/* ran out of space - should not happen because of check above */
return 1;

if (len == 0) {
/*
* when the length of out_buf is not enough for time_key_format,
* time_key will not be added to record.
*/
flb_plg_error(ctx->ins, "Failed to add time_key %s to record, %s",
ctx->time_key, ctx->delivery_stream);
flb_free(out_buf);
}
else {
time_key_ptr = tmp_buf_ptr + written - 1;
memcpy(time_key_ptr, ",", 1);
time_key_ptr++;
memcpy(time_key_ptr, "\"", 1);
time_key_ptr++;
memcpy(time_key_ptr, ctx->time_key, strlen(ctx->time_key));
time_key_ptr += strlen(ctx->time_key);
memcpy(time_key_ptr, "\":\"", 3);
time_key_ptr += 3;
tmp_size = buf->tmp_buf_size - buf->tmp_buf_offset;
tmp_size -= (time_key_ptr - tmp_buf_ptr);

/* merge out_buf to time_key_ptr */
memcpy(time_key_ptr, out_buf, len);
flb_free(out_buf);
time_key_ptr += len;
memcpy(time_key_ptr, "\"}", 2);
time_key_ptr += 2;
written = (time_key_ptr - tmp_buf_ptr);
}
time_key_ptr += len;
memcpy(time_key_ptr, "\"}", 2);
time_key_ptr += 2;
written = (time_key_ptr - tmp_buf_ptr);
}

/* is (written + 1) because we still have to append newline */
Expand Down
59 changes: 37 additions & 22 deletions plugins/out_kinesis_streams/kinesis_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ static int process_event(struct flb_kinesis *ctx, struct flush *buf,
struct tm *tmp;
size_t len;
size_t tmp_size;
char *out_buf;

tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset;
ret = flb_msgpack_to_json(tmp_buf_ptr,
Expand Down Expand Up @@ -276,34 +277,48 @@ static int process_event(struct flb_kinesis *ctx, struct flush *buf,
ctx->stream_name);
return 2;
}
/* guess space needed to write time_key */
len = 6 + strlen(ctx->time_key) + 6 * strlen(ctx->time_key_format);

/* format time output and return the length */
len = flb_aws_strftime_precision(&out_buf, ctx->time_key_format, tms);

/* how much space do we have left */
tmp_size = (buf->tmp_buf_size - buf->tmp_buf_offset) - written;
if (len > tmp_size) {
/* not enough space- tell caller to retry */
/* not enough space - tell caller to retry */
flb_free(out_buf);
return 1;
}
time_key_ptr = tmp_buf_ptr + written - 1;
memcpy(time_key_ptr, ",", 1);
time_key_ptr++;
memcpy(time_key_ptr, "\"", 1);
time_key_ptr++;
memcpy(time_key_ptr, ctx->time_key, strlen(ctx->time_key));
time_key_ptr += strlen(ctx->time_key);
memcpy(time_key_ptr, "\":\"", 3);
time_key_ptr += 3;
tmp_size = buf->tmp_buf_size - buf->tmp_buf_offset;
tmp_size -= (time_key_ptr - tmp_buf_ptr);
len = strftime(time_key_ptr, tmp_size, ctx->time_key_format, &time_stamp);
if (len <= 0) {
/* ran out of space - should not happen because of check above */
return 1;

if (len == 0) {
/*
* when the length of out_buf is not enough for time_key_format,
* time_key will not be added to record.
*/
flb_plg_error(ctx->ins, "Failed to add time_key %s to record, %s",
ctx->time_key, ctx->stream_name);
flb_free(out_buf);
}
else {
time_key_ptr = tmp_buf_ptr + written - 1;
memcpy(time_key_ptr, ",", 1);
time_key_ptr++;
memcpy(time_key_ptr, "\"", 1);
time_key_ptr++;
memcpy(time_key_ptr, ctx->time_key, strlen(ctx->time_key));
time_key_ptr += strlen(ctx->time_key);
memcpy(time_key_ptr, "\":\"", 3);
time_key_ptr += 3;
tmp_size = buf->tmp_buf_size - buf->tmp_buf_offset;
tmp_size -= (time_key_ptr - tmp_buf_ptr);

/* merge out_buf to time_key_ptr */
memcpy(time_key_ptr, out_buf, len);
flb_free(out_buf);
time_key_ptr += len;
memcpy(time_key_ptr, "\"}", 2);
time_key_ptr += 2;
written = (time_key_ptr - tmp_buf_ptr);
}
time_key_ptr += len;
memcpy(time_key_ptr, "\"}", 2);
time_key_ptr += 2;
written = (time_key_ptr - tmp_buf_ptr);
}

/* is (written + 1) because we still have to append newline */
Expand Down
97 changes: 97 additions & 0 deletions src/aws/flb_aws_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@
#define FLB_AWS_BASE_USER_AGENT_LEN 21
#endif

#define FLB_AWS_MILLISECOND_FORMATTER_LENGTH 3
#define FLB_AWS_NANOSECOND_FORMATTER_LENGTH 9
#define FLB_AWS_MILLISECOND_FORMATTER "%3N"
#define FLB_AWS_NANOSECOND_FORMATTER_N "%9N"
#define FLB_AWS_NANOSECOND_FORMATTER_L "%L"

struct flb_http_client *request_do(struct flb_aws_client *aws_client,
int method, const char *uri,
const char *body, size_t body_len,
Expand Down Expand Up @@ -942,3 +948,94 @@ flb_sds_t flb_get_s3_key(const char *format, time_t time, const char *tag,
}
return NULL;
}

/*
* This function is an extension to strftime which can support milliseconds with %3N,
* support nanoseconds with %9N or %L. The return value is the length of formatted
* time string.
*/
size_t flb_aws_strftime_precision(char **out_buf, const char *time_format,
struct flb_time *tms)
{
char millisecond_str[FLB_AWS_MILLISECOND_FORMATTER_LENGTH+1];
char nanosecond_str[FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1];
char *tmp_parsed_time_str;
char *buf;
size_t out_size;
size_t tmp_parsed_time_str_len;
size_t time_format_len;
struct tm timestamp;
struct tm *tmp;
int i;

/*
* Guess the max length needed for tmp_parsed_time_str and tmp_out_buf. The
* upper bound is 12*strlen(time_format) because the worst scenario will be only
* %c in time_format, and %c will be transfer to 24 chars long by function strftime().
*/
time_format_len = strlen(time_format);
tmp_parsed_time_str_len = 12*time_format_len;

/*
* Use tmp_parsed_time_str to buffer when replace %3N with milliseconds, replace
* %9N and %L with nanoseconds in time_format.
*/
tmp_parsed_time_str = (char *)flb_calloc(1, tmp_parsed_time_str_len*sizeof(char));
if (!tmp_parsed_time_str) {
flb_errno();
return 0;
}

buf = (char *)flb_calloc(1, tmp_parsed_time_str_len*sizeof(char));
if (!buf) {
flb_errno();
flb_free(tmp_parsed_time_str);
return 0;
}

/* Replace %3N to millisecond, %9N and %L to nanosecond in time_format. */
snprintf(millisecond_str, FLB_AWS_MILLISECOND_FORMATTER_LENGTH+1,
"%" PRIu64, (uint64_t) tms->tm.tv_nsec / 1000000);
snprintf(nanosecond_str, FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1,
"%" PRIu64, (uint64_t) tms->tm.tv_nsec);
for (i = 0; i < time_format_len; i++) {
if (strncmp(time_format+i, FLB_AWS_MILLISECOND_FORMATTER, 3) == 0) {
strncat(tmp_parsed_time_str, millisecond_str,
FLB_AWS_MILLISECOND_FORMATTER_LENGTH+1);
i += 2;
}
else if (strncmp(time_format+i, FLB_AWS_NANOSECOND_FORMATTER_N, 3) == 0) {
strncat(tmp_parsed_time_str, nanosecond_str,
FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1);
i += 2;
}
else if (strncmp(time_format+i, FLB_AWS_NANOSECOND_FORMATTER_L, 2) == 0) {
strncat(tmp_parsed_time_str, nanosecond_str,
FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1);
i += 1;
}
else {
strncat(tmp_parsed_time_str,time_format+i,1);
}
}

tmp = gmtime_r(&tms->tm.tv_sec, &timestamp);
if (!tmp) {
return 0;
}

out_size = strftime(buf, tmp_parsed_time_str_len,
tmp_parsed_time_str, &timestamp);

/* Check whether tmp_parsed_time_str_len is enough for tmp_out_buff */
if (out_size == 0) {
flb_free(tmp_parsed_time_str);
flb_free(buf);
return 0;
}

*out_buf = buf;
flb_free(tmp_parsed_time_str);

return out_size;
}