From fd2d376ef158899177ca98f0928bc27896609cc0 Mon Sep 17 00:00:00 2001
From: Hiroshi Hatake
Date: Mon, 20 May 2024 16:41:53 +0900
Subject: [PATCH] out_s3: Initial support for parquet format with columnify

Signed-off-by: Hiroshi Hatake
---
 plugins/out_s3/s3.c              | 467 ++++++++++++++++++++++++++++++-
 plugins/out_s3/s3.h              |  15 +
 plugins/out_s3/s3_win32_compat.h |  64 +++++
 3 files changed, 542 insertions(+), 4 deletions(-)
 create mode 100644 plugins/out_s3/s3_win32_compat.h

diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c
index d6c58d6bbba..c1635c0d7ef 100644
--- a/plugins/out_s3/s3.c
+++ b/plugins/out_s3/s3.c
@@ -33,12 +33,15 @@
 #include
 #include
 #include
+#include
 #include
+#include
 #include
 #include "s3.h"
 #include "s3_store.h"
+#include "s3_win32_compat.h"
 
 #define DEFAULT_S3_PORT 443
 #define DEFAULT_S3_INSECURE_PORT 80
@@ -488,6 +491,22 @@ static void s3_context_destroy(struct flb_s3 *ctx)
         flb_sds_destroy(ctx->seq_index_file);
     }
 
+    if (ctx->parquet_compression) {
+        flb_sds_destroy(ctx->parquet_compression);
+    }
+
+    if (ctx->parquet_record_type) {
+        flb_sds_destroy(ctx->parquet_record_type);
+    }
+
+    if (ctx->parquet_schema_type) {
+        flb_sds_destroy(ctx->parquet_schema_type);
+    }
+
+    if (ctx->parquet_schema_file) {
+        flb_sds_destroy(ctx->parquet_schema_file);
+    }
+
     /* Remove uploads */
     mk_list_foreach_safe(head, tmp, &ctx->uploads) {
         m_upload = mk_list_entry(head, struct multipart_upload, _head);
@@ -509,6 +528,7 @@ static int cb_s3_init(struct flb_output_instance *ins,
                       struct flb_config *config, void *data)
 {
     int ret;
+    int i;
     flb_sds_t tmp_sds;
     char *role_arn = NULL;
     char *session_name;
@@ -521,6 +541,8 @@ static int cb_s3_init(struct flb_output_instance *ins,
     struct flb_split_entry *tok;
     struct mk_list *split;
     int list_size;
+    FILE *cmdp = NULL;
+    char buf[32];
 
     ctx = flb_calloc(1, sizeof(struct flb_s3));
     if (!ctx) {
@@ -652,13 +674,118 @@ static int cb_s3_init(struct flb_output_instance *ins,
                           "use_put_object must be enabled when Apache Arrow is enabled");
             return -1;
         }
+        /* Check 'ret' here: ctx->compression is only assigned below */
+        if (ctx->use_put_object == FLB_FALSE && ret == FLB_AWS_COMPRESS_PARQUET) {
+            flb_plg_error(ctx->ins,
+                          "use_put_object must be enabled when parquet is enabled");
+            return -1;
+        }
         ctx->compression = ret;
     }
 
+    /* Parquet */
+    ctx->parquet_compression = NULL;
+    ctx->parquet_record_type = NULL;
+    ctx->parquet_schema_type = NULL;
+    ctx->parquet_schema_file = NULL;
+
+    if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) {
+        /* Fail fast at init when the columnify executable cannot be spawned */
+        cmdp = flb_popen(DEFAULT_PARQUET_COMMAND_EXISTENCE, "r");
+        if (cmdp == NULL) {
+            flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_EXISTENCE);
+            return -1;
+        }
+        flb_pclose(cmdp);
+
+        tmp = flb_output_get_property("parquet.compression", ins);
+        if (!tmp) {
+            ctx->parquet_compression = \
+                flb_sds_create_len(DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES,
+                                   strlen(DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES));
+            flb_plg_debug(ctx->ins, "parquet.compression format is %s",
+                          DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES);
+        }
+        else {
+            if (strncasecmp(tmp, "uncompressed", 12) == 0 ||
+                strncasecmp(tmp, "snappy", 6) == 0 ||
+                strncasecmp(tmp, "gzip", 4) == 0 ||
+                strncasecmp(tmp, "zstd", 4) == 0) {
+                flb_plg_info(ctx->ins, "parquet.compression format is %s", tmp);
+            }
+            else if (strncasecmp(tmp, "lzo", 3) == 0 ||
+                     strncasecmp(tmp, "brotli", 6) == 0 ||
+                     strncasecmp(tmp, "lz4", 3) == 0) {
+                /* Known codecs that are not supported yet: reject them here
+                 * instead of failing on every upload later */
+                flb_plg_error(ctx->ins, "unsupported parquet.compression format %s", tmp);
+                return -1;
+            }
+            else {
+                flb_plg_error(ctx->ins, "unknown parquet.compression format %s", tmp);
+                return -1;
+            }
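+
+            /*
+             * The codec name is passed to columnify upper-cased (e.g. SNAPPY),
+             * so normalize the validated value before storing it.
+             */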
+            for (i = 0; i < strlen(tmp) && i < sizeof(buf) - 1; i++) {
+                buf[i] = toupper(tmp[i]);
+            }
+            buf[i] = '\0';
+
+            ctx->parquet_compression = flb_sds_create_len(buf, strlen(buf));
+        }
+
+        tmp = flb_output_get_property("parquet.record_type", ins);
+        if (!tmp) {
+            flb_plg_info(ctx->ins, "parquet.record_type format is %s",
+                         DEFAULT_PARQUET_RECORD_TYPE);
+            ctx->parquet_record_type = \
+                flb_sds_create_len(DEFAULT_PARQUET_RECORD_TYPE,
+                                   strlen(DEFAULT_PARQUET_RECORD_TYPE));
+        }
+        else {
+            if (strncasecmp(tmp, "json", 4) == 0) {
+                tmp = "jsonl"; /* json should be interpreted as jsonl */
+                flb_plg_info(ctx->ins, "parquet.record_type format is %s", tmp);
+            }
+            else if (strncasecmp(tmp, "msgpack", 7) == 0 ||
+                     strncasecmp(tmp, "jsonl", 5) == 0) {
+                flb_plg_info(ctx->ins, "parquet.record_type format is %s", tmp);
+            }
+            else {
+                flb_plg_error(ctx->ins, "unknown parquet.record_type format %s", tmp);
+                return -1;
+            }
+            ctx->parquet_record_type = flb_sds_create_len(tmp, strlen(tmp));
+        }
+
+        tmp = flb_output_get_property("parquet.schema_type", ins);
+        if (!tmp) {
+            flb_plg_info(ctx->ins, "parquet.schema_type format is %s",
+                         DEFAULT_PARQUET_SCHEMA_TYPE);
+            ctx->parquet_schema_type = \
+                flb_sds_create_len(DEFAULT_PARQUET_SCHEMA_TYPE,
+                                   strlen(DEFAULT_PARQUET_SCHEMA_TYPE));
+        }
+        else {
+            /* Validate against the schema types documented below: avro, bigquery */
+            if (strncasecmp(tmp, "avro", 4) == 0 ||
+                strncasecmp(tmp, "bigquery", 8) == 0) {
+                flb_plg_info(ctx->ins, "parquet.schema_type format is %s", tmp);
+            }
+            else {
+                flb_plg_error(ctx->ins, "unknown parquet.schema_type format %s", tmp);
+                return -1;
+            }
+            ctx->parquet_schema_type = flb_sds_create_len(tmp, strlen(tmp));
+        }
+
+        tmp = flb_output_get_property("parquet.schema_file", ins);
+        if (!tmp) {
+            flb_plg_error(ctx->ins, "parquet.schema_file is missing");
+            return -1;
+        }
+        ctx->parquet_schema_file = flb_sds_create_len(tmp, strlen(tmp));
+    }
+
     tmp = flb_output_get_property("content_type", ins);
     if (tmp) {
         ctx->content_type = (char *) tmp;
     }
+    if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) {
+        ctx->content_type = (char *) "application/octet-stream";
+    }
     if (ctx->use_put_object == FLB_FALSE) {
         /* upload_chunk_size */
         if (ctx->upload_chunk_size <= 0) {
@@ -967,6 +1094,266 @@ static int cb_s3_init(struct flb_output_instance *ins,
     return 0;
 }
 
+static int s3_compress_parquet(struct flb_s3 *ctx,
+                               char *body, size_t body_size,
+                               void **payload_buf, size_t *payload_size)
+{
+    int ret = 0;
+    int result = 0;
+    char *template_in = "out_s3-body-XXXXXX";
+    char *template_out = "out_s3-parquet-XXXXXX";
+    char infile[32];
+    char outfile[32];
+    FILE *write_ptr = NULL;
+    FILE *read_ptr = NULL;
+    flb_sds_t parquet_cmd = NULL;
+    size_t bytes;
+    flb_sds_t tmp;
+    flb_sds_t amount_page = NULL;
+    flb_sds_t amount_row_group = NULL;
+    FILE *cmdp = NULL;
+    size_t parquet_size = 0;
+    struct stat stbuf;
+    int fdtmp = -1;
+    int fdout = -1;
+    flb_sds_t parquet_buf = NULL;
+
+    parquet_cmd = flb_sds_create_size(384);
+    if (parquet_cmd == NULL) {
+        ret = -1;
+        goto error;
+    }
+
+    amount_page = flb_sds_create_size(16);
+    if (amount_page == NULL) {
+        ret = -1;
+        goto error;
+    }
+
+    amount_row_group = flb_sds_create_size(16);
+    if (amount_row_group == NULL) {
+        ret = -1;
+        goto error;
+    }
+
+    /* mkstemp() returns a file descriptor, or -1 on failure */
+    strncpy(infile, template_in, 32);
+    fdtmp = mkstemp(infile);
+    if (fdtmp == -1) {
+        ret = -2;
+        goto error;
+    }
+    close(fdtmp);
+
+    strncpy(outfile, template_out, 32);
+    fdtmp = mkstemp(outfile);
+    if (fdtmp == -1) {
+        ret = -2;
+        goto error;
+    }
+    close(fdtmp);
+
+    write_ptr = fopen(infile, "wb");
+    if (write_ptr == NULL) {
+        ret = -3;
+        goto error;
+    }
+
+    read_ptr = fopen(outfile, "rb");
+    if (read_ptr == NULL) {
+        ret = -3;
+        goto error;
+    }
+
+    fdout = open(outfile, O_RDONLY);
+    if (fdout == -1) {
+        ret = -3;
+        goto error;
+    }
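+
+    /*
+     * Stage the raw record body in the temporary input file, then build the
+     * columnify invocation from the validated context values. The assembled
+     * command has this shape (illustrative values):
+     *
+     *   columnify -parquetCompressionCodec SNAPPY -parquetPageSize 8192 \
+     *             -parquetRowGroupSize 134217728 -recordType jsonl \
+     *             -schemaType avro -schemaFile <schema> -output <outfile> <infile>
+     */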
+    bytes = fwrite(body, body_size, 1, write_ptr);
+    if (bytes != 1) {
+        /* fwrite() returns the number of items written; it never returns -1 */
+        ret = -4;
+        goto error;
+    }
+    /* Flush and close the staged input before columnify reads it */
+    fclose(write_ptr);
+    write_ptr = NULL;
+
+    result = flb_sds_cat_safe(&parquet_cmd,
+                              DEFAULT_PARQUET_COMMAND, strlen(DEFAULT_PARQUET_COMMAND));
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    result = flb_sds_cat_safe(&parquet_cmd,
+                              " -parquetCompressionCodec ", 26);
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    result = flb_sds_cat_safe(&parquet_cmd,
+                              ctx->parquet_compression,
+                              flb_sds_len(ctx->parquet_compression));
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    result = flb_sds_cat_safe(&parquet_cmd,
+                              " -parquetPageSize ", 18);
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    tmp = flb_sds_printf(&amount_page, "%zu", ctx->parquet_page_size);
+    if (!tmp) {
+        flb_errno();
+        ret = -1;
+        goto error;
+    }
+
+    result = flb_sds_cat_safe(&parquet_cmd,
+                              amount_page, strlen(amount_page));
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    result = flb_sds_cat_safe(&parquet_cmd,
+                              " -parquetRowGroupSize ", 22);
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    tmp = flb_sds_printf(&amount_row_group, "%zu", ctx->parquet_row_group_size);
+    if (!tmp) {
+        flb_errno();
+        ret = -1;
+        goto error;
+    }
+
+    result = flb_sds_cat_safe(&parquet_cmd,
+                              amount_row_group, strlen(amount_row_group));
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    result = flb_sds_cat_safe(&parquet_cmd,
+                              " -recordType ", 13);
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    result = flb_sds_cat_safe(&parquet_cmd,
+                              ctx->parquet_record_type,
+                              flb_sds_len(ctx->parquet_record_type));
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    result = flb_sds_cat_safe(&parquet_cmd,
+                              " -schemaType ", 13);
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    result = flb_sds_cat_safe(&parquet_cmd,
+                              ctx->parquet_schema_type,
+                              flb_sds_len(ctx->parquet_schema_type));
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    result = flb_sds_cat_safe(&parquet_cmd,
+                              " -schemaFile ", 13);
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    result = flb_sds_cat_safe(&parquet_cmd,
+                              ctx->parquet_schema_file,
+                              flb_sds_len(ctx->parquet_schema_file));
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    result = flb_sds_cat_safe(&parquet_cmd,
+                              " -output ", 9);
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    result = flb_sds_cat_safe(&parquet_cmd,
+                              outfile, strlen(outfile));
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    result = flb_sds_cat_safe(&parquet_cmd, " ", 1);
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    result = flb_sds_cat_safe(&parquet_cmd, infile, strlen(infile));
+    if (result < 0) {
+        ret = -1;
+        goto error;
+    }
+
+    cmdp = flb_popen(parquet_cmd, "r");
+    if (cmdp == NULL) {
+        flb_plg_error(ctx->ins, "command %s failed", parquet_cmd);
+        ret = -5;
+        goto error;
+    }
+    result = flb_pclose(cmdp);
+    if (result != 0) {
+        flb_plg_error(ctx->ins, "command %s exited abnormally", parquet_cmd);
+        ret = -5;
+        goto error;
+    }
+
+    if (fstat(fdout, &stbuf) == -1) {
+        ret = -4;
+        goto error;
+    }
+    parquet_size = stbuf.st_size;
+
+    parquet_buf = flb_sds_create_size(parquet_size);
+    if (parquet_buf == NULL) {
+        ret = -1;
+        goto error;
+    }
+    bytes = fread(parquet_buf, parquet_size, 1, read_ptr);
+    if (bytes != 1) {
+        ret = -4;
+        goto error;
+    }
+    fclose(read_ptr);
+    close(fdout);
+    unlink(infile);
+    unlink(outfile);
+
+    *payload_buf = parquet_buf;
+    *payload_size = parquet_size;
+
+    flb_sds_destroy(parquet_cmd);
+    flb_sds_destroy(amount_page);
+    flb_sds_destroy(amount_row_group);
+
+    return 0;
+
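+/*
+ * Unified failure path: close whatever handles are still open and release
+ * any buffers allocated above; 'ret' carries the step-specific error code.
+ */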
+error:
+    if (write_ptr != NULL) {
+        fclose(write_ptr);
+    }
+    if (read_ptr != NULL) {
+        fclose(read_ptr);
+    }
+    if (fdout != -1) {
+        close(fdout);
+    }
+    if (parquet_cmd != NULL) {
+        flb_sds_destroy(parquet_cmd);
+    }
+    if (amount_page != NULL) {
+        flb_sds_destroy(amount_page);
+    }
+    if (amount_row_group != NULL) {
+        flb_sds_destroy(amount_row_group);
+    }
+    if (parquet_buf != NULL) {
+        flb_sds_destroy(parquet_buf);
+    }
+
+    return ret;
+}
+
 /*
  * return value is one of FLB_OK, FLB_RETRY, FLB_ERROR
  *
@@ -998,7 +1385,19 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         file_first_log_time = chunk->first_log_time;
     }
 
-    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+    if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) {
+        ret = s3_compress_parquet(ctx, body, body_size, &payload_buf, &payload_size);
+        if (ret < 0) {
+            flb_plg_error(ctx->ins, "Failed to compress data with %s", DEFAULT_PARQUET_COMMAND);
+            return FLB_RETRY;
+        }
+        else {
+            preCompress_size = body_size;
+            body = (void *) payload_buf;
+            body_size = payload_size;
+        }
+    }
+    else if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
         /* Map payload */
         ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size);
         if (ret == -1) {
@@ -1066,6 +1465,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
     if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
         flb_free(payload_buf);
     }
+    else if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) {
+        flb_sds_destroy(payload_buf);
+    }
     if (ret < 0) {
         /* re-add chunk to list */
         if (chunk) {
@@ -1220,7 +1622,19 @@ static int put_all_chunks(struct flb_s3 *ctx)
             return -1;
         }
 
-        if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
+        if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) {
+            ret = s3_compress_parquet(ctx, buffer, buffer_size, &payload_buf, &payload_size);
+            if (ret < 0) {
+                flb_plg_error(ctx->ins, "Failed to compress data with %s", DEFAULT_PARQUET_COMMAND);
+                flb_free(buffer);
+                /* put_all_chunks() reports failure with -1, not FLB_RETRY */
+                return -1;
+            }
+            else {
+                flb_plg_info(ctx->ins, "Pre-compression chunk size is %zu, After compression, chunk is %zu bytes", buffer_size, payload_size);
+                flb_free(buffer);
+                buffer = (void *) payload_buf;
+                buffer_size = payload_size;
+            }
+        }
+        else if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
             /* Map payload */
             ret = flb_aws_compression_compress(ctx->compression, buffer, buffer_size, &payload_buf, &payload_size);
             if (ret == -1) {
@@ -1235,7 +1649,12 @@ static int put_all_chunks(struct flb_s3 *ctx)
         ret = s3_put_object(ctx, (const char *) fsf->meta_buf,
                             chunk->create_time, buffer, buffer_size);
-        flb_free(buffer);
+        if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) {
+            flb_sds_destroy(buffer);
+        }
+        else {
+            flb_free(buffer);
+        }
         if (ret < 0) {
             s3_store_file_unlock(chunk);
             chunk->failures += 1;
@@ -2365,10 +2784,50 @@ static struct flb_config_map config_map[] = {
     {
      FLB_CONFIG_MAP_STR, "compression", NULL,
      0, FLB_FALSE, 0,
-     "Compression type for S3 objects. 'gzip' and 'arrow' are the supported values. "
-     "'arrow' is only an available if Apache Arrow was enabled at compile time. "
-     "Defaults to no compression. "
-     "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'."
+     "Compression type for S3 objects. 'gzip', 'arrow', and 'parquet' "
+     "are the supported values. "
+     "'arrow' is only available if Apache Arrow was enabled at compile time. "
+     "'parquet' is only available if the columnify command is installed on the system. "
+     "Defaults to no compression. "
+     "If 'gzip' is selected, the Content-Encoding HTTP header is set to 'gzip'. "
+     "If 'parquet' is selected, the Content-Type HTTP header is set to "
+     "'application/octet-stream'."
    },
+    {
+     FLB_CONFIG_MAP_STR, "parquet.compression", "snappy",
+     0, FLB_FALSE, 0,
+     "Compression type for the Parquet format. 'uncompressed', 'snappy', 'gzip' and "
+     "'zstd' are the supported values; 'lzo', 'brotli' and 'lz4' are not supported "
+     "for now. Defaults to 'snappy'."
+    },
+    {
+     FLB_CONFIG_MAP_SIZE, "parquet.page_size", "8192",
+     0, FLB_TRUE, offsetof(struct flb_s3, parquet_page_size),
+     "Page size of the parquet file. "
+     "Defaults to 8192 bytes."
+    },
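+    /*
+     * Illustrative configuration enabling parquet output; the bucket name
+     * and schema path below are hypothetical:
+     *
+     *   [OUTPUT]
+     *       name                 s3
+     *       match                *
+     *       bucket               my-bucket
+     *       use_put_object       On
+     *       compression          parquet
+     *       parquet.schema_file  /path/to/schema.avsc
+     */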
" + }, + { + FLB_CONFIG_MAP_SIZE, "parquet.row_group_size", "134217728", /* 128 * 1024 * 1024 */ + 0, FLB_TRUE, offsetof(struct flb_s3, parquet_row_group_size), + "File row group size of parquet" + "Defaults to 134217728 (= 128 * 1024 * 1024). " + }, + { + FLB_CONFIG_MAP_STR, "parquet.record_type", "json", + 0, FLB_FALSE, 0, + "Record type for parquet objects. 'json' and 'msgpack' are the supported values. " + "Defaults to msgpack. " + }, + { + FLB_CONFIG_MAP_STR, "parquet.schema_type", "avro", + 0, FLB_FALSE, 0, + "Record type for parquet objects. 'avro' and 'bigquery' are the supported values. " + "Defaults to avro. " + }, + { + FLB_CONFIG_MAP_STR, "parquet.schema_file", NULL, + 0, FLB_FALSE, 0, + "Schema file for parquet objects. " }, { FLB_CONFIG_MAP_STR, "content_type", NULL, diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index e51d39f2419..4550be60973 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -47,6 +47,13 @@ #define DEFAULT_UPLOAD_TIMEOUT 3600 +#define DEFAULT_PARQUET_COMPRESSION_FORMAT "snappy" +#define DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES "SNAPPY" +#define DEFAULT_PARQUET_RECORD_TYPE "jsonl" +#define DEFAULT_PARQUET_SCHEMA_TYPE "avro" +#define DEFAULT_PARQUET_COMMAND "columnify" +#define DEFAULT_PARQUET_COMMAND_EXISTENCE "columnify -h" + /* * If we see repeated errors on an upload/chunk, we will discard it * This saves us from scenarios where something goes wrong and an upload can @@ -147,6 +154,14 @@ struct flb_s3 { struct flb_fstore_stream *stream_upload; /* multipart upload stream */ struct flb_fstore_stream *stream_metadata; /* s3 metadata stream */ + /* Parquet */ + flb_sds_t parquet_compression; + size_t parquet_page_size; + size_t parquet_row_group_size; + flb_sds_t parquet_record_type; + flb_sds_t parquet_schema_type; + flb_sds_t parquet_schema_file; + /* * used to track that unset buffers were found on startup that have not * been sent diff --git a/plugins/out_s3/s3_win32_compat.h b/plugins/out_s3/s3_win32_compat.h new file mode 100644 index 00000000000..4fa7ca5ee53 --- /dev/null +++ b/plugins/out_s3/s3_win32_compat.h @@ -0,0 +1,64 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef FLB_S3_WIN32_COMPAT_H +#define FLB_S3_WIN32_COMPAT_H + +/* + * Because Windows has to do everything differently, call _popen() and + * _pclose() instead of the POSIX popen() and pclose() functions. + * + * flb_pclose() has different return value semantics on Windows vs non-windows + * targets because it propagates the pclose() or _pclose() return value + * directly. You MUST use the FLB_WIFEXITED(), FLB_WEXITSTATUS(), + * FLB_WIFSIGNALED() and FLB_WTERMSIG() macros to consume the return value, + * rather than the underlying POSIX macros or manual bit-shifts. 
+
+#if !defined(FLB_SYSTEM_WINDOWS)
+static inline FILE* flb_popen(const char *command, const char *type) {
+    return popen(command, type);
+}
+static inline int flb_pclose(FILE *stream) {
+    return pclose(stream);
+}
+#else
+static inline FILE* flb_popen(const char *command, const char *type) {
+    return _popen(command, type);
+}
+/*
+ * flb_pclose() has the same return value on Windows as win32 _pclose(), rather
+ * than POSIX pclose(). The process exit code is not bit-shifted to the high
+ * byte.
+ *
+ * The MSVC docs for _pclose() at
+ * https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/pclose?view=msvc-170
+ * are misleading; they say that "The format of the return value is the same as
+ * for _cwait, except the low-order and high-order bytes are swapped." But
+ * _cwait isn't documented as having any meaningful return on success; the
+ * process exit code is meant to be in its "termstat" out parameter per
+ * https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/cwait?view=msvc-170
+ * The return code of _pclose() actually appears to be the process exit code
+ * without the bit-shift that waitpid() applies.
+ */
+static inline int flb_pclose(FILE *stream) {
+    return _pclose(stream);
+}
+#endif
+
+#endif /* FLB_S3_WIN32_COMPAT_H */