Skip to content

Commit

Permalink
pack: added java_sql_timestamp format (fluent#4811)
Browse files Browse the repository at this point in the history
* pack: added java_sql_timestamp, a format string used by amazon athena

This commit adds a new timestamp format, the java_sql_timestamp,
which is very similar to iso8601, except that it has a space instead
of a T between the date and the hour and does not end with Z
(or any other timestamp delimiter)

This is the format: "%Y-%m-%d %H:%M:%S"

This is unfortunatelly the only format accepted by Amazon Athena.

Signed-off-by: Marcos Diez <[email protected]>
  • Loading branch information
marcosdiez authored Feb 17, 2022
1 parent 3e1be8b commit b02cb10
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 25 deletions.
14 changes: 7 additions & 7 deletions DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,10 @@ Config maps are an improvement to the previous Fluent Bit API that was used by p

There are various types of supported configuration types. Full list available [here](https://github.com/fluent/fluent-bit/blob/v1.4.2/include/fluent-bit/flb_config_map.h#L29). The most used ones are:

| Type | Description |
| -----------------------|:---------------------:|
| FLB_CONFIG_MAP_INT | Represents integer data type |
| FLB_CONFIG_MAP_BOOL | Represents boolean data type |
| Type | Description |
| -----------------------|:---------------------:|
| FLB_CONFIG_MAP_INT | Represents integer data type |
| FLB_CONFIG_MAP_BOOL | Represents boolean data type |
| FLB_CONFIG_MAP_DOUBLE | Represents a double |
| FLB_CONFIG_MAP_SIZE | Provides size_type as an integer datatype large enough to represent any possible string size. |
| FLB_CONFIG_MAP_STR | Represents string data type |
Expand All @@ -441,8 +441,8 @@ There are various types of supported configuration types. Full list available [h

A config map expects certain public fields at registration.

| Public Fields | Description |
| --------------|:---------------------|
| Public Fields | Description |
| --------------|:---------------------|
| Type | This field is the data type of the property that we are writing to the config map. If the property is of type `int` we use `FLB_CONFIG_MAP_INT`, if `string` `FLB_CONFIG_MAP_STR` etc. |
| Name | This field is the name of the configuration property. For example for the property flush count we use `flush_count`|
| Default Value | This field allows the user to set the default value of the property. For example, for a property of type `FLB_CONFIG_MAP_BOOL` (boolean), the default value may be false. Then we have to give `false` as default value. If there is no default value, `NULL` is given.|
Expand All @@ -469,7 +469,7 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "json_date_key", "date",
0, FLB_TRUE, offsetof(struct flb_stdout, json_date_key),
"Specifies the format of the date. Supported formats are double, iso8601 and epoch."
"Specifies the format of the date. Supported formats are double, iso8601, java_sql_timestamp and epoch."
},

/* EOF */
Expand Down
16 changes: 13 additions & 3 deletions include/fluent-bit/flb_pack.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,23 @@
#define FLB_PACK_JSON_PRIMITIVE JSMN_PRIMITIVE

/* Date formats */
#define FLB_PACK_JSON_DATE_DOUBLE 0
#define FLB_PACK_JSON_DATE_ISO8601 1
#define FLB_PACK_JSON_DATE_EPOCH 2
#define FLB_PACK_JSON_DATE_DOUBLE 0
#define FLB_PACK_JSON_DATE_ISO8601 1
#define FLB_PACK_JSON_DATE_EPOCH 2
#define FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP 3

/* Specific ISO8601 format */
#define FLB_PACK_JSON_DATE_ISO8601_FMT "%Y-%m-%dT%H:%M:%S"

/* Specific Java SQL Timestamp format */
#define FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP_FMT "%Y-%m-%d %H:%M:%S"

#define FBL_PACK_JSON_DATE_FORMAT_DESCRIPTION "Specify the format of the date, " \
"supported formats: double, " \
"iso8601 (e.g: 2018-05-30T09:39:52.000681Z), " \
"java_sql_timestamp (e.g: 2018-05-30 09:39:52.000681, useful for AWS Athena), "\
"and epoch."

/* JSON formats (modes) */
#define FLB_PACK_JSON_FORMAT_NONE 0
#define FLB_PACK_JSON_FORMAT_JSON 1
Expand Down
2 changes: 1 addition & 1 deletion plugins/out_http/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "json_date_format", NULL,
0, FLB_FALSE, 0,
"Specify the format of the date. Supported formats are 'double' and 'iso8601'"
FBL_PACK_JSON_DATE_FORMAT_DESCRIPTION
},
{
FLB_CONFIG_MAP_STR, "json_date_key", "date",
Expand Down
2 changes: 1 addition & 1 deletion plugins/out_null/null.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "json_date_key", "date",
0, FLB_TRUE, offsetof(struct flb_null, json_date_key),
"Specifies the format of the date. Supported formats are double, iso8601 and epoch."
FBL_PACK_JSON_DATE_FORMAT_DESCRIPTION
},

/* EOF */
Expand Down
20 changes: 10 additions & 10 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea
*headers = s3_headers;
return 0;
}

