Skip to content

Commit

Permalink
parser: json: fix type confusion bug (fluent#3417)
Browse files Browse the repository at this point in the history
Signed-off-by: davkor <[email protected]>
  • Loading branch information
DavidKorczynski authored and Fujimoto Seiji committed Apr 27, 2021
1 parent 9c992e4 commit fdb7147
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 13 deletions.
11 changes: 11 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ option(FLB_STATIC_CONF "Build binary using static configuration")
option(FLB_STREAM_PROCESSOR "Enable Stream Processor" Yes)
option(FLB_CORO_STACK_SIZE "Set coroutine stack size")
option(FLB_AVRO_ENCODER "Build with Avro encoding support" No)
option(FLB_ARROW "Build with Apache Arrow support" No)

# Metrics: Experimental Feature, disabled by default on 0.12 series
# but enabled in the upcoming 0.13 release. Note that development
Expand Down Expand Up @@ -680,6 +681,16 @@ if(FLB_OUT_PGSQL AND (NOT PostgreSQL_FOUND))
FLB_OPTION(FLB_OUT_PGSQL OFF)
endif()

# Arrow GLib
# ==========
find_package(PkgConfig)
pkg_check_modules(ARROW_GLIB QUIET arrow-glib)
if(FLB_ARROW AND ARROW_GLIB_FOUND)
FLB_DEFINITION(FLB_HAVE_ARROW)
else()
set(FLB_ARROW OFF)
endif()

# Pthread Local Storage
# =====================
# By default we expect the compiler already support thread local storage
Expand Down
5 changes: 5 additions & 0 deletions plugins/out_s3/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@ set(src
s3_multipart.c)

FLB_PLUGIN(out_s3 "${src}" "")

if(FLB_ARROW)
add_subdirectory(arrow EXCLUDE_FROM_ALL)
target_link_libraries(flb-plugin-out_s3 out-s3-arrow)
endif()
7 changes: 7 additions & 0 deletions plugins/out_s3/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
set(src
compress.c)

add_library(out-s3-arrow STATIC ${src})

target_include_directories(out-s3-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS})
target_link_libraries(out-s3-arrow ${ARROW_GLIB_LDFLAGS})
152 changes: 152 additions & 0 deletions plugins/out_s3/arrow/compress.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* This converts S3 plugin's request buffer into Apache Arrow format.
*
* We use GLib binding to call Arrow functions (which is implemented
* in C++) from Fluent Bit.
*
* https://github.com/apache/arrow/tree/master/c_glib
*/

#include <arrow-glib/arrow-glib.h>
#include <inttypes.h>

/*
* GArrowTable is the central structure that represents "table" (a.k.a.
* data frame).
*/
static GArrowTable* parse_json(uint8_t *json, int size)
{
GArrowJSONReader *reader;
GArrowBuffer *buffer;
GArrowBufferInputStream *input;
GArrowJSONReadOptions *options;
GArrowTable *table;
GError *error = NULL;

buffer = garrow_buffer_new(json, size);
if (buffer == NULL) {
return NULL;
}

input = garrow_buffer_input_stream_new(buffer);
if (input == NULL) {
g_object_unref(buffer);
return NULL;
}

options = garrow_json_read_options_new();
if (options == NULL) {
g_print("fail to create json options: %s\n", error->message);
g_object_unref(buffer);
g_object_unref(input);
return NULL;
}

reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error);
if (reader == NULL) {
g_print("cannot create json reader: %s\n", error->message);
g_error_free(error);
g_object_unref(buffer);
g_object_unref(input);
g_object_unref(options);
return NULL;
}

table = garrow_json_reader_read(reader, &error);
if (table == NULL) {
g_print("cannot parse json: %s\n", error->message);
g_error_free(error);
g_object_unref(buffer);
g_object_unref(input);
g_object_unref(options);
g_object_unref(reader);
return NULL;
}
g_object_unref(buffer);
g_object_unref(input);
g_object_unref(options);
g_object_unref(reader);
return table;
}

static GArrowResizableBuffer* table_to_buffer(GArrowTable *table)
{
GArrowResizableBuffer *buffer;
GArrowBufferOutputStream *sink;
GError *error = NULL;
gboolean success;

buffer = garrow_resizable_buffer_new(0, &error);
if (buffer == NULL) {
g_print("cannot create buffer: %s\n", error->message);
g_error_free(error);
return NULL;
}

sink = garrow_buffer_output_stream_new(buffer);
if (sink == NULL) {
g_object_unref(buffer);
return NULL;
}

success = garrow_table_write_as_feather(
table, GARROW_OUTPUT_STREAM(sink),
NULL, &error);
if (!success) {
g_print("cannot output table: %s\n", error->message);
g_error_free(error);
g_object_unref(buffer);
g_object_unref(sink);
return NULL;
}
g_object_unref(sink);
return buffer;
}

int out_s3_compress_arrow(uint8_t *json, size_t size, void **out_buf, size_t *out_size)
{
GArrowTable *table;
GArrowResizableBuffer *buffer;
GBytes *bytes;
gconstpointer ptr;
gsize len;
uint8_t *buf;

table = parse_json(json, size);
if (table == NULL) {
return -1;
}

buffer = table_to_buffer(table);
g_object_unref(table);
if (buffer == NULL) {
return -1;
}

bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer));
if (bytes == NULL) {
g_object_unref(buffer);
return -1;
}

ptr = g_bytes_get_data(bytes, &len);
if (ptr == NULL) {
g_object_unref(buffer);
g_bytes_unref(bytes);
return -1;
}

buf = malloc(len);
if (buf == NULL) {
g_object_unref(buffer);
g_bytes_unref(bytes);
return -1;
}
memcpy(buf, ptr, len);
*out_buf = (void *) buf;
*out_size = len;

g_object_unref(buffer);
g_bytes_unref(bytes);
return 0;
}
13 changes: 13 additions & 0 deletions plugins/out_s3/arrow/compress.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* This function converts out_s3 buffer into Apache Arrow format.
*
* `json` is a string that contain (concatenated) JSON objects.
*
* `size` is the length of the json data (excluding the trailing
* null-terminator character).
*
* Return 0 on success (with `out_buf` and `out_size` updated),
* and -1 on failure
*/

int out_s3_compress_arrow(char *json, size_t size, void **out_buf, size_t *out_size);
47 changes: 35 additions & 12 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
#include "s3.h"
#include "s3_store.h"

#ifdef FLB_HAVE_ARROW
#include "arrow/compress.h"
#endif

static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data,
struct s3_file *chunk,
char **out_buf, size_t *out_size);
Expand Down Expand Up @@ -122,7 +126,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea
if (ctx->content_type != NULL) {
headers_len++;
}
if (ctx->compression != NULL) {
if (ctx->compression == COMPRESS_GZIP) {
headers_len++;
}
if (ctx->canned_acl != NULL) {
Expand All @@ -149,7 +153,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea
s3_headers[n].val_len = strlen(ctx->content_type);
n++;
}
if (ctx->compression != NULL) {
if (ctx->compression == COMPRESS_GZIP) {
s3_headers[n] = content_encoding_header;
n++;
}
Expand Down Expand Up @@ -515,17 +519,23 @@ static int cb_s3_init(struct flb_output_instance *ins,

tmp = flb_output_get_property("compression", ins);
if (tmp) {
if (strcmp((char *) tmp, "gzip") != 0) {
flb_plg_error(ctx->ins,
"'gzip' is currently the only supported value for 'compression'");
return -1;
} else if (ctx->use_put_object == FLB_FALSE) {
if (ctx->use_put_object == FLB_FALSE) {
flb_plg_error(ctx->ins,
"use_put_object must be enabled when compression is enabled");
return -1;
}

ctx->compression = (char *) tmp;
if (strcmp(tmp, "gzip") == 0) {
ctx->compression = COMPRESS_GZIP;
}
#ifdef FLB_HAVE_ARROW
else if (strcmp(tmp, "arrow") == 0) {
ctx->compression = COMPRESS_ARROW;
}
#endif
else {
flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
return -1;
}
}

tmp = flb_output_get_property("content_type", ins);
Expand Down Expand Up @@ -1090,15 +1100,27 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
flb_sds_destroy(s3_key);
uri = tmp;

if (ctx->compression != NULL) {
if (ctx->compression == COMPRESS_GZIP) {
ret = flb_gzip_compress(body, body_size, &compressed_body, &final_body_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to compress data");
flb_sds_destroy(uri);
return -1;
}
final_body = (char *) compressed_body;
} else {
}
#ifdef FLB_HAVE_ARROW
else if (ctx->compression == COMPRESS_ARROW) {
ret = out_s3_compress_arrow(body, body_size, &compressed_body, &final_body_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to compress data");
flb_sds_destroy(uri);
return -1;
}
final_body = compressed_body;
}
#endif
else {
final_body = body;
final_body_size = body_size;
}
Expand Down Expand Up @@ -1602,7 +1624,8 @@ static struct flb_config_map config_map[] = {
FLB_CONFIG_MAP_STR, "compression", NULL,
0, FLB_FALSE, 0,
"Compression type for S3 objects. 'gzip' is currently the only supported value. "
"The Content-Encoding HTTP Header will be set to 'gzip'."
"The Content-Encoding HTTP Header will be set to 'gzip'. "
"If Apache Arrow was enabled at compile time, you can set 'arrow' to this option."
},
{
FLB_CONFIG_MAP_STR, "content_type", NULL,
Expand Down
6 changes: 5 additions & 1 deletion plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@

#define DEFAULT_UPLOAD_TIMEOUT 3600

#define COMPRESS_NONE 0
#define COMPRESS_GZIP 1
#define COMPRESS_ARROW 2

/*
* If we see repeated errors on an upload/chunk, we will discard it
* This saves us from scenarios where something goes wrong and an upload can
Expand Down Expand Up @@ -95,11 +99,11 @@ struct flb_s3 {
char *endpoint;
char *sts_endpoint;
char *canned_acl;
char *compression;
char *content_type;
int free_endpoint;
int use_put_object;
int send_content_md5;
int compression;

struct flb_aws_provider *provider;
struct flb_aws_provider *base_provider;
Expand Down
6 changes: 6 additions & 0 deletions src/flb_parser_json.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ int flb_parser_json_do(struct flb_parser *parser,
return *out_size;
}

/* Ensure we have an accurate type */
if (v->type != MSGPACK_OBJECT_STR) {
msgpack_unpacked_destroy(&result);
return *out_size;
}

/* Lookup time */
ret = flb_parser_time_lookup(v->via.str.ptr, v->via.str.size,
0, parser, &tm, &tmfrac);
Expand Down

0 comments on commit fdb7147

Please sign in to comment.