out_s3: add Apache Arrow support
Apache Arrow is an efficient columnar data format that is suitable
for statistical analysis and is popular in the machine learning community.

    https://arrow.apache.org/

With this patch merged, users can now specify 'arrow' as the
compression type, like this:

    [OUTPUT]
      Name s3
      Bucket some-bucket
      total_file_size 1M
      use_put_object On
      Compression arrow

which makes Fluent Bit convert the request buffer into Apache Arrow
format before uploading.
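
Note that Arrow support is only compiled in when the FLB_ARROW CMake
option is enabled and arrow-glib is found via pkg-config. Assuming the
Arrow GLib development files are installed, a typical build invocation
would be:

    cmake -DFLB_ARROW=On ..
    make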

Signed-off-by: Fujimoto Seiji <[email protected]>
Fujimoto Seiji authored and fujimotos committed Mar 23, 2021
1 parent 10a3b60 commit 03bcb43
Showing 7 changed files with 250 additions and 13 deletions.
11 changes: 11 additions & 0 deletions CMakeLists.txt
@@ -93,6 +93,7 @@ option(FLB_STATIC_CONF "Build binary using static configuration")
option(FLB_STREAM_PROCESSOR "Enable Stream Processor" Yes)
option(FLB_CORO_STACK_SIZE "Set coroutine stack size")
option(FLB_AVRO_ENCODER "Build with Avro encoding support" No)
option(FLB_ARROW "Build with Apache Arrow support" No)

# Metrics: Experimental Feature, disabled by default on 0.12 series
# but enabled in the upcoming 0.13 release. Note that development
@@ -672,6 +673,16 @@ if(FLB_OUT_PGSQL AND (NOT PostgreSQL_FOUND))
  FLB_OPTION(FLB_OUT_PGSQL OFF)
endif()

# Arrow Glib
# ==========
find_package(PkgConfig)
pkg_check_modules(ARROW_GLIB QUIET arrow-glib)
if(FLB_ARROW AND ARROW_GLIB_FOUND)
  FLB_DEFINITION(FLB_HAVE_ARROW)
else()
  set(FLB_ARROW OFF)
endif()

# Pthread Local Storage
# =====================
# By default we expect the compiler already support thread local storage
5 changes: 5 additions & 0 deletions plugins/out_s3/CMakeLists.txt
@@ -4,3 +4,8 @@ set(src
  s3_multipart.c)

FLB_PLUGIN(out_s3 "${src}" "")

if(FLB_ARROW)
  add_subdirectory(arrow EXCLUDE_FROM_ALL)
  target_link_libraries(flb-plugin-out_s3 out-s3-arrow)
endif()
7 changes: 7 additions & 0 deletions plugins/out_s3/arrow/CMakeLists.txt
@@ -0,0 +1,7 @@
set(src
  compress.c)

add_library(out-s3-arrow STATIC ${src})

target_include_directories(out-s3-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS})
target_link_libraries(out-s3-arrow ${ARROW_GLIB_LDFLAGS})
174 changes: 174 additions & 0 deletions plugins/out_s3/arrow/compress.c
@@ -0,0 +1,174 @@
/*
 * This converts the S3 plugin's request buffer into Apache Arrow format.
 *
 * We use the GLib binding to call Arrow functions (which are implemented
 * in C++) from Fluent Bit.
 *
 * https://github.com/apache/arrow/tree/master/c_glib
 */

#include <arrow-glib/arrow-glib.h>
#include <inttypes.h>
#include <stdlib.h>
#include <string.h>

/*
 * GArrowTable is the central structure that represents a "table"
 * (a.k.a. a matrix).
 */
static GArrowTable* parse_json(uint8_t *json, int size)
{
    GArrowJSONReader *reader;
    GArrowBuffer *buffer;
    GArrowBufferInputStream *input;
    GArrowJSONReadOptions *options;
    GArrowTable *table;
    GError *error = NULL;

    buffer = garrow_buffer_new(json, size);
    if (buffer == NULL) {
        return NULL;
    }

    input = garrow_buffer_input_stream_new(buffer);
    if (input == NULL) {
        g_object_unref(buffer);
        return NULL;
    }

    options = garrow_json_read_options_new();
    if (options == NULL) {
        g_print("cannot create JSON read options\n");
        g_object_unref(buffer);
        g_object_unref(input);
        return NULL;
    }

    reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error);
    if (reader == NULL) {
        g_print("cannot create JSON reader: %s\n", error->message);
        g_error_free(error);
        g_object_unref(buffer);
        g_object_unref(input);
        g_object_unref(options);
        return NULL;
    }

    table = garrow_json_reader_read(reader, &error);
    if (table == NULL) {
        g_print("cannot parse JSON: %s\n", error->message);
        g_error_free(error);
        g_object_unref(buffer);
        g_object_unref(input);
        g_object_unref(options);
        g_object_unref(reader);
        return NULL;
    }
    g_object_unref(buffer);
    g_object_unref(input);
    g_object_unref(options);
    g_object_unref(reader);
    return table;
}

static GArrowResizableBuffer* table_to_buffer(GArrowTable *table)
{
    GArrowSchema *schema;
    GArrowResizableBuffer *buffer;
    GArrowBufferOutputStream *sink;
    GArrowRecordBatchStreamWriter *writer;
    GError *error = NULL;
    gboolean success;

    schema = garrow_table_get_schema(table);
    if (schema == NULL) {
        return NULL;
    }

    buffer = garrow_resizable_buffer_new(0, &error);
    if (buffer == NULL) {
        g_print("cannot create buffer: %s\n", error->message);
        g_error_free(error);
        g_object_unref(schema);
        return NULL;
    }

    sink = garrow_buffer_output_stream_new(buffer);
    if (sink == NULL) {
        g_object_unref(schema);
        g_object_unref(buffer);
        return NULL;
    }

    writer = garrow_record_batch_stream_writer_new(
            GARROW_OUTPUT_STREAM(sink),
            schema, &error);
    if (writer == NULL) {
        g_print("cannot create writer: %s\n", error->message);
        g_error_free(error);
        g_object_unref(schema);
        g_object_unref(buffer);
        g_object_unref(sink);
        return NULL;
    }

    success = garrow_record_batch_writer_write_table(
            GARROW_RECORD_BATCH_WRITER(writer),
            table, &error);
    if (!success) {
        g_print("cannot write table to output stream: %s\n", error->message);
        g_error_free(error);
        g_object_unref(schema);
        g_object_unref(buffer);
        g_object_unref(sink);
        g_object_unref(writer);
        return NULL;
    }

    /* release intermediate objects; only the buffer is returned to the caller */
    g_object_unref(schema);
    g_object_unref(sink);
    g_object_unref(writer);
    return buffer;
}

int out_s3_compress_arrow(char *json, size_t size, void **out_buf, size_t *out_size)
{
    GArrowTable *table;
    GArrowResizableBuffer *buffer;
    GBytes *bytes;
    gconstpointer ptr;
    gsize len;
    uint8_t *buf;

    table = parse_json((uint8_t *) json, size);
    if (table == NULL) {
        return -1;
    }

    buffer = table_to_buffer(table);
    g_object_unref(table);
    if (buffer == NULL) {
        return -1;
    }

    bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer));
    if (bytes == NULL) {
        g_object_unref(buffer);
        return -1;
    }

    ptr = g_bytes_get_data(bytes, &len);
    if (ptr == NULL) {
        g_object_unref(buffer);
        g_bytes_unref(bytes);
        return -1;
    }

    buf = malloc(len);
    if (buf == NULL) {
        g_object_unref(buffer);
        g_bytes_unref(bytes);
        return -1;
    }
    memcpy(buf, ptr, len);
    *out_buf = (void *) buf;
    *out_size = len;

    g_object_unref(buffer);
    g_bytes_unref(bytes);
    return 0;
}
13 changes: 13 additions & 0 deletions plugins/out_s3/arrow/compress.h
@@ -0,0 +1,13 @@
/*
 * This function converts the out_s3 request buffer into Apache Arrow format.
 *
 * `json` is a string that contains (concatenated) JSON objects.
 *
 * `size` is the length of the JSON data (excluding the trailing
 * null-terminator character).
 *
 * Returns 0 on success (with `out_buf` and `out_size` updated),
 * and -1 on failure.
 */

int out_s3_compress_arrow(char *json, size_t size, void **out_buf, size_t *out_size);
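
For illustration, here is a minimal caller-side sketch of this contract (a
hypothetical standalone test harness, not part of this commit; the
newline-delimited JSON input is an assumed stand-in for what out_s3
buffers):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    /* declared in plugins/out_s3/arrow/compress.h */
    int out_s3_compress_arrow(char *json, size_t size, void **out_buf, size_t *out_size);

    int main(void)
    {
        /* two concatenated JSON objects with a common schema */
        char json[] = "{\"status\": 200, \"path\": \"/index.html\"}\n"
                      "{\"status\": 404, \"path\": \"/missing\"}\n";
        void *buf = NULL;
        size_t len = 0;

        if (out_s3_compress_arrow(json, strlen(json), &buf, &len) != 0) {
            fprintf(stderr, "Arrow conversion failed\n");
            return 1;
        }
        printf("Arrow stream: %zu bytes\n", len);
        free(buf);  /* the output buffer is heap-allocated; the caller frees it */
        return 0;
    }

Building such a harness requires linking against compress.c and arrow-glib.
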
47 changes: 35 additions & 12 deletions plugins/out_s3/s3.c
Expand Up @@ -34,6 +34,10 @@
#include "s3.h"
#include "s3_store.h"

#ifdef FLB_HAVE_ARROW
#include "arrow/compress.h"
#endif

static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data,
                                    struct s3_file *chunk,
                                    char **out_buf, size_t *out_size);
@@ -113,7 +117,7 @@ static int create_headers(struct flb_s3 *ctx, struct flb_aws_header **headers, i
     if (ctx->content_type != NULL) {
         headers_len++;
     }
-    if (ctx->compression != NULL) {
+    if (ctx->compression == COMPRESS_GZIP) {
         headers_len++;
     }
     if (ctx->canned_acl != NULL) {
@@ -137,7 +141,7 @@
         s3_headers[n].val_len = strlen(ctx->content_type);
         n++;
     }
-    if (ctx->compression != NULL) {
+    if (ctx->compression == COMPRESS_GZIP) {
         s3_headers[n] = content_encoding_header;
         n++;
     }
@@ -497,17 +501,23 @@ static int cb_s3_init(struct flb_output_instance *ins,

     tmp = flb_output_get_property("compression", ins);
     if (tmp) {
-        if (strcmp((char *) tmp, "gzip") != 0) {
-            flb_plg_error(ctx->ins,
-                          "'gzip' is currently the only supported value for 'compression'");
-            return -1;
-        } else if (ctx->use_put_object == FLB_FALSE) {
+        if (ctx->use_put_object == FLB_FALSE) {
             flb_plg_error(ctx->ins,
                           "use_put_object must be enabled when compression is enabled");
             return -1;
         }
 
-        ctx->compression = (char *) tmp;
+        if (strcmp(tmp, "gzip") == 0) {
+            ctx->compression = COMPRESS_GZIP;
+        }
+#ifdef FLB_HAVE_ARROW
+        else if (strcmp(tmp, "arrow") == 0) {
+            ctx->compression = COMPRESS_ARROW;
+        }
+#endif
+        else {
+            flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
+            return -1;
+        }
     }

tmp = flb_output_get_property("content_type", ins);
@@ -1071,15 +1081,27 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
     flb_sds_destroy(s3_key);
     uri = tmp;
 
-    if (ctx->compression != NULL) {
+    if (ctx->compression == COMPRESS_GZIP) {
         ret = flb_gzip_compress(body, body_size, &compressed_body, &final_body_size);
         if (ret == -1) {
             flb_plg_error(ctx->ins, "Failed to compress data");
             flb_sds_destroy(uri);
             return -1;
         }
         final_body = (char *) compressed_body;
-    } else {
+    }
+#ifdef FLB_HAVE_ARROW
+    else if (ctx->compression == COMPRESS_ARROW) {
+        ret = out_s3_compress_arrow(body, body_size, &compressed_body, &final_body_size);
+        if (ret == -1) {
+            flb_plg_error(ctx->ins, "Failed to compress data");
+            flb_sds_destroy(uri);
+            return -1;
+        }
+        final_body = compressed_body;
+    }
+#endif
+    else {
         final_body = body;
         final_body_size = body_size;
     }
@@ -1550,7 +1572,8 @@ static struct flb_config_map config_map[] = {
     FLB_CONFIG_MAP_STR, "compression", NULL,
     0, FLB_FALSE, 0,
     "Compression type for S3 objects. 'gzip' is currently the only supported value. "
-    "The Content-Encoding HTTP Header will be set to 'gzip'."
+    "The Content-Encoding HTTP Header will be set to 'gzip'. "
+    "If Apache Arrow support was enabled at compile time, this option can also be set to 'arrow'."
    },
    {
     FLB_CONFIG_MAP_STR, "content_type", NULL,
6 changes: 5 additions & 1 deletion plugins/out_s3/s3.h
@@ -46,6 +46,10 @@

#define DEFAULT_UPLOAD_TIMEOUT 3600

#define COMPRESS_NONE 0
#define COMPRESS_GZIP 1
#define COMPRESS_ARROW 2

/*
 * If we see repeated errors on an upload/chunk, we will discard it
 * This saves us from scenarios where something goes wrong and an upload can
@@ -95,10 +99,10 @@ struct flb_s3 {
     char *endpoint;
     char *sts_endpoint;
     char *canned_acl;
-    char *compression;
     char *content_type;
     int free_endpoint;
     int use_put_object;
+    int compression;
 
     struct flb_aws_provider *provider;
     struct flb_aws_provider *base_provider;