s3_headers = flb_malloc(sizeof(struct flb_aws_header) * headers_len);
if (s3_headers == NULL) {
flb_errno();
Expand Down Expand Up @@ -174,7 +174,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea
s3_headers[n].val = body_md5;
s3_headers[n].val_len = strlen(body_md5);
}

*num_headers = headers_len;
*headers = s3_headers;
return 0;
Expand Down Expand Up @@ -374,7 +374,7 @@ static int init_seq_index(void *context) {
flb_plg_error(ctx->ins, "Failed to write to sequential index metadata file");
return -1;
}
}
}
else {
ret = read_seq_index(ctx->seq_index_file, &ctx->seq_index);
if (ret < 0) {
Expand Down Expand Up @@ -725,7 +725,7 @@ static int cb_s3_init(struct flb_output_instance *ins,
tmp = flb_output_get_property("compression", ins);
if (tmp) {
if (ctx->use_put_object == FLB_FALSE) {
flb_plg_error(ctx->ins,
flb_plg_error(ctx->ins,
"use_put_object must be enabled when compression is enabled");
return -1;
}
Expand Down Expand Up @@ -1366,7 +1366,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
return -1;
}
}

s3_client = ctx->s3_client;
if (s3_plugin_under_test() == FLB_TRUE) {
c = mock_s3_call("TEST_PUT_OBJECT_ERROR", "PutObject");
Expand Down Expand Up @@ -1872,7 +1872,7 @@ static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char
}

msgpack_unpacked_init(&result);
while (!alloc_error &&
while (!alloc_error &&
msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
/* Each array must have two entries: time and record */
root = result.data;
Expand Down Expand Up @@ -1914,7 +1914,7 @@ static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char
if (strncmp(ctx->log_key, key_str, key_str_size) == 0) {
found = FLB_TRUE;

/*
/*
* Copy contents of value into buffer. Necessary to copy
* strings because flb_msgpack_to_json does not handle nested
* JSON gracefully and double escapes them.
Expand All @@ -1932,7 +1932,7 @@ static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char
val_offset++;
}
else {
ret = flb_msgpack_to_json(val_buf + val_offset,
ret = flb_msgpack_to_json(val_buf + val_offset,
msgpack_size - val_offset, &val);
if (ret < 0) {
break;
Expand All @@ -1955,7 +1955,7 @@ static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char

/* Throw error once per chunk if at least one log key was not found */
if (log_key_missing == FLB_TRUE) {
flb_plg_error(ctx->ins, "Could not find log_key '%s' in %d records",
flb_plg_error(ctx->ins, "Could not find log_key '%s' in %d records",
ctx->log_key, log_key_missing);
}

Expand Down Expand Up @@ -2241,7 +2241,7 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "json_date_format", NULL,
0, FLB_FALSE, 0,
"Specifies the format of the date. Supported formats are double, iso8601 and epoch."
FBL_PACK_JSON_DATE_FORMAT_DESCRIPTION
},
{
FLB_CONFIG_MAP_STR, "json_date_key", "date",
Expand Down
2 changes: 1 addition & 1 deletion plugins/out_stdout/stdout.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "json_date_format", NULL,
0, FLB_FALSE, 0,
"Specifies the format of the date. Supported formats are double, iso8601 and epoch."
FBL_PACK_JSON_DATE_FORMAT_DESCRIPTION
},
{
FLB_CONFIG_MAP_STR, "json_date_key", "date",
Expand Down
3 changes: 1 addition & 2 deletions plugins/out_tcp/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,7 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "json_date_format", "double",
0, FLB_FALSE, 0,
"Specify the format of the date, supported formats: double, iso8601 "
"(e.g: 2018-05-30T09:39:52.000681Z) and epoch."
FBL_PACK_JSON_DATE_FORMAT_DESCRIPTION
},

{
Expand Down
17 changes: 17 additions & 0 deletions src/flb_pack.c
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,9 @@ int flb_pack_to_json_date_type(const char *str)
if (strcasecmp(str, "double") == 0) {
return FLB_PACK_JSON_DATE_DOUBLE;
}
else if (strcasecmp(str, "java_sql_timestamp") == 0) {
return FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP;
}
else if (strcasecmp(str, "iso8601") == 0) {
return FLB_PACK_JSON_DATE_ISO8601;
}
Expand Down Expand Up @@ -908,6 +911,20 @@ flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes,
case FLB_PACK_JSON_DATE_DOUBLE:
msgpack_pack_double(&tmp_pck, flb_time_to_double(&tms));
break;
case FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP:
/* Format the time, use microsecond precision not nanoseconds */
gmtime_r(&tms.tm.tv_sec, &tm);
s = strftime(time_formatted, sizeof(time_formatted) - 1,
FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP_FMT, &tm);

len = snprintf(time_formatted + s,
sizeof(time_formatted) - 1 - s,
".%06" PRIu64,
(uint64_t) tms.tm.tv_nsec / 1000);
s += len;
msgpack_pack_str(&tmp_pck, s);
msgpack_pack_str_body(&tmp_pck, time_formatted, s);
break;
case FLB_PACK_JSON_DATE_ISO8601:
/* Format the time, use microsecond precision not nanoseconds */
gmtime_r(&tms.tm.tv_sec, &tm);
Expand Down

0 comments on commit b02cb10

Please sign in to comment.