Skip to content

Commit

Permalink
out_s3: migrate to shared compression lib
Browse files Browse the repository at this point in the history
Signed-off-by: Matthew Fala <[email protected]>
  • Loading branch information
matthewfala committed Dec 10, 2021
1 parent f9895b3 commit df5795d
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 208 deletions.
5 changes: 0 additions & 5 deletions plugins/out_s3/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,3 @@ set(src
s3_multipart.c)

FLB_PLUGIN(out_s3 "${src}" "")

if(FLB_ARROW)
add_subdirectory(arrow EXCLUDE_FROM_ALL)
target_link_libraries(flb-plugin-out_s3 out-s3-arrow)
endif()
7 changes: 0 additions & 7 deletions plugins/out_s3/arrow/CMakeLists.txt

This file was deleted.

147 changes: 0 additions & 147 deletions plugins/out_s3/arrow/compress.c

This file was deleted.

13 changes: 0 additions & 13 deletions plugins/out_s3/arrow/compress.h

This file was deleted.

46 changes: 14 additions & 32 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_config_map.h>
#include <fluent-bit/flb_aws_util.h>
#include <fluent-bit/aws/flb_aws_compress.h>
#include <fluent-bit/flb_signv4.h>
#include <fluent-bit/flb_scheduler.h>
#include <fluent-bit/flb_gzip.h>
Expand All @@ -37,10 +38,6 @@
#include "s3.h"
#include "s3_store.h"

#ifdef FLB_HAVE_ARROW
#include "arrow/compress.h"
#endif

static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data,
struct s3_file *chunk,
char **out_buf, size_t *out_size);
Expand Down Expand Up @@ -129,7 +126,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea
if (ctx->content_type != NULL) {
headers_len++;
}
if (ctx->compression == COMPRESS_GZIP) {
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
headers_len++;
}
if (ctx->canned_acl != NULL) {
Expand All @@ -156,7 +153,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea
s3_headers[n].val_len = strlen(ctx->content_type);
n++;
}
if (ctx->compression == COMPRESS_GZIP) {
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
s3_headers[n] = content_encoding_header;
n++;
}
Expand Down Expand Up @@ -687,18 +684,12 @@ static int cb_s3_init(struct flb_output_instance *ins,
"use_put_object must be enabled when compression is enabled");
return -1;
}
if (strcmp(tmp, "gzip") == 0) {
ctx->compression = COMPRESS_GZIP;
}
#ifdef FLB_HAVE_ARROW
else if (strcmp(tmp, "arrow") == 0) {
ctx->compression = COMPRESS_ARROW;
}
#endif
else {
ret = flb_aws_compression_get_type(tmp);
if (ret == -1) {
flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
return -1;
}
ctx->compression = ret;
}

tmp = flb_output_get_property("content_type", ins);
Expand Down Expand Up @@ -1272,26 +1263,16 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
flb_sds_destroy(s3_key);
uri = tmp;

if (ctx->compression == COMPRESS_GZIP) {
ret = flb_gzip_compress(body, body_size, &compressed_body, &final_body_size);
if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
ret = flb_aws_compression_compress(ctx->compression, body, body_size,
&compressed_body, &final_body_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to compress data");
flb_sds_destroy(uri);
return -1;
}
final_body = (char *) compressed_body;
}
#ifdef FLB_HAVE_ARROW
else if (ctx->compression == COMPRESS_ARROW) {
ret = out_s3_compress_arrow(body, body_size, &compressed_body, &final_body_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to compress data");
flb_sds_destroy(uri);
return -1;
}
final_body = compressed_body;
}
#endif
else {
final_body = body;
final_body_size = body_size;
Expand Down Expand Up @@ -1335,7 +1316,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT,
uri, final_body, final_body_size,
headers, num_headers);
if (ctx->compression != COMPRESS_NONE) {
if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
flb_free(compressed_body);
}
flb_free(headers);
Expand Down Expand Up @@ -2247,9 +2228,10 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "compression", NULL,
0, FLB_FALSE, 0,
"Compression type for S3 objects. 'gzip' is currently the only supported value. "
"The Content-Encoding HTTP Header will be set to 'gzip'. "
"If Apache Arrow was enabled at compile time, you can set 'arrow' to this option."
"Compression type for S3 objects. 'gzip' and 'arrow' are the supported values. "
"'arrow' is only available if Apache Arrow was enabled at compile time. "
"Defaults to no compression. "
"If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'."
},
{
FLB_CONFIG_MAP_STR, "content_type", NULL,
Expand Down
4 changes: 0 additions & 4 deletions plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@

#define DEFAULT_UPLOAD_TIMEOUT 3600

#define COMPRESS_NONE 0
#define COMPRESS_GZIP 1
#define COMPRESS_ARROW 2

/*
* If we see repeated errors on an upload/chunk, we will discard it
* This saves us from scenarios where something goes wrong and an upload can
Expand Down

0 comments on commit df5795d

Please sign in to comment.