From a5bbe4cf596e9efd6556d06324acab85dd1af2c7 Mon Sep 17 00:00:00 2001 From: Fujimoto Seiji Date: Tue, 27 Apr 2021 17:28:23 +0900 Subject: [PATCH 1/4] out_s3: add Apache Arrow support Apache Arrow is an efficient columnar data format that is suitable for statistical analysis, and popular in machine learning community. https://arrow.apache.org/ With this patch merged, users now can specify 'arrow' as the compression type like this: [OUTPUT] Name s3 Bucket some-bucket total_file_size 1M use_put_object On Compression arrow which makes Fluent Bit convert the request buffer into Apache Arrow format before uploading. Signed-off-by: Fujimoto Seiji Reviewed-by: Sutou Kouhei Reviewed-by: Wesley Pettit --- CMakeLists.txt | 12 ++- plugins/out_s3/CMakeLists.txt | 5 + plugins/out_s3/arrow/CMakeLists.txt | 7 ++ plugins/out_s3/arrow/compress.c | 147 ++++++++++++++++++++++++++++ plugins/out_s3/arrow/compress.h | 13 +++ plugins/out_s3/s3.c | 49 +++++++--- plugins/out_s3/s3.h | 6 +- 7 files changed, 224 insertions(+), 15 deletions(-) create mode 100644 plugins/out_s3/arrow/CMakeLists.txt create mode 100644 plugins/out_s3/arrow/compress.c create mode 100644 plugins/out_s3/arrow/compress.h diff --git a/CMakeLists.txt b/CMakeLists.txt index f420a35194e..ac18354f1dc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -99,7 +99,7 @@ option(FLB_STREAM_PROCESSOR "Enable Stream Processor" Yes) option(FLB_CORO_STACK_SIZE "Set coroutine stack size") option(FLB_AVRO_ENCODER "Build with Avro encoding support" No) option(FLB_AWS_ERROR_REPORTER "Build with aws error reporting support" No) - +option(FLB_ARROW "Build with Apache Arrow support" No) # Native Metrics Support (cmetrics) option(FLB_METRICS "Enable metrics support" Yes) @@ -735,6 +735,16 @@ if(FLB_OUT_PGSQL AND (NOT PostgreSQL_FOUND)) FLB_OPTION(FLB_OUT_PGSQL OFF) endif() +# Arrow GLib +# ========== +find_package(PkgConfig) +pkg_check_modules(ARROW_GLIB QUIET arrow-glib) +if(FLB_ARROW AND ARROW_GLIB_FOUND) + FLB_DEFINITION(FLB_HAVE_ARROW) +else() + set(FLB_ARROW OFF) +endif() + # Pthread Local Storage # ===================== # By default we expect the compiler already support thread local storage diff --git a/plugins/out_s3/CMakeLists.txt b/plugins/out_s3/CMakeLists.txt index 94e04861707..2a3412b3682 100644 --- a/plugins/out_s3/CMakeLists.txt +++ b/plugins/out_s3/CMakeLists.txt @@ -4,3 +4,8 @@ set(src s3_multipart.c) FLB_PLUGIN(out_s3 "${src}" "") + +if(FLB_ARROW) + add_subdirectory(arrow EXCLUDE_FROM_ALL) + target_link_libraries(flb-plugin-out_s3 out-s3-arrow) +endif() diff --git a/plugins/out_s3/arrow/CMakeLists.txt b/plugins/out_s3/arrow/CMakeLists.txt new file mode 100644 index 00000000000..36dedc714ca --- /dev/null +++ b/plugins/out_s3/arrow/CMakeLists.txt @@ -0,0 +1,7 @@ +set(src + compress.c) + +add_library(out-s3-arrow STATIC ${src}) + +target_include_directories(out-s3-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS}) +target_link_libraries(out-s3-arrow ${ARROW_GLIB_LDFLAGS}) diff --git a/plugins/out_s3/arrow/compress.c b/plugins/out_s3/arrow/compress.c new file mode 100644 index 00000000000..8a09aca1248 --- /dev/null +++ b/plugins/out_s3/arrow/compress.c @@ -0,0 +1,147 @@ +/* + * This converts S3 plugin's request buffer into Apache Arrow format. + * + * We use GLib binding to call Arrow functions (which is implemented + * in C++) from Fluent Bit. + * + * https://github.com/apache/arrow/tree/master/c_glib + */ + +#include +#include + +/* + * GArrowTable is the central structure that represents "table" (a.k.a. + * data frame). 
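+ * parse_json() below loads the buffered JSON records into a GArrowTable,
+ * and table_to_buffer() serializes that table in the Arrow Feather format.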
+ */ +static GArrowTable* parse_json(uint8_t *json, int size) +{ + GArrowJSONReader *reader; + GArrowBuffer *buffer; + GArrowBufferInputStream *input; + GArrowJSONReadOptions *options; + GArrowTable *table; + GError *error = NULL; + + buffer = garrow_buffer_new(json, size); + if (buffer == NULL) { + return NULL; + } + + input = garrow_buffer_input_stream_new(buffer); + if (input == NULL) { + g_object_unref(buffer); + return NULL; + } + + options = garrow_json_read_options_new(); + if (options == NULL) { + g_object_unref(buffer); + g_object_unref(input); + return NULL; + } + + reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error); + if (reader == NULL) { + g_error_free(error); + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + return NULL; + } + + table = garrow_json_reader_read(reader, &error); + if (table == NULL) { + g_error_free(error); + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + g_object_unref(reader); + return NULL; + } + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + g_object_unref(reader); + return table; +} + +static GArrowResizableBuffer* table_to_buffer(GArrowTable *table) +{ + GArrowResizableBuffer *buffer; + GArrowBufferOutputStream *sink; + GError *error = NULL; + gboolean success; + + buffer = garrow_resizable_buffer_new(0, &error); + if (buffer == NULL) { + g_error_free(error); + return NULL; + } + + sink = garrow_buffer_output_stream_new(buffer); + if (sink == NULL) { + g_object_unref(buffer); + return NULL; + } + + success = garrow_table_write_as_feather( + table, GARROW_OUTPUT_STREAM(sink), + NULL, &error); + if (!success) { + g_error_free(error); + g_object_unref(buffer); + g_object_unref(sink); + return NULL; + } + g_object_unref(sink); + return buffer; +} + +int out_s3_compress_arrow(uint8_t *json, size_t size, void **out_buf, size_t *out_size) +{ + GArrowTable *table; + GArrowResizableBuffer *buffer; + GBytes *bytes; + gconstpointer ptr; + gsize len; + uint8_t *buf; + + table = parse_json(json, size); + if (table == NULL) { + return -1; + } + + buffer = table_to_buffer(table); + g_object_unref(table); + if (buffer == NULL) { + return -1; + } + + bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer)); + if (bytes == NULL) { + g_object_unref(buffer); + return -1; + } + + ptr = g_bytes_get_data(bytes, &len); + if (ptr == NULL) { + g_object_unref(buffer); + g_bytes_unref(bytes); + return -1; + } + + buf = malloc(len); + if (buf == NULL) { + g_object_unref(buffer); + g_bytes_unref(bytes); + return -1; + } + memcpy(buf, ptr, len); + *out_buf = (void *) buf; + *out_size = len; + + g_object_unref(buffer); + g_bytes_unref(bytes); + return 0; +} diff --git a/plugins/out_s3/arrow/compress.h b/plugins/out_s3/arrow/compress.h new file mode 100644 index 00000000000..867d9ce02f3 --- /dev/null +++ b/plugins/out_s3/arrow/compress.h @@ -0,0 +1,13 @@ +/* + * This function converts out_s3 buffer into Apache Arrow format. + * + * `json` is a string that contain (concatenated) JSON objects. + * + * `size` is the length of the json data (excluding the trailing + * null-terminator character). 
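+ * The caller owns the buffer returned through `out_buf` and is
+ * responsible for freeing it.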
+ * + * Return 0 on success (with `out_buf` and `out_size` updated), + * and -1 on failure + */ + +int out_s3_compress_arrow(char *json, size_t size, void **out_buf, size_t *out_size); diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 94a76306cff..4eef6b7a566 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -37,6 +37,10 @@ #include "s3.h" #include "s3_store.h" +#ifdef FLB_HAVE_ARROW +#include "arrow/compress.h" +#endif + static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, struct s3_file *chunk, char **out_buf, size_t *out_size); @@ -125,7 +129,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea if (ctx->content_type != NULL) { headers_len++; } - if (ctx->compression != NULL) { + if (ctx->compression == COMPRESS_GZIP) { headers_len++; } if (ctx->canned_acl != NULL) { @@ -152,7 +156,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea s3_headers[n].val_len = strlen(ctx->content_type); n++; } - if (ctx->compression != NULL) { + if (ctx->compression == COMPRESS_GZIP) { s3_headers[n] = content_encoding_header; n++; } @@ -678,17 +682,23 @@ static int cb_s3_init(struct flb_output_instance *ins, tmp = flb_output_get_property("compression", ins); if (tmp) { - if (strcmp((char *) tmp, "gzip") != 0) { - flb_plg_error(ctx->ins, - "'gzip' is currently the only supported value for 'compression'"); - return -1; - } else if (ctx->use_put_object == FLB_FALSE) { + if (ctx->use_put_object == FLB_FALSE) { flb_plg_error(ctx->ins, "use_put_object must be enabled when compression is enabled"); return -1; } - - ctx->compression = (char *) tmp; + if (strcmp(tmp, "gzip") == 0) { + ctx->compression = COMPRESS_GZIP; + } +#ifdef FLB_HAVE_ARROW + else if (strcmp(tmp, "arrow") == 0) { + ctx->compression = COMPRESS_ARROW; + } +#endif + else { + flb_plg_error(ctx->ins, "unknown compression: %s", tmp); + return -1; + } } tmp = flb_output_get_property("content_type", ins); @@ -1262,7 +1272,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time flb_sds_destroy(s3_key); uri = tmp; - if (ctx->compression != NULL) { + if (ctx->compression == COMPRESS_GZIP) { ret = flb_gzip_compress(body, body_size, &compressed_body, &final_body_size); if (ret == -1) { flb_plg_error(ctx->ins, "Failed to compress data"); @@ -1270,7 +1280,19 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time return -1; } final_body = (char *) compressed_body; - } else { + } +#ifdef FLB_HAVE_ARROW + else if (ctx->compression == COMPRESS_ARROW) { + ret = out_s3_compress_arrow(body, body_size, &compressed_body, &final_body_size); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to compress data"); + flb_sds_destroy(uri); + return -1; + } + final_body = compressed_body; + } +#endif + else { final_body = body; final_body_size = body_size; } @@ -1313,7 +1335,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT, uri, final_body, final_body_size, headers, num_headers); - if (ctx->compression != NULL) { + if (ctx->compression != COMPRESS_NONE) { flb_free(compressed_body); } flb_free(headers); @@ -2226,7 +2248,8 @@ static struct flb_config_map config_map[] = { FLB_CONFIG_MAP_STR, "compression", NULL, 0, FLB_FALSE, 0, "Compression type for S3 objects. 'gzip' is currently the only supported value. " - "The Content-Encoding HTTP Header will be set to 'gzip'." 
+ "The Content-Encoding HTTP Header will be set to 'gzip'. " + "If Apache Arrow was enabled at compile time, you can set 'arrow' to this option." }, { FLB_CONFIG_MAP_STR, "content_type", NULL, diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 665567f9645..47a8eed1157 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -46,6 +46,10 @@ #define DEFAULT_UPLOAD_TIMEOUT 3600 +#define COMPRESS_NONE 0 +#define COMPRESS_GZIP 1 +#define COMPRESS_ARROW 2 + /* * If we see repeated errors on an upload/chunk, we will discard it * This saves us from scenarios where something goes wrong and an upload can @@ -107,7 +111,6 @@ struct flb_s3 { char *endpoint; char *sts_endpoint; char *canned_acl; - char *compression; char *content_type; char *log_key; int free_endpoint; @@ -115,6 +118,7 @@ struct flb_s3 { int use_put_object; int send_content_md5; int static_file_path; + int compression; struct flb_aws_provider *provider; struct flb_aws_provider *base_provider; From f9895b3cb7e8e88a93a2852170ad0429c1479692 Mon Sep 17 00:00:00 2001 From: Matthew Fala Date: Mon, 6 Dec 2021 19:57:48 +0000 Subject: [PATCH 2/4] aws: create shared compression utility restructure aws cmake to make maintaining nested directories easier Signed-off-by: Matthew Fala --- include/fluent-bit/aws/flb_aws_compress.h | 63 +++ src/CMakeLists.txt | 39 +- src/aws/CMakeLists.txt | 30 ++ src/aws/compression/CMakeLists.txt | 6 + src/aws/compression/arrow/CMakeLists.txt | 7 + src/aws/compression/arrow/compress.c | 147 +++++++ src/aws/compression/arrow/compress.h | 13 + src/aws/flb_aws_compress.c | 231 ++++++++++ tests/internal/CMakeLists.txt | 5 + tests/internal/aws_compress.c | 490 ++++++++++++++++++++++ 10 files changed, 1004 insertions(+), 27 deletions(-) create mode 100644 include/fluent-bit/aws/flb_aws_compress.h create mode 100644 src/aws/CMakeLists.txt create mode 100644 src/aws/compression/CMakeLists.txt create mode 100644 src/aws/compression/arrow/CMakeLists.txt create mode 100644 src/aws/compression/arrow/compress.c create mode 100644 src/aws/compression/arrow/compress.h create mode 100644 src/aws/flb_aws_compress.c create mode 100644 tests/internal/aws_compress.c diff --git a/include/fluent-bit/aws/flb_aws_compress.h b/include/fluent-bit/aws/flb_aws_compress.h new file mode 100644 index 00000000000..e1cf9222377 --- /dev/null +++ b/include/fluent-bit/aws/flb_aws_compress.h @@ -0,0 +1,63 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2021 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_AWS_COMPRESS +#define FLB_AWS_COMPRESS + +#include +#define FLB_AWS_COMPRESS_NONE 0 +#define FLB_AWS_COMPRESS_GZIP 1 +#define FLB_AWS_COMPRESS_ARROW 2 + +/* + * Get compression type from compression keyword. The return value is used to identify + * what compression option to utilize. 
+ * + * Returns int compression type id - FLB_AWS_COMPRESS_ + */ +int flb_aws_compression_get_type(const char *compression_keyword); + +/* + * Compress in_data and write result to newly allocated out_data buf + * Client is responsable for freeing out_data. + * + * Returns -1 on error + * Returns 0 on success + */ +int flb_aws_compression_compress(int compression_type, void *in_data, size_t in_len, + void **out_data, size_t *out_len); + +/* + * Truncate and compress in_data and convert to b64 + * If b64 output data is larger than max_out_len, the input is truncated with a + * [Truncated...] suffix appended to the end, and recompressed. The result is written to a + * newly allocated out_data buf. + * Client is responsable for freeing out_data. + * + * out_len and max_out_len do not count the null character as a part of out_data's length, + * though the null character may be included at the end of out_data. + * + * Returns -1 on error + * Returns 0 on success + */ +int flb_aws_compression_b64_truncate_compress(int compression_type, size_t max_out_len, + void *in_data, size_t in_len, + void **out_data, size_t *out_len); + +#endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index fb3ed7828f1..4fedcfba4f5 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -137,33 +137,6 @@ if(FLB_HTTP_CLIENT_DEBUG) ) endif() -if(FLB_AWS) - set(src - ${src} - "aws/flb_aws_credentials_log.h" - "aws/flb_aws_util.c" - "aws/flb_aws_credentials.c" - "aws/flb_aws_credentials_sts.c" - "aws/flb_aws_credentials_ec2.c" - "aws/flb_aws_imds.c" - "aws/flb_aws_credentials_http.c" - "aws/flb_aws_credentials_profile.c" - ) - if(FLB_HAVE_AWS_CREDENTIAL_PROCESS) - set(src - ${src} - "aws/flb_aws_credentials_process.c" - ) - endif() -endif() - -if (FLB_AWS_ERROR_REPORTER) - set(src - ${src} - "aws/flb_aws_error_reporter.c" - ) -endif() - if(FLB_LUAJIT) set(src ${src} @@ -246,6 +219,10 @@ if(CMAKE_SYSTEM_NAME MATCHES "Linux") ) endif() +# AWS specific +if(FLB_AWS) + add_subdirectory(aws) +endif() # Record Accessor # --------------- @@ -334,6 +311,14 @@ set(FLB_DEPS ) endif() +# AWS specific +if(FLB_AWS) + set(FLB_DEPS + ${FLB_DEPS} + flb-aws + ) +endif() + # Record Accessor if(FLB_RECORD_ACCESSOR) set(FLB_DEPS diff --git a/src/aws/CMakeLists.txt b/src/aws/CMakeLists.txt new file mode 100644 index 00000000000..dad5dd9e3d6 --- /dev/null +++ b/src/aws/CMakeLists.txt @@ -0,0 +1,30 @@ +add_subdirectory(compression) + +set(src + "flb_aws_credentials_log.h" + "flb_aws_compress.c" + "flb_aws_util.c" + "flb_aws_credentials.c" + "flb_aws_credentials_sts.c" + "flb_aws_credentials_ec2.c" + "flb_aws_imds.c" + "flb_aws_credentials_http.c" + "flb_aws_credentials_profile.c" + ) + +if(FLB_HAVE_AWS_CREDENTIAL_PROCESS) + set(src + ${src} + "flb_aws_credentials_process.c" + ) +endif() + +if (FLB_AWS_ERROR_REPORTER) + set(src + ${src} + "flb_aws_error_reporter.c" + ) +endif() + +add_library(flb-aws STATIC ${src}) +target_link_libraries(flb-aws flb-aws-compress) diff --git a/src/aws/compression/CMakeLists.txt b/src/aws/compression/CMakeLists.txt new file mode 100644 index 00000000000..afeab659f86 --- /dev/null +++ b/src/aws/compression/CMakeLists.txt @@ -0,0 +1,6 @@ +add_library(flb-aws-compress INTERFACE) + +if(FLB_ARROW) + add_subdirectory(arrow EXCLUDE_FROM_ALL) + target_link_libraries(flb-aws-compress flb-aws-arrow) +endif() diff --git a/src/aws/compression/arrow/CMakeLists.txt b/src/aws/compression/arrow/CMakeLists.txt new file mode 100644 index 00000000000..846f654412d --- /dev/null +++ b/src/aws/compression/arrow/CMakeLists.txt 
@@ -0,0 +1,7 @@ +set(src + compress.c) + +add_library(flb-aws-arrow STATIC ${src}) + +target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS}) +target_link_libraries(flb-aws-arrow ${ARROW_GLIB_LDFLAGS}) diff --git a/src/aws/compression/arrow/compress.c b/src/aws/compression/arrow/compress.c new file mode 100644 index 00000000000..8a09aca1248 --- /dev/null +++ b/src/aws/compression/arrow/compress.c @@ -0,0 +1,147 @@ +/* + * This converts S3 plugin's request buffer into Apache Arrow format. + * + * We use GLib binding to call Arrow functions (which is implemented + * in C++) from Fluent Bit. + * + * https://github.com/apache/arrow/tree/master/c_glib + */ + +#include +#include + +/* + * GArrowTable is the central structure that represents "table" (a.k.a. + * data frame). + */ +static GArrowTable* parse_json(uint8_t *json, int size) +{ + GArrowJSONReader *reader; + GArrowBuffer *buffer; + GArrowBufferInputStream *input; + GArrowJSONReadOptions *options; + GArrowTable *table; + GError *error = NULL; + + buffer = garrow_buffer_new(json, size); + if (buffer == NULL) { + return NULL; + } + + input = garrow_buffer_input_stream_new(buffer); + if (input == NULL) { + g_object_unref(buffer); + return NULL; + } + + options = garrow_json_read_options_new(); + if (options == NULL) { + g_object_unref(buffer); + g_object_unref(input); + return NULL; + } + + reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error); + if (reader == NULL) { + g_error_free(error); + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + return NULL; + } + + table = garrow_json_reader_read(reader, &error); + if (table == NULL) { + g_error_free(error); + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + g_object_unref(reader); + return NULL; + } + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + g_object_unref(reader); + return table; +} + +static GArrowResizableBuffer* table_to_buffer(GArrowTable *table) +{ + GArrowResizableBuffer *buffer; + GArrowBufferOutputStream *sink; + GError *error = NULL; + gboolean success; + + buffer = garrow_resizable_buffer_new(0, &error); + if (buffer == NULL) { + g_error_free(error); + return NULL; + } + + sink = garrow_buffer_output_stream_new(buffer); + if (sink == NULL) { + g_object_unref(buffer); + return NULL; + } + + success = garrow_table_write_as_feather( + table, GARROW_OUTPUT_STREAM(sink), + NULL, &error); + if (!success) { + g_error_free(error); + g_object_unref(buffer); + g_object_unref(sink); + return NULL; + } + g_object_unref(sink); + return buffer; +} + +int out_s3_compress_arrow(uint8_t *json, size_t size, void **out_buf, size_t *out_size) +{ + GArrowTable *table; + GArrowResizableBuffer *buffer; + GBytes *bytes; + gconstpointer ptr; + gsize len; + uint8_t *buf; + + table = parse_json(json, size); + if (table == NULL) { + return -1; + } + + buffer = table_to_buffer(table); + g_object_unref(table); + if (buffer == NULL) { + return -1; + } + + bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer)); + if (bytes == NULL) { + g_object_unref(buffer); + return -1; + } + + ptr = g_bytes_get_data(bytes, &len); + if (ptr == NULL) { + g_object_unref(buffer); + g_bytes_unref(bytes); + return -1; + } + + buf = malloc(len); + if (buf == NULL) { + g_object_unref(buffer); + g_bytes_unref(bytes); + return -1; + } + memcpy(buf, ptr, len); + *out_buf = (void *) buf; + *out_size = len; + + g_object_unref(buffer); + g_bytes_unref(bytes); + return 0; +} diff --git 
a/src/aws/compression/arrow/compress.h b/src/aws/compression/arrow/compress.h new file mode 100644 index 00000000000..867d9ce02f3 --- /dev/null +++ b/src/aws/compression/arrow/compress.h @@ -0,0 +1,13 @@ +/* + * This function converts out_s3 buffer into Apache Arrow format. + * + * `json` is a string that contain (concatenated) JSON objects. + * + * `size` is the length of the json data (excluding the trailing + * null-terminator character). + * + * Return 0 on success (with `out_buf` and `out_size` updated), + * and -1 on failure + */ + +int out_s3_compress_arrow(char *json, size_t size, void **out_buf, size_t *out_size); diff --git a/src/aws/flb_aws_compress.c b/src/aws/flb_aws_compress.c new file mode 100644 index 00000000000..83ea6bbb8d8 --- /dev/null +++ b/src/aws/flb_aws_compress.c @@ -0,0 +1,231 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2021 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include + +#include + +#ifdef FLB_HAVE_ARROW +#include "compression/arrow/compress.h" +#endif + +struct compression_option { + int compression_type; + char *compression_keyword; + int(*compress)(void *in_data, size_t in_len, void **out_data, size_t *out_len); +}; + +/* + * Library of compression options + * AWS plugins that support compression will have these options. + * Referenced function should return -1 on error and 0 on success. 
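+ * For example, flb_gzip_compress() matches this signature and backs the
+ * "gzip" keyword in the table below.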
+ */ +static const struct compression_option compression_options[] = { + /* FLB_AWS_COMPRESS_NONE which is 0 is reserved for array footer */ + { + FLB_AWS_COMPRESS_GZIP, + "gzip", + &flb_gzip_compress + }, +#ifdef FLB_HAVE_ARROW + { + FLB_AWS_COMPRESS_ARROW, + "arrow", + &out_s3_compress_arrow + }, +#endif + { 0 } +}; + +int flb_aws_compression_get_type(const char *compression_keyword) +{ + int ret; + const struct compression_option *o; + + o = compression_options; + + while (o->compression_type != 0) { + ret = strcmp(o->compression_keyword, compression_keyword); + if (ret == 0) { + return o->compression_type; + } + ++o; + } + + flb_error("[aws_compress] unknown compression type: %s", compression_keyword); + return -1; +} + +int flb_aws_compression_compress(int compression_type, void *in_data, size_t in_len, + void **out_data, size_t *out_len) +{ + const struct compression_option *o; + + o = compression_options; + + while (o->compression_type != 0) { + if (o->compression_type == compression_type) { + return o->compress(in_data, in_len, out_data, out_len); + } + ++o; + } + + flb_error("[aws_compress] invalid compression type: %i", compression_type); + flb_errno(); + return -1; +} + +int flb_aws_compression_b64_truncate_compress(int compression_type, size_t max_out_len, + void *in_data, size_t in_len, + void **out_data, size_t *out_len) +{ + static const void *truncation_suffix = "[Truncated...]"; + static const size_t truncation_suffix_len = 14; + static const double truncation_reduction_percent = 90; /* % out of 100 */ + + int ret; + int is_truncated; + size_t truncated_in_len_prev; + size_t truncated_in_len; + void *truncated_in_buf; + void *compressed_buf; + size_t compressed_len; + size_t original_b64_compressed_len; + + unsigned char *b64_compressed_buf; + size_t b64_compressed_len; + size_t b64_actual_len; + + /* Iterative approach to truncation */ + truncated_in_len = in_len; + truncated_in_buf = in_data; + is_truncated = FLB_FALSE; + b64_compressed_len = SIZE_MAX; + while (max_out_len < b64_compressed_len - 1) { + ret = flb_aws_compression_compress(compression_type, truncated_in_buf, + truncated_in_len, &compressed_buf, + &compressed_len); + if (ret != 0) { + if (is_truncated) { + flb_free(truncated_in_buf); + } + return -1; + } + + /* Determine encoded base64 buffer size */ + b64_compressed_len = compressed_len / 3; /* Compute number of 4 sextet groups */ + b64_compressed_len += (compressed_len % 3 != 0); /* Add padding partial group */ + b64_compressed_len *= 4; /* Compute number of sextets */ + b64_compressed_len += 1; /* Add room for null character 0x00 */ + + /* Truncation needed */ + if (max_out_len < b64_compressed_len - 1) { + flb_debug("[aws_compress] iterative truncation round"); + + /* This compressed_buf is the wrong size. 
Free */ + flb_free(compressed_buf); + + /* Base case: input compressed empty string, output still too large */ + if (truncated_in_len == 0) { + if (is_truncated) { + flb_free(truncated_in_buf); + } + flb_error("[aws_compress] truncation failed, compressed empty input too " + "large"); + return -1; + } + + /* Calculate corrected input size */ + truncated_in_len_prev = truncated_in_len; + truncated_in_len = (max_out_len * truncated_in_len) / b64_compressed_len; + truncated_in_len = (truncated_in_len * truncation_reduction_percent) / 100; + + /* Ensure working down */ + if (truncated_in_len >= truncated_in_len_prev) { + truncated_in_len = truncated_in_len_prev - 1; + } + + /* Allocate truncation buffer */ + if (!is_truncated) { + is_truncated = FLB_TRUE; + original_b64_compressed_len = b64_compressed_len; + truncated_in_buf = flb_malloc(in_len); + if (!truncated_in_buf) { + flb_errno(); + return -1; + } + memcpy(truncated_in_buf, in_data, in_len); + } + + /* Slap on truncation suffix */ + if (truncated_in_len < truncation_suffix_len) { + /* No room for the truncation suffix. Terminal error */ + flb_error("[aws_compress] truncation failed, no room for suffix"); + flb_free(truncated_in_buf); + return -1; + } + memcpy(truncated_in_buf + truncated_in_len - truncation_suffix_len, + truncation_suffix, truncation_suffix_len); + } + } + + /* Truncate buffer free and compression buffer allocation */ + if (is_truncated) { + flb_free(truncated_in_buf); + flb_warn("[aws_compress][size=%zu] Truncating input for compressed output " + "larger than %zu bytes, output from %zu to %zu bytes", + in_len, max_out_len, original_b64_compressed_len - 1, + b64_compressed_len - 1); + } + b64_compressed_buf = flb_malloc(b64_compressed_len); + if (!b64_compressed_buf) { + flb_errno(); + return -1; + } + + /* Base64 encode compressed out bytes */ + ret = mbedtls_base64_encode(b64_compressed_buf, b64_compressed_len, &b64_actual_len, + compressed_buf, compressed_len); + flb_free(compressed_buf); + + if (ret == MBEDTLS_ERR_BASE64_BUFFER_TOO_SMALL) { + flb_error("[aws_compress] compressed log base64 buffer too small"); + return -1; /* not handle truncation at this point */ + } + if (ret != 0) { + flb_free(b64_compressed_buf); + return -1; + } + + /* Double check b64 buf len */ + if (b64_compressed_len - 1 != b64_actual_len) { + flb_error("[aws_compress] buffer len should be 1 greater than actual len"); + flb_free(b64_compressed_buf); + return -1; + } + + *out_data = b64_compressed_buf; + *out_len = b64_compressed_len - 1; /* disregard added null character */ + return 0; +} diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index 7931227ac1b..655340f106a 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -77,6 +77,7 @@ if(FLB_AWS) set(UNIT_TESTS_FILES ${UNIT_TESTS_FILES} aws_util.c + aws_compress.c aws_credentials.c aws_credentials_ec2.c aws_credentials_sts.c @@ -141,6 +142,10 @@ foreach(source_file ${UNIT_TESTS_FILES}) target_link_libraries(${source_file_we} ${CMAKE_THREAD_LIBS_INIT}) endif() + if(FLB_AWS) + target_link_libraries(${source_file_we} flb-aws) + endif() + if(FLB_STREAM_PROCESSOR) target_link_libraries(${source_file_we} flb-sp) endif() diff --git a/tests/internal/aws_compress.c b/tests/internal/aws_compress.c new file mode 100644 index 00000000000..78a9f23d5c2 --- /dev/null +++ b/tests/internal/aws_compress.c @@ -0,0 +1,490 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include +#include +#include + +#include 
+#include "flb_tests_internal.h" + +#define FLB_AWS_COMPRESS_TEST_TYPE_COMPRESS 1 +#define FLB_AWS_COMPRESS_TEST_TYPE_B64_TRUNCATE 2 + +/* test case definition struct */ +struct flb_aws_test_case { + char* compression_keyword; + char* in_data; + char* expect_out_data_b64; + int expect_ret; +}; + +/* test loop function declarations */ +static unsigned char * base64_encode(const unsigned char *src, size_t len, + size_t *out_len); +static unsigned char * base64_decode(const unsigned char *src, size_t len, + size_t *out_len); +static void flb_aws_compress_general_test_cases(int test_type, + struct flb_aws_test_case *cases, + size_t max_out_len, + int(*decompress)(void *in_data, + size_t in_len, + void **out_data, + size_t *out_len)); +static void flb_aws_compress_test_cases(struct flb_aws_test_case *cases); +static void flb_aws_compress_truncate_b64_test_cases__gzip_decode( + struct flb_aws_test_case *cases, + size_t max_out_len); + +/** ------ Test Cases ------ **/ +void test_compression_gzip() +{ + struct flb_aws_test_case cases[] = + { + { + "gzip", + "hello hello hello hello hello hello", + "H4sIAAAAAAAA/8tIzcnJV8jARwIAVzdihSMAAAA=", + 0 + }, + { 0 } + }; + + flb_aws_compress_test_cases(cases); +} + +void test_b64_truncated_gzip() +{ +struct flb_aws_test_case cases[] = + { + { + "gzip", + "hello hello hello hello hello hello", + "hello hello hello hello hello hello", /* Auto decoded via gzip */ + 0 /* Expected ret */ + }, + { 0 } + }; + + flb_aws_compress_truncate_b64_test_cases__gzip_decode(cases, + 41); +} + +void test_b64_truncated_gzip_truncation() +{ +struct flb_aws_test_case cases[] = + { + { + "gzip", + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod temp" + "or incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, qui" + "s nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequ" + "at. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum do" + "lore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proiden" + "t, sunt in culpa qui officia deserunt mollit anim id est laborum. xyz", + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod temp" + "or incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, qui" + "s nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequ" + "at. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum do" + "lore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proiden" + "t, su[Truncated...]" + /*"nt in culpa qui officia deserunt mollit anim id est laborum. xyz",*/ + "", + 0 /* Expected ret */ + }, + { + "gzip", + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod temp" + "or incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, qui" + "s nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequ" + "at. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum do" + "lore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proiden" + "t, sunt in culpa qui officia deserunt mollit anim id est laborum.", + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod temp" + "or incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, qui" + "s nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequ" + "at. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum do" + "lore eu fugiat nulla pariatur. 
Excepteur sint occaecat cupidatat non proiden" + "t, sunt in culpa qui officia deserunt mollit anim id est laborum.", + 0 /* Expected ret */ + }, + { 0 } + }; + + flb_aws_compress_truncate_b64_test_cases__gzip_decode(cases, + 381); +} + +void test_b64_truncated_gzip_truncation_buffer_too_small() +{ +struct flb_aws_test_case cases[] = + { + { + "gzip", + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod temp" + "or incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, qui" + "s nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequ" + "at. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum do" + "lore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proiden" + "t, sunt in culpa qui officia deserunt mollit anim id est laborum.", + "", + -1 /* Expected ret */ + }, + { + "gzip", + "", + "", + -1 /* Expected ret: Buffer too small */ + }, + { 0 } + }; + + flb_aws_compress_truncate_b64_test_cases__gzip_decode(cases, + 14); +} + +void test_b64_truncated_gzip_truncation_edge() +{ +struct flb_aws_test_case cases[] = + { + /*{ + "gzip", + "", + "", + 0 + }, *//* This test case fails, because GZIP can zip empty strings but not unzip */ + { + "gzip", + "[Truncated...]", /* Endless loop? */ + "", + -1 /* Expected ret */ + }, + { 0 } + }; + + flb_aws_compress_truncate_b64_test_cases__gzip_decode(cases, + 51); +} + +void test_b64_truncated_gzip_truncation_multi_rounds() +{ +struct flb_aws_test_case cases[] = + { + { + "gzip", + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod temp" + "or incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, qui" + "s nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequ" + "at. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum do" + "lore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proiden" + "t, sunt in culpa qui officia deserunt mollit anim id est laborum." + "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + "", /* First half of the compression is heavy, the second half is light. */ + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod temp" + "or incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, qui" + "s nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequ" + "at. 
Duis aute irure dolor in reprehenderit in voluptate velit es" + "[Truncated...]", /* Bad estimation of resizing, 3 truncation iterations + * needed */ + 0 /* Expected ret */ + }, + { 0 } + }; + + flb_aws_compress_truncate_b64_test_cases__gzip_decode(cases, + 300); +} + +TEST_LIST = { + { "test_compression_gzip", test_compression_gzip }, + { "test_b64_truncated_gzip", test_b64_truncated_gzip }, + { "test_b64_truncated_gzip_truncation", test_b64_truncated_gzip_truncation }, + { "test_b64_truncated_gzip_truncation_buffer_too_small", + test_b64_truncated_gzip_truncation_buffer_too_small }, + { "test_b64_truncated_gzip_truncation_edge", + test_b64_truncated_gzip_truncation_edge }, + { "test_b64_truncated_gzip_truncation_multi_rounds", + test_b64_truncated_gzip_truncation_multi_rounds }, + { 0 } +}; + +/** ------ Helper Methods ------ **/ + +/* test case loop for flb_aws_compress */ +static void flb_aws_compress_test_cases(struct flb_aws_test_case *cases) +{ + flb_aws_compress_general_test_cases(FLB_AWS_COMPRESS_TEST_TYPE_COMPRESS, + cases, 0, NULL); +} + +/* test case loop for flb_aws_compress */ +static void flb_aws_compress_truncate_b64_test_cases__gzip_decode( + struct flb_aws_test_case *cases, + size_t max_out_len) +{ + flb_aws_compress_general_test_cases(FLB_AWS_COMPRESS_TEST_TYPE_B64_TRUNCATE, + cases, max_out_len, &flb_gzip_uncompress); +} + +/* General test case loop flb_aws_compress */ +static void flb_aws_compress_general_test_cases(int test_type, + struct flb_aws_test_case *cases, + size_t max_out_len, + int(*decompress)(void *in_data, + size_t in_len, + void **out_data, + size_t *out_len)) +{ + int ret; + size_t len; + int compression_type = FLB_AWS_COMPRESS_NONE; + unsigned char* out_data; + size_t out_data_len; + unsigned char* out_data_b64; + size_t out_data_b64_len; + + struct flb_aws_test_case *tcase = cases; + while (tcase->compression_keyword != 0) { + + size_t in_data_len = strlen(tcase->in_data); + compression_type = flb_aws_compression_get_type(tcase->compression_keyword); + + TEST_CHECK(compression_type != -1); + TEST_MSG("| flb_aws_get_compression_type: failed to get compression type for " + "keyword " + "%s", tcase->compression_keyword); + + if (test_type == FLB_AWS_COMPRESS_TEST_TYPE_COMPRESS) { + ret = flb_aws_compression_compress(compression_type, (void *) tcase->in_data, + in_data_len, (void **) &out_data, + &out_data_len); + } + else { + ret = flb_aws_compression_b64_truncate_compress(compression_type, max_out_len, + (void *) tcase->in_data, + in_data_len, + (void **) &out_data, + &out_data_len); + } + + TEST_CHECK(ret == tcase->expect_ret); + TEST_MSG("| Expected return value: %i", tcase->expect_ret); + TEST_MSG("| Produced return value: %i", ret); + + if (ret != 0) { + TEST_MSG("*- For input data: %s", tcase->in_data); + ++tcase; + continue; + } + + if (test_type == FLB_AWS_COMPRESS_TEST_TYPE_COMPRESS) { + out_data_b64 = base64_encode(out_data, out_data_len, &out_data_b64_len); + /* remove newline character which is a part of this encode algo */ + --out_data_b64_len; + flb_free(out_data); + out_data = NULL; + } + else { + /* decode b64 so we can compare plain text */ + out_data_b64 = base64_decode(out_data, out_data_len, &out_data_b64_len); + flb_free(out_data); + out_data = out_data_b64; + out_data_len = out_data_b64_len; + ret = decompress(out_data, out_data_len, (void *)&out_data_b64, + &out_data_b64_len); + flb_free(out_data); + out_data = NULL; + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("| Decompression failure"); + out_data_b64 = flb_malloc(1); /* 
placeholder malloc */ + } + } + + ret = memcmp(tcase->expect_out_data_b64, out_data_b64, out_data_b64_len); + TEST_CHECK(ret == 0); + TEST_MSG("| Expected output(%s): %s", + (test_type == FLB_AWS_COMPRESS_TEST_TYPE_COMPRESS) + ? "b64" : "decompressed", tcase->expect_out_data_b64); + TEST_MSG("| Produced output(%s): %s", + (test_type == FLB_AWS_COMPRESS_TEST_TYPE_COMPRESS) + ? "b64" : "decompressed", out_data_b64); + + len = strlen(tcase->expect_out_data_b64); + TEST_CHECK(len == out_data_b64_len); + TEST_MSG("| Expected length: %zu", len); + TEST_MSG("| Produced length: %zu", out_data_b64_len); + + TEST_MSG("*- For input data: %s", tcase->in_data); + + flb_free(out_data_b64); + ++tcase; + } +} + +/* B64 check script copied from Monkey Auth Plugin */ +/* Change log: + * Removed auto new line entry from every 72 characters to make consistant with + * the actual base64 conversion + */ +/* Copied from monkey/plugins/auth/base64.c */ + +#include +#if defined(MALLOC_JEMALLOC) +#define __mem_alloc mk_api->mem_alloc +#define __mem_free mk_api->mem_free +#else +#define __mem_alloc malloc +#define __mem_free free +#endif + +static const unsigned char base64_table[65] = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + +/** + * base64_encode - Base64 encode + * @src: Data to be encoded + * @len: Length of the data to be encoded + * @out_len: Pointer to output length variable, or %NULL if not used + * Returns: Allocated buffer of out_len bytes of encoded data, + * or %NULL on failure + * + * Caller is responsible for freeing the returned buffer. Returned buffer is + * nul terminated to make it easier to use as a C string. The nul terminator is + * not included in out_len. + */ +static unsigned char * base64_encode(const unsigned char *src, size_t len, + size_t *out_len) +{ + unsigned char *out, *pos; + const unsigned char *end, *in; + size_t olen; + size_t line_len; + + olen = len * 4 / 3 + 4; /* 3-byte blocks to 4-byte */ + olen += olen / 72; /* line feeds */ + olen++; /* nul termination */ + if (olen < len) + return NULL; /* integer overflow */ + if (mk_api != NULL) { + out = __mem_alloc(olen); + } + else { + out = __mem_alloc(olen); + } + + if (out == NULL) + return NULL; + + end = src + len; + in = src; + pos = out; + line_len = 0; + while (end - in >= 3) { + *pos++ = base64_table[in[0] >> 2]; + *pos++ = base64_table[((in[0] & 0x03) << 4) | (in[1] >> 4)]; + *pos++ = base64_table[((in[1] & 0x0f) << 2) | (in[2] >> 6)]; + *pos++ = base64_table[in[2] & 0x3f]; + in += 3; + line_len += 4; + } + + if (end - in) { + *pos++ = base64_table[in[0] >> 2]; + if (end - in == 1) { + *pos++ = base64_table[(in[0] & 0x03) << 4]; + *pos++ = '='; + } else { + *pos++ = base64_table[((in[0] & 0x03) << 4) | + (in[1] >> 4)]; + *pos++ = base64_table[(in[1] & 0x0f) << 2]; + } + *pos++ = '='; + line_len += 4; + } + + if (line_len) + *pos++ = '\n'; + + *pos = '\0'; + if (out_len) + *out_len = pos - out; + return out; +} + +/** + * base64_decode - Base64 decode + * @src: Data to be decoded + * @len: Length of the data to be decoded + * @out_len: Pointer to output length variable + * Returns: Allocated buffer of out_len bytes of decoded data, + * or %NULL on failure + * + * Caller is responsible for freeing the returned buffer. 
+ */ +static unsigned char * base64_decode(const unsigned char *src, size_t len, + size_t *out_len) +{ + unsigned char dtable[256], *out, *pos, block[4], tmp; + size_t i, count, olen; + int pad = 0; + + memset(dtable, 0x80, 256); + for (i = 0; i < sizeof(base64_table) - 1; i++) + dtable[base64_table[i]] = (unsigned char) i; + dtable['='] = 0; + + count = 0; + for (i = 0; i < len; i++) { + if (dtable[src[i]] != 0x80) + count++; + } + + if (count == 0 || count % 4) + return NULL; + + olen = (count / 4 * 3) + 1; + pos = out = __mem_alloc(olen); + if (out == NULL) + return NULL; + + count = 0; + for (i = 0; i < len; i++) { + tmp = dtable[src[i]]; + if (tmp == 0x80) + continue; + + if (src[i] == '=') + pad++; + block[count] = tmp; + count++; + if (count == 4) { + *pos++ = (block[0] << 2) | (block[1] >> 4); + *pos++ = (block[1] << 4) | (block[2] >> 2); + *pos++ = (block[2] << 6) | block[3]; + count = 0; + if (pad) { + if (pad == 1) + pos--; + else if (pad == 2) + pos -= 2; + else { + /* Invalid padding */ + __mem_free(out); + return NULL; + } + break; + } + } + } + *pos = '\0'; + + *out_len = pos - out; + return out; +} + +/* End of copied base64.c from monkey */ From df5795d7acf3ee61dc5c1e7ae1d5553a5945ee29 Mon Sep 17 00:00:00 2001 From: Matthew Fala Date: Mon, 6 Dec 2021 19:58:39 +0000 Subject: [PATCH 3/4] out_s3: migrate to shared compression lib Signed-off-by: Matthew Fala --- plugins/out_s3/CMakeLists.txt | 5 - plugins/out_s3/arrow/CMakeLists.txt | 7 -- plugins/out_s3/arrow/compress.c | 147 ---------------------------- plugins/out_s3/arrow/compress.h | 13 --- plugins/out_s3/s3.c | 46 +++------ plugins/out_s3/s3.h | 4 - 6 files changed, 14 insertions(+), 208 deletions(-) delete mode 100644 plugins/out_s3/arrow/CMakeLists.txt delete mode 100644 plugins/out_s3/arrow/compress.c delete mode 100644 plugins/out_s3/arrow/compress.h diff --git a/plugins/out_s3/CMakeLists.txt b/plugins/out_s3/CMakeLists.txt index 2a3412b3682..94e04861707 100644 --- a/plugins/out_s3/CMakeLists.txt +++ b/plugins/out_s3/CMakeLists.txt @@ -4,8 +4,3 @@ set(src s3_multipart.c) FLB_PLUGIN(out_s3 "${src}" "") - -if(FLB_ARROW) - add_subdirectory(arrow EXCLUDE_FROM_ALL) - target_link_libraries(flb-plugin-out_s3 out-s3-arrow) -endif() diff --git a/plugins/out_s3/arrow/CMakeLists.txt b/plugins/out_s3/arrow/CMakeLists.txt deleted file mode 100644 index 36dedc714ca..00000000000 --- a/plugins/out_s3/arrow/CMakeLists.txt +++ /dev/null @@ -1,7 +0,0 @@ -set(src - compress.c) - -add_library(out-s3-arrow STATIC ${src}) - -target_include_directories(out-s3-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS}) -target_link_libraries(out-s3-arrow ${ARROW_GLIB_LDFLAGS}) diff --git a/plugins/out_s3/arrow/compress.c b/plugins/out_s3/arrow/compress.c deleted file mode 100644 index 8a09aca1248..00000000000 --- a/plugins/out_s3/arrow/compress.c +++ /dev/null @@ -1,147 +0,0 @@ -/* - * This converts S3 plugin's request buffer into Apache Arrow format. - * - * We use GLib binding to call Arrow functions (which is implemented - * in C++) from Fluent Bit. - * - * https://github.com/apache/arrow/tree/master/c_glib - */ - -#include -#include - -/* - * GArrowTable is the central structure that represents "table" (a.k.a. - * data frame). 
- */ -static GArrowTable* parse_json(uint8_t *json, int size) -{ - GArrowJSONReader *reader; - GArrowBuffer *buffer; - GArrowBufferInputStream *input; - GArrowJSONReadOptions *options; - GArrowTable *table; - GError *error = NULL; - - buffer = garrow_buffer_new(json, size); - if (buffer == NULL) { - return NULL; - } - - input = garrow_buffer_input_stream_new(buffer); - if (input == NULL) { - g_object_unref(buffer); - return NULL; - } - - options = garrow_json_read_options_new(); - if (options == NULL) { - g_object_unref(buffer); - g_object_unref(input); - return NULL; - } - - reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error); - if (reader == NULL) { - g_error_free(error); - g_object_unref(buffer); - g_object_unref(input); - g_object_unref(options); - return NULL; - } - - table = garrow_json_reader_read(reader, &error); - if (table == NULL) { - g_error_free(error); - g_object_unref(buffer); - g_object_unref(input); - g_object_unref(options); - g_object_unref(reader); - return NULL; - } - g_object_unref(buffer); - g_object_unref(input); - g_object_unref(options); - g_object_unref(reader); - return table; -} - -static GArrowResizableBuffer* table_to_buffer(GArrowTable *table) -{ - GArrowResizableBuffer *buffer; - GArrowBufferOutputStream *sink; - GError *error = NULL; - gboolean success; - - buffer = garrow_resizable_buffer_new(0, &error); - if (buffer == NULL) { - g_error_free(error); - return NULL; - } - - sink = garrow_buffer_output_stream_new(buffer); - if (sink == NULL) { - g_object_unref(buffer); - return NULL; - } - - success = garrow_table_write_as_feather( - table, GARROW_OUTPUT_STREAM(sink), - NULL, &error); - if (!success) { - g_error_free(error); - g_object_unref(buffer); - g_object_unref(sink); - return NULL; - } - g_object_unref(sink); - return buffer; -} - -int out_s3_compress_arrow(uint8_t *json, size_t size, void **out_buf, size_t *out_size) -{ - GArrowTable *table; - GArrowResizableBuffer *buffer; - GBytes *bytes; - gconstpointer ptr; - gsize len; - uint8_t *buf; - - table = parse_json(json, size); - if (table == NULL) { - return -1; - } - - buffer = table_to_buffer(table); - g_object_unref(table); - if (buffer == NULL) { - return -1; - } - - bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer)); - if (bytes == NULL) { - g_object_unref(buffer); - return -1; - } - - ptr = g_bytes_get_data(bytes, &len); - if (ptr == NULL) { - g_object_unref(buffer); - g_bytes_unref(bytes); - return -1; - } - - buf = malloc(len); - if (buf == NULL) { - g_object_unref(buffer); - g_bytes_unref(bytes); - return -1; - } - memcpy(buf, ptr, len); - *out_buf = (void *) buf; - *out_size = len; - - g_object_unref(buffer); - g_bytes_unref(bytes); - return 0; -} diff --git a/plugins/out_s3/arrow/compress.h b/plugins/out_s3/arrow/compress.h deleted file mode 100644 index 867d9ce02f3..00000000000 --- a/plugins/out_s3/arrow/compress.h +++ /dev/null @@ -1,13 +0,0 @@ -/* - * This function converts out_s3 buffer into Apache Arrow format. - * - * `json` is a string that contain (concatenated) JSON objects. - * - * `size` is the length of the json data (excluding the trailing - * null-terminator character). 
- * - * Return 0 on success (with `out_buf` and `out_size` updated), - * and -1 on failure - */ - -int out_s3_compress_arrow(char *json, size_t size, void **out_buf, size_t *out_size); diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 4eef6b7a566..3f0bd08bc72 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -37,10 +38,6 @@ #include "s3.h" #include "s3_store.h" -#ifdef FLB_HAVE_ARROW -#include "arrow/compress.h" -#endif - static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, struct s3_file *chunk, char **out_buf, size_t *out_size); @@ -129,7 +126,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea if (ctx->content_type != NULL) { headers_len++; } - if (ctx->compression == COMPRESS_GZIP) { + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { headers_len++; } if (ctx->canned_acl != NULL) { @@ -156,7 +153,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea s3_headers[n].val_len = strlen(ctx->content_type); n++; } - if (ctx->compression == COMPRESS_GZIP) { + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { s3_headers[n] = content_encoding_header; n++; } @@ -687,18 +684,12 @@ static int cb_s3_init(struct flb_output_instance *ins, "use_put_object must be enabled when compression is enabled"); return -1; } - if (strcmp(tmp, "gzip") == 0) { - ctx->compression = COMPRESS_GZIP; - } -#ifdef FLB_HAVE_ARROW - else if (strcmp(tmp, "arrow") == 0) { - ctx->compression = COMPRESS_ARROW; - } -#endif - else { + ret = flb_aws_compression_get_type(tmp); + if (ret == -1) { flb_plg_error(ctx->ins, "unknown compression: %s", tmp); return -1; } + ctx->compression = ret; } tmp = flb_output_get_property("content_type", ins); @@ -1272,8 +1263,9 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time flb_sds_destroy(s3_key); uri = tmp; - if (ctx->compression == COMPRESS_GZIP) { - ret = flb_gzip_compress(body, body_size, &compressed_body, &final_body_size); + if (ctx->compression != FLB_AWS_COMPRESS_NONE) { + ret = flb_aws_compression_compress(ctx->compression, body, body_size, + &compressed_body, &final_body_size); if (ret == -1) { flb_plg_error(ctx->ins, "Failed to compress data"); flb_sds_destroy(uri); @@ -1281,17 +1273,6 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time } final_body = (char *) compressed_body; } -#ifdef FLB_HAVE_ARROW - else if (ctx->compression == COMPRESS_ARROW) { - ret = out_s3_compress_arrow(body, body_size, &compressed_body, &final_body_size); - if (ret == -1) { - flb_plg_error(ctx->ins, "Failed to compress data"); - flb_sds_destroy(uri); - return -1; - } - final_body = compressed_body; - } -#endif else { final_body = body; final_body_size = body_size; @@ -1335,7 +1316,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT, uri, final_body, final_body_size, headers, num_headers); - if (ctx->compression != COMPRESS_NONE) { + if (ctx->compression != FLB_AWS_COMPRESS_NONE) { flb_free(compressed_body); } flb_free(headers); @@ -2247,9 +2228,10 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "compression", NULL, 0, FLB_FALSE, 0, - "Compression type for S3 objects. 'gzip' is currently the only supported value. " - "The Content-Encoding HTTP Header will be set to 'gzip'. 
" - "If Apache Arrow was enabled at compile time, you can set 'arrow' to this option." + "Compression type for S3 objects. 'gzip' and 'arrow' are the supported values. " + "'arrow' is only an available if Apache Arrow was enabled at compile time. " + "Defaults to no compression. " + "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'." }, { FLB_CONFIG_MAP_STR, "content_type", NULL, diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 47a8eed1157..a05dda077c6 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -46,10 +46,6 @@ #define DEFAULT_UPLOAD_TIMEOUT 3600 -#define COMPRESS_NONE 0 -#define COMPRESS_GZIP 1 -#define COMPRESS_ARROW 2 - /* * If we see repeated errors on an upload/chunk, we will discard it * This saves us from scenarios where something goes wrong and an upload can From 1d572d04cc68fc3473680bb10ade5fd6d373c12a Mon Sep 17 00:00:00 2001 From: Matthew Fala Date: Mon, 6 Dec 2021 19:59:39 +0000 Subject: [PATCH 4/4] out_kinesis_firehose: integrate with shared compression lib Signed-off-by: Matthew Fala --- plugins/out_kinesis_firehose/firehose.c | 20 +++++++ plugins/out_kinesis_firehose/firehose.h | 1 + plugins/out_kinesis_firehose/firehose_api.c | 62 +++++++++++++++------ plugins/out_kinesis_firehose/firehose_api.h | 1 + 4 files changed, 66 insertions(+), 18 deletions(-) diff --git a/plugins/out_kinesis_firehose/firehose.c b/plugins/out_kinesis_firehose/firehose.c index b2bc01caa17..5451c00459d 100644 --- a/plugins/out_kinesis_firehose/firehose.c +++ b/plugins/out_kinesis_firehose/firehose.c @@ -35,6 +35,8 @@ #include #include +#include + #include #include #include @@ -119,6 +121,15 @@ static int cb_firehose_init(struct flb_output_instance *ins, ctx->sts_endpoint = (char *) tmp; } + tmp = flb_output_get_property("compression", ins); + if (tmp) { + ret = flb_aws_compression_get_type(tmp); + if (ret == -1) { + flb_plg_error(ctx->ins, "unknown compression: %s", tmp); + goto error; + } + ctx->compression = ret; + } tmp = flb_output_get_property("log_key", ins); if (tmp) { @@ -427,6 +438,15 @@ static struct flb_config_map config_map[] = { "Custom endpoint for the STS API." }, + { + FLB_CONFIG_MAP_STR, "compression", NULL, + 0, FLB_FALSE, 0, + "Compression type for Firehose records. Each log record is individually compressed " + "and sent to Firehose. 'gzip' and 'arrow' are the supported values. " + "'arrow' is only an available if Apache Arrow was enabled at compile time. " + "Defaults to no compression." 
+ }, + { FLB_CONFIG_MAP_STR, "log_key", NULL, 0, FLB_TRUE, offsetof(struct flb_firehose, log_key), diff --git a/plugins/out_kinesis_firehose/firehose.h b/plugins/out_kinesis_firehose/firehose.h index 7f84e0002dc..85dd0f72f6d 100644 --- a/plugins/out_kinesis_firehose/firehose.h +++ b/plugins/out_kinesis_firehose/firehose.h @@ -89,6 +89,7 @@ struct flb_firehose { char *sts_endpoint; int custom_endpoint; int retry_requests; + int compression; /* must be freed on shutdown if custom_endpoint is not set */ char *endpoint; diff --git a/plugins/out_kinesis_firehose/firehose_api.c b/plugins/out_kinesis_firehose/firehose_api.c index 2e5cd751e43..4b1568536e0 100644 --- a/plugins/out_kinesis_firehose/firehose_api.c +++ b/plugins/out_kinesis_firehose/firehose_api.c @@ -36,6 +36,8 @@ #include #include +#include + #include #include #include @@ -160,6 +162,7 @@ static int process_event(struct flb_firehose *ctx, struct flush *buf, struct tm *tmp; size_t len; size_t tmp_size; + void *compressed_tmp_buf; tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; ret = flb_msgpack_to_json(tmp_buf_ptr, @@ -260,29 +263,52 @@ static int process_event(struct flb_firehose *ctx, struct flush *buf, memcpy(tmp_buf_ptr + written, "\n", 1); written++; - /* - * check if event_buf is initialized and big enough - * Base64 encoding will increase size by ~4/3 - */ - size = (written * 1.5) + 4; - if (buf->event_buf == NULL || buf->event_buf_size < size) { - flb_free(buf->event_buf); - buf->event_buf = flb_malloc(size); - buf->event_buf_size = size; - if (buf->event_buf == NULL) { + if (ctx->compression == FLB_AWS_COMPRESS_NONE) { + /* + * check if event_buf is initialized and big enough + * Base64 encoding will increase size by ~4/3 + */ + size = (written * 1.5) + 4; + if (buf->event_buf == NULL || buf->event_buf_size < size) { + flb_free(buf->event_buf); + buf->event_buf = flb_malloc(size); + buf->event_buf_size = size; + if (buf->event_buf == NULL) { + flb_errno(); + return -1; + } + } + + tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; + + ret = mbedtls_base64_encode((unsigned char *) buf->event_buf, size, &b64_len, + (unsigned char *) tmp_buf_ptr, written); + if (ret != 0) { flb_errno(); return -1; } + written = b64_len; } - - tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; - ret = mbedtls_base64_encode((unsigned char *) buf->event_buf, size, &b64_len, - (unsigned char *) tmp_buf_ptr, written); - if (ret != 0) { - flb_errno(); - return -1; + else { + /* + * compress event, truncating input if needed + * replace event buffer with compressed buffer + */ + ret = flb_aws_compression_b64_truncate_compress(ctx->compression, + MAX_B64_EVENT_SIZE, + tmp_buf_ptr, + written, &compressed_tmp_buf, + &size); /* evaluate size */ + if (ret == -1) { + flb_plg_error(ctx->ins, "Unable to compress record, discarding, " + "%s", written + 1, ctx->delivery_stream); + return 2; + } + flb_free(buf->event_buf); + buf->event_buf = compressed_tmp_buf; + compressed_tmp_buf = NULL; + written = size; } - written = b64_len; tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; if ((buf->tmp_buf_size - buf->tmp_buf_offset) < written) { diff --git a/plugins/out_kinesis_firehose/firehose_api.h b/plugins/out_kinesis_firehose/firehose_api.h index 69e92d392c5..386aa80497c 100644 --- a/plugins/out_kinesis_firehose/firehose_api.h +++ b/plugins/out_kinesis_firehose/firehose_api.h @@ -24,6 +24,7 @@ #define PUT_RECORD_BATCH_PAYLOAD_SIZE 4194304 #define MAX_EVENTS_PER_PUT 500 #define MAX_EVENT_SIZE 1024000 +#define MAX_B64_EVENT_SIZE 1365336 /* ceil(1024000 / 3) * 4 */ /* 
number of characters needed to 'start' a PutRecordBatch payload */
 #define PUT_RECORD_BATCH_HEADER_LEN 42
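
Note on usage (a sketch, not part of the series): with these patches applied, an AWS output plugin resolves the configured keyword once at init time with flb_aws_compression_get_type() and compresses each payload at flush time with flb_aws_compression_compress(), exactly as the out_s3 migration in PATCH 3 does. A minimal sketch of that pattern follows; the wrapper name compress_payload and its error handling are illustrative only.

    #include <stddef.h>
    #include <fluent-bit/aws/flb_aws_compress.h>

    /*
     * Resolve a compression keyword ("gzip", or "arrow" when built with
     * Apache Arrow support) and compress one payload through the shared
     * utility. On success the caller owns *out_buf and releases it with
     * flb_free(), as out_s3 does after issuing the PutObject request.
     */
    static int compress_payload(const char *keyword, void *body, size_t body_size,
                                 void **out_buf, size_t *out_size)
    {
        int type;

        type = flb_aws_compression_get_type(keyword);
        if (type == -1) {
            return -1;   /* unknown keyword */
        }
        return flb_aws_compression_compress(type, body, body_size, out_buf, out_size);
    }

At the configuration level this corresponds to the 'compression' option of out_s3 (see the [OUTPUT] example in PATCH 1) and the new 'compression' option of out_kinesis_firehose added in PATCH 4, e.g. 'compression gzip' in that plugin's [OUTPUT] section.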