From 03bcb430cc3309b6ecf7421f2031a3ffbdd1458a Mon Sep 17 00:00:00 2001
From: Fujimoto Seiji <fujimoto@clear-code.com>
Date: Mon, 8 Mar 2021 06:24:18 +0000
Subject: [PATCH] out_s3: add Apache Arrow support

Apache Arrow is an efficient columnar data format that is suitable
for statistical analysis and popular in the machine learning community.

    https://arrow.apache.org/

With this patch merged, users can now specify 'arrow' as the
compression type like this:

    [OUTPUT]
      Name s3
      Bucket some-bucket
      total_file_size 1M
      use_put_object On
      Compression arrow

which makes Fluent Bit convert the request buffer into Apache Arrow
format before uploading.
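
This feature is only compiled in when the new FLB_ARROW CMake option
is enabled and Arrow GLib can be found through pkg-config. A typical
build (assuming the arrow-glib development package is already
installed) looks like:

    $ cmake -DFLB_ARROW=On ..
    $ make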

Signed-off-by: Fujimoto Seiji <fujimoto@clear-code.com>
---
 CMakeLists.txt                      |  11 ++
 plugins/out_s3/CMakeLists.txt       |   5 +
 plugins/out_s3/arrow/CMakeLists.txt |   7 ++
 plugins/out_s3/arrow/compress.c     | 174 ++++++++++++++++++++++++++++
 plugins/out_s3/arrow/compress.h     |  13 +++
 plugins/out_s3/s3.c                 |  47 ++++++--
 plugins/out_s3/s3.h                 |   6 +-
 7 files changed, 250 insertions(+), 13 deletions(-)
 create mode 100644 plugins/out_s3/arrow/CMakeLists.txt
 create mode 100644 plugins/out_s3/arrow/compress.c
 create mode 100644 plugins/out_s3/arrow/compress.h

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 472681751b1..fad8daa0e31 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -93,6 +93,7 @@ option(FLB_STATIC_CONF         "Build binary using static configuration")
 option(FLB_STREAM_PROCESSOR    "Enable Stream Processor"      Yes)
 option(FLB_CORO_STACK_SIZE     "Set coroutine stack size")
 option(FLB_AVRO_ENCODER        "Build with Avro encoding support" No)
+option(FLB_ARROW               "Build with Apache Arrow support"  No)
 
 # Metrics: Experimental Feature, disabled by default on 0.12 series
 # but enabled in the upcoming 0.13 release. Note that development
@@ -672,6 +673,16 @@ if(FLB_OUT_PGSQL AND (NOT PostgreSQL_FOUND))
    FLB_OPTION(FLB_OUT_PGSQL OFF)
 endif()
 
+# Arrow Glib
+# ==========
+find_package(PkgConfig)
+pkg_check_modules(ARROW_GLIB QUIET arrow-glib)
+if(FLB_ARROW AND ARROW_GLIB_FOUND)
+  FLB_DEFINITION(FLB_HAVE_ARROW)
+else()
+  set(FLB_ARROW OFF)
+endif()
+
 # Pthread Local Storage
 # =====================
 # By default we expect the compiler already support thread local storage
diff --git a/plugins/out_s3/CMakeLists.txt b/plugins/out_s3/CMakeLists.txt
index 94e04861707..2a3412b3682 100644
--- a/plugins/out_s3/CMakeLists.txt
+++ b/plugins/out_s3/CMakeLists.txt
@@ -4,3 +4,8 @@ set(src
   s3_multipart.c)
 
 FLB_PLUGIN(out_s3 "${src}" "")
+
+if(FLB_ARROW)
+  add_subdirectory(arrow EXCLUDE_FROM_ALL)
+  target_link_libraries(flb-plugin-out_s3 out-s3-arrow)
+endif()
diff --git a/plugins/out_s3/arrow/CMakeLists.txt b/plugins/out_s3/arrow/CMakeLists.txt
new file mode 100644
index 00000000000..36dedc714ca
--- /dev/null
+++ b/plugins/out_s3/arrow/CMakeLists.txt
@@ -0,0 +1,7 @@
+set(src
+    compress.c)
+
+add_library(out-s3-arrow STATIC ${src})
+
+target_include_directories(out-s3-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS})
+target_link_libraries(out-s3-arrow ${ARROW_GLIB_LDFLAGS})
diff --git a/plugins/out_s3/arrow/compress.c b/plugins/out_s3/arrow/compress.c
new file mode 100644
index 00000000000..0daf301f594
--- /dev/null
+++ b/plugins/out_s3/arrow/compress.c
@@ -0,0 +1,174 @@
+/*
+ * This converts the S3 plugin's request buffer into Apache Arrow format.
+ *
+ * We use the GLib bindings to call the Arrow functions (which are
+ * implemented in C++) from Fluent Bit.
+ *
+ * https://github.com/apache/arrow/tree/master/c_glib
+ */
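+
+/*
+ * The conversion happens in two steps: parse_json() reads the request
+ * buffer (a series of concatenated JSON objects) into a GArrowTable,
+ * and table_to_buffer() serializes that table into the Arrow IPC
+ * stream format, which is the payload that gets uploaded to S3.
+ */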
+
+#include <arrow-glib/arrow-glib.h>
+#include <inttypes.h>
+
+/*
+ * GArrowTable is the central structure that represents a "table"
+ * (two-dimensional, column-oriented data).
+ */
+static GArrowTable* parse_json(uint8_t *json, int size)
+{
+        GArrowJSONReader *reader;
+        GArrowBuffer *buffer;
+        GArrowBufferInputStream *input;
+        GArrowJSONReadOptions *options;
+        GArrowTable *table;
+        GError *error = NULL;
+
+        buffer = garrow_buffer_new(json, size);
+        if (buffer == NULL) {
+            return NULL;
+        }
+
+        input = garrow_buffer_input_stream_new(buffer);
+        if (input == NULL) {
+            g_object_unref(buffer);
+            return NULL;
+        }
+
+        options = garrow_json_read_options_new();
+        if (options == NULL) {
+            g_print("fail to create json options: %s\n", error->message);
+            g_object_unref(buffer);
+            g_object_unref(input);
+            return NULL;
+        }
+
+        reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error);
+        if (reader == NULL) {
+            g_print("cannot create json reader: %s\n", error->message);
+            g_error_free(error);
+            g_object_unref(buffer);
+            g_object_unref(input);
+            g_object_unref(options);
+            return NULL;
+        }
+
+        table = garrow_json_reader_read(reader, &error);
+        if (table == NULL) {
+            g_print("cannot parse json: %s\n", error->message);
+            g_error_free(error);
+            g_object_unref(buffer);
+            g_object_unref(input);
+            g_object_unref(options);
+            g_object_unref(reader);
+            return NULL;
+        }
+        g_object_unref(buffer);
+        g_object_unref(input);
+        g_object_unref(options);
+        g_object_unref(reader);
+        return table;
+}
+
+static GArrowResizableBuffer* table_to_buffer(GArrowTable *table)
+{
+        GArrowSchema *schema;
+        GArrowResizableBuffer *buffer;
+        GArrowBufferOutputStream *sink;
+        GArrowRecordBatchStreamWriter *writer;
+        GError *error = NULL;
+        gboolean success;
+
+        schema = garrow_table_get_schema(table);
+        if (schema == NULL) {
+            return NULL;
+        }
+
+        buffer = garrow_resizable_buffer_new(0, &error);
+        if (buffer == NULL) {
+            g_print("cannot create buffer: %s\n", error->message);
+            g_error_free(error);
+            g_object_unref(schema);
+            return NULL;
+        }
+
+        sink = garrow_buffer_output_stream_new(buffer);
+        if (sink == NULL) {
+            g_object_unref(schema);
+            g_object_unref(buffer);
+            return NULL;
+        }
+
+        writer = garrow_record_batch_stream_writer_new(
+                        GARROW_OUTPUT_STREAM(sink),
+                        schema, &error);
+        if (writer == NULL) {
+            g_print("cannot create writer: %s\n", error->message);
+            g_error_free(error);
+            g_object_unref(schema);
+            g_object_unref(buffer);
+            g_object_unref(sink);
+            return NULL;
+        }
+
+        success = garrow_record_batch_writer_write_table(
+                        GARROW_RECORD_BATCH_WRITER(writer),
+                        table, &error);
+        if (!success) {
+            g_print("cannot output stream: %s\n", error->message);
+            g_error_free(error);
+            g_object_unref(schema);
+            g_object_unref(buffer);
+            g_object_unref(sink);
+            g_object_unref(writer);
+            return NULL;
+        }
+        return buffer;
+}
+
+int out_s3_compress_arrow(char *json, size_t size, void **out_buf, size_t *out_size)
+{
+        GArrowTable *table;
+        GArrowResizableBuffer *buffer;
+        GBytes *bytes;
+        gconstpointer ptr;
+        gsize len;
+        uint8_t *buf;
+
+        table = parse_json((uint8_t *) json, size);
+        if (table == NULL) {
+            return -1;
+        }
+
+        buffer = table_to_buffer(table);
+        g_object_unref(table);
+        if (buffer == NULL) {
+            return -1;
+        }
+
+        bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer));
+        if (bytes == NULL) {
+            g_object_unref(buffer);
+            return -1;
+        }
+
+        ptr = g_bytes_get_data(bytes, &len);
+        if (ptr == NULL) {
+            g_object_unref(buffer);
+            g_bytes_unref(bytes);
+            return -1;
+        }
+
+        buf = malloc(len);
+        if (buf == NULL) {
+            g_object_unref(buffer);
+            g_bytes_unref(bytes);
+            return -1;
+        }
+        memcpy(buf, ptr, len);
+        *out_buf = (void *) buf;
+        *out_size = len;
+
+        g_object_unref(buffer);
+        g_bytes_unref(bytes);
+        return 0;
+}
diff --git a/plugins/out_s3/arrow/compress.h b/plugins/out_s3/arrow/compress.h
new file mode 100644
index 00000000000..867d9ce02f3
--- /dev/null
+++ b/plugins/out_s3/arrow/compress.h
@@ -0,0 +1,13 @@
+/*
+ * This function converts the out_s3 request buffer into Apache Arrow format.
+ *
+ * `json` is a string that contains (concatenated) JSON objects.
+ *
+ * `size` is the length of the json data (excluding the trailing
+ * null-terminator character).
+ *
+ * Returns 0 on success (with `out_buf` and `out_size` updated),
+ * and -1 on failure.
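+ *
+ * On success, `out_buf` points to a newly allocated buffer that the
+ * caller owns and must free(). A minimal (hypothetical) caller:
+ *
+ *     void *out_buf;
+ *     size_t out_size;
+ *
+ *     if (out_s3_compress_arrow(body, body_size, &out_buf, &out_size) == 0) {
+ *         upload(out_buf, out_size);   // upload() is a placeholder
+ *         free(out_buf);
+ *     }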
+ */
+
+int out_s3_compress_arrow(char *json, size_t size, void **out_buf, size_t *out_size);
diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c
index d182fa1a723..a4eb42311fc 100644
--- a/plugins/out_s3/s3.c
+++ b/plugins/out_s3/s3.c
@@ -34,6 +34,10 @@
 #include "s3.h"
 #include "s3_store.h"
 
+#ifdef FLB_HAVE_ARROW
+#include "arrow/compress.h"
+#endif
+
 static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data,
                                     struct s3_file *chunk,
                                     char **out_buf, size_t *out_size);
@@ -113,7 +117,7 @@ static int create_headers(struct flb_s3 *ctx, struct flb_aws_header **headers, i
     if (ctx->content_type != NULL) {
         headers_len++;
     }
-    if (ctx->compression != NULL) {
+    if (ctx->compression == COMPRESS_GZIP) {
         headers_len++;
     }
     if (ctx->canned_acl != NULL) {
@@ -137,7 +141,7 @@ static int create_headers(struct flb_s3 *ctx, struct flb_aws_header **headers, i
         s3_headers[n].val_len = strlen(ctx->content_type);
         n++;
     }
-    if (ctx->compression != NULL) {
+    if (ctx->compression == COMPRESS_GZIP) {
         s3_headers[n] = content_encoding_header;
         n++;
     }
@@ -497,17 +501,23 @@ static int cb_s3_init(struct flb_output_instance *ins,
 
     tmp = flb_output_get_property("compression", ins);
     if (tmp) {
-        if (strcmp((char *) tmp, "gzip") != 0) {
-            flb_plg_error(ctx->ins, 
-                          "'gzip' is currently the only supported value for 'compression'");
-            return -1;
-        } else if (ctx->use_put_object == FLB_FALSE) {
+        if (ctx->use_put_object == FLB_FALSE) {
             flb_plg_error(ctx->ins, 
                           "use_put_object must be enabled when compression is enabled");
             return -1;
         }
-        
-        ctx->compression = (char *) tmp;
+        if (strcmp(tmp, "gzip") == 0) {
+            ctx->compression = COMPRESS_GZIP;
+        }
+#ifdef FLB_HAVE_ARROW
+        else if (strcmp(tmp, "arrow") == 0) {
+            ctx->compression = COMPRESS_ARROW;
+        }
+#endif
+        else {
+            flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
+            return -1;
+        }
     }
 
     tmp = flb_output_get_property("content_type", ins);
@@ -1071,7 +1081,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
     flb_sds_destroy(s3_key);
     uri = tmp;
 
-    if (ctx->compression != NULL) {
+    if (ctx->compression == COMPRESS_GZIP) {
         ret = flb_gzip_compress(body, body_size, &compressed_body, &final_body_size);
         if (ret == -1) {
             flb_plg_error(ctx->ins, "Failed to compress data");
@@ -1079,7 +1089,19 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
             return -1;
         }
         final_body = (char *) compressed_body;
-    } else {
+    }
+#ifdef FLB_HAVE_ARROW
+    else if (ctx->compression == COMPRESS_ARROW) {
+        ret = out_s3_compress_arrow(body, body_size, &compressed_body, &final_body_size);
+        if (ret == -1) {
+            flb_plg_error(ctx->ins, "Failed to compress data");
+            flb_sds_destroy(uri);
+            return -1;
+        }
+        final_body = compressed_body;
+    }
+#endif
+    else {
         final_body = body;
         final_body_size = body_size;
     }
@@ -1550,7 +1572,8 @@ static struct flb_config_map config_map[] = {
      FLB_CONFIG_MAP_STR, "compression", NULL,
      0, FLB_FALSE, 0,
     "Compression type for S3 objects. 'gzip' is currently the only supported value. "
-    "The Content-Encoding HTTP Header will be set to 'gzip'."
+    "The Content-Encoding HTTP Header will be set to 'gzip'. "
+    "If Apache Arrow was enabled at compile time, you can set 'arrow' to this option."
     },
     {
      FLB_CONFIG_MAP_STR, "content_type", NULL,
diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h
index 6cb6c8ce7fc..d48ee710ca3 100644
--- a/plugins/out_s3/s3.h
+++ b/plugins/out_s3/s3.h
@@ -46,6 +46,10 @@
 
 #define DEFAULT_UPLOAD_TIMEOUT 3600
 
+#define COMPRESS_NONE  0
+#define COMPRESS_GZIP  1
+#define COMPRESS_ARROW 2
+
 /*
  * If we see repeated errors on an upload/chunk, we will discard it
  * This saves us from scenarios where something goes wrong and an upload can
@@ -95,10 +99,10 @@ struct flb_s3 {
     char *endpoint;
     char *sts_endpoint;
     char *canned_acl;
-    char *compression;
     char *content_type;
     int free_endpoint;
     int use_put_object;
+    int compression;
 
     struct flb_aws_provider *provider;
     struct flb_aws_provider *base_provider;