From e93fe4dcfed6ed7bca9390610fa1610d0dfc4de1 Mon Sep 17 00:00:00 2001 From: Stephen Lee Date: Mon, 26 Jul 2021 18:08:04 +0000 Subject: [PATCH] out_s3: added sequential index feature Specify $INDEX in s3_key_format to add an index that increments every time it uploads a file. Sequential indexing is compatible with all other s3_key_format features, so $UUID and $TAG can be specified at the same time as $INDEX. When $INDEX is specified but $UUID is not, a random string will not be appended to the end of the key name. If FluentBit crashes or fails to upload, the index will be preserved in the filesystem in a metadata file located in ${store_dir}/index_metadata and reused on startup. This configuration option uses native C functions so is incompatible with multi-threading. Tested through unit testing and various input plugins (exec, random, etc). Signed-off-by: Stephen Lee --- plugins/out_s3/s3.c | 227 +++++++++++++++++++++++++++++++++++++++++--- plugins/out_s3/s3.h | 6 ++ 2 files changed, 219 insertions(+), 14 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 65a50781103..eb538ee80d2 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -254,6 +255,132 @@ static flb_sds_t concat_path(char *p1, char *p2) return dir; } +/* Reads in index value from metadata file and sets seq_index to value */ +static int read_seq_index(char *seq_index_file, uint64_t *seq_index) +{ + FILE *fp; + int ret; + + fp = fopen(seq_index_file, "r"); + if (fp == NULL) { + flb_errno(); + return -1; + } + + ret = fscanf(fp, "%"PRIu64, seq_index); + if (ret != 1) { + flb_errno(); + return -1; + } + + fclose(fp); + return 0; +} + +/* Writes index value to metadata file */ +static int write_seq_index(char *seq_index_file, uint64_t seq_index) +{ + FILE *fp; + int ret; + + fp = fopen(seq_index_file, "w+"); + if (fp == NULL) { + flb_errno(); + return -1; + } + + ret = fprintf(fp, "%"PRIu64, seq_index); + if (ret < 0) { + flb_errno(); + return -1; + } + + fclose(fp); + return 0; +} + +static int init_seq_index(void *context) { + int ret; + const char *tmp; + char tmp_buf[1024]; + struct flb_s3 *ctx = context; + + ctx->key_fmt_has_seq_index = FLB_TRUE; + + ctx->stream_metadata = flb_fstore_stream_create(ctx->fs, "sequence"); + if (!ctx->stream_metadata) { + flb_plg_error(ctx->ins, "could not initialize metadata stream"); + flb_fstore_destroy(ctx->fs); + ctx->fs = NULL; + return -1; + } + + /* Construct directories and file path names */ + ctx->metadata_dir = flb_sds_create(ctx->stream_metadata->path); + if (ctx->metadata_dir == NULL) { + flb_plg_error(ctx->ins, "Failed to create metadata path"); + flb_errno(); + return -1; + } + tmp = "/index_metadata"; + ret = flb_sds_cat_safe(&ctx->metadata_dir, tmp, strlen(tmp)); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to create metadata path"); + flb_errno(); + return -1; + } + + ctx->seq_index_file = flb_sds_create(ctx->metadata_dir); + if (ctx->seq_index_file == NULL) { + flb_plg_error(ctx->ins, "Failed to create sequential index file path"); + flb_errno(); + return -1; + } + tmp = "/seq_index_"; + ret = flb_sds_cat_safe(&ctx->seq_index_file, tmp, strlen(tmp)); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to create sequential index file path"); + flb_errno(); + return -1; + } + + sprintf(tmp_buf, "%d", ctx->ins->id); + ret = flb_sds_cat_safe(&ctx->seq_index_file, tmp_buf, strlen(tmp_buf)); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to create sequential index file path"); + flb_errno(); + return -1; + } + + /* Create directory path if it doesn't exist */ + ret = mkdir(ctx->metadata_dir, 0600); + if (ret < 0 && errno != EEXIST) { + flb_plg_error(ctx->ins, "Failed to create metadata directory"); + return -1; + } + + /* Check if index file doesn't exist and set index value */ + if (access(ctx->seq_index_file, F_OK) != 0) { + ctx->seq_index = 0; + ret = write_seq_index(ctx->seq_index_file, ctx->seq_index); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to write to sequential index metadata file"); + return -1; + } + } + else { + ret = read_seq_index(ctx->seq_index_file, &ctx->seq_index); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to read from sequential index " + "metadata file"); + return -1; + } + flb_plg_info(ctx->ins, "Successfully recovered index. " + "Continuing at index=%d", ctx->seq_index); + } + return 0; +} + void multipart_upload_destroy(struct multipart_upload *m_upload) { int i; @@ -326,6 +453,14 @@ static void s3_context_destroy(struct flb_s3 *ctx) flb_sds_destroy(ctx->buffer_dir); } + if (ctx->metadata_dir) { + flb_sds_destroy(ctx->metadata_dir); + } + + if (ctx->seq_index_file) { + flb_sds_destroy(ctx->seq_index_file); + } + /* Remove uploads */ mk_list_foreach_safe(head, tmp, &ctx->uploads) { m_upload = mk_list_entry(head, struct multipart_upload, _head); @@ -431,12 +566,26 @@ static int cb_s3_init(struct flb_output_instance *ins, } ctx->buffer_dir = tmp_sds; + /* Initialize local storage */ + ret = s3_store_init(ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to initialize S3 storage: %s", + ctx->store_dir); + return -1; + } + tmp = flb_output_get_property("s3_key_format", ins); if (tmp) { if (tmp[0] != '/') { flb_plg_error(ctx->ins, "'s3_key_format' must start with a '/'"); return -1; } + if (strstr((char *) tmp, "$INDEX")) { + ret = init_seq_index(ctx); + if (ret < 0) { + return -1; + } + } if (strstr((char *) tmp, "$UUID")) { ctx->key_fmt_has_uuid = FLB_TRUE; } @@ -637,14 +786,6 @@ static int cb_s3_init(struct flb_output_instance *ins, } } - /* Initialize local storage */ - ret = s3_store_init(ctx); - if (ret == -1) { - flb_plg_error(ctx->ins, "Failed to initialize S3 storage: %s", - ctx->store_dir); - return -1; - } - /* read any remaining buffers from previous (failed) executions */ ctx->has_old_buffers = s3_store_has_data(ctx); ctx->has_old_uploads = s3_store_has_uploads(ctx); @@ -871,6 +1012,15 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, s3_store_file_unlock(chunk); chunk->failures += 1; } + if (ctx->key_fmt_has_seq_index) { + ctx->seq_index--; + + ret = write_seq_index(ctx->seq_index_file, ctx->seq_index); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to decrement index after request error"); + return -1; + } + } return FLB_RETRY; } m_upload->part_number += 1; @@ -933,6 +1083,10 @@ static int put_all_chunks(struct flb_s3 *ctx) if (fs_stream == ctx->stream_upload) { continue; } + /* skip metadata stream */ + if (fs_stream == ctx->stream_metadata) { + continue; + } mk_list_foreach_safe(f_head, tmp, &fs_stream->files) { fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); @@ -1063,14 +1217,16 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time size_t final_body_size; char final_body_md5[25]; - s3_key = flb_get_s3_key(ctx->s3_key_format, create_time, tag, ctx->tag_delimiters); + s3_key = flb_get_s3_key(ctx->s3_key_format, create_time, tag, ctx->tag_delimiters, + ctx->seq_index); if (!s3_key) { flb_plg_error(ctx->ins, "Failed to construct S3 Object Key for %s", tag); return -1; } len = strlen(s3_key); - if ((len + 16) <= 1024 && !ctx->key_fmt_has_uuid) { + if ((len + 16) <= 1024 && !ctx->key_fmt_has_uuid && + !ctx->key_fmt_has_seq_index) { append_random = FLB_TRUE; len += 16; } @@ -1128,6 +1284,19 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time return -1; } } + + /* Update file and increment index value right before request */ + if (ctx->key_fmt_has_seq_index) { + ctx->seq_index++; + + ret = write_seq_index(ctx->seq_index_file, ctx->seq_index); + if (ret < 0 && access(ctx->seq_index_file, F_OK) == 0) { + ctx->seq_index--; + flb_sds_destroy(s3_key); + flb_plg_error(ctx->ins, "Failed to update sequential index metadata file"); + return -1; + } + } s3_client = ctx->s3_client; if (s3_plugin_under_test() == FLB_TRUE) { @@ -1138,7 +1307,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time if (ret == -1) { flb_plg_error(ctx->ins, "Failed to create headers"); flb_sds_destroy(uri); - return -1; + goto decrement_index; } c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT, uri, final_body, final_body_size, @@ -1159,6 +1328,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time flb_plg_info(ctx->ins, "Successfully uploaded object %s", final_key); flb_sds_destroy(uri); flb_http_client_destroy(c); + return 0; } flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size, @@ -1171,6 +1341,18 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time flb_plg_error(ctx->ins, "PutObject request failed"); flb_sds_destroy(uri); + goto decrement_index; + +decrement_index: + if (ctx->key_fmt_has_seq_index) { + ctx->seq_index--; + + ret = write_seq_index(ctx->seq_index_file, ctx->seq_index); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to decrement index after request error"); + return -1; + } + } return -1; } @@ -1226,6 +1408,7 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx, static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *tag, int tag_len) { + int ret; struct multipart_upload *m_upload = NULL; flb_sds_t s3_key = NULL; flb_sds_t tmp_sds = NULL; @@ -1236,7 +1419,8 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx, flb_errno(); return NULL; } - s3_key = flb_get_s3_key(ctx->s3_key_format, time(NULL), tag, ctx->tag_delimiters); + s3_key = flb_get_s3_key(ctx->s3_key_format, time(NULL), tag, ctx->tag_delimiters, + ctx->seq_index); if (!s3_key) { flb_plg_error(ctx->ins, "Failed to construct S3 Object Key for %s", tag); flb_free(m_upload); @@ -1255,6 +1439,19 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx, m_upload->init_time = time(NULL); mk_list_add(&m_upload->_head, &ctx->uploads); + /* Update file and increment index value right before request */ + if (ctx->key_fmt_has_seq_index) { + ctx->seq_index++; + + ret = write_seq_index(ctx->seq_index_file, ctx->seq_index); + if (ret < 0) { + ctx->seq_index--; + flb_sds_destroy(s3_key); + flb_plg_error(ctx->ins, "Failed to write to sequential index metadata file"); + return NULL; + } + } + return m_upload; } @@ -2043,8 +2240,10 @@ static struct flb_config_map config_map[] = { "by the rewrite_tag filter. Add $TAG in the format string to insert the full " "log tag; add $TAG[0] to insert the first part of the tag in the s3 key. " "The tag is split into “parts” using the characters specified with the " - "s3_key_format_tag_delimiters option. See the in depth examples and tutorial" - " in the documentation." + "s3_key_format_tag_delimiters option. Add $INDEX to enable sequential indexing " + "for file names. Adding $INDEX will prevent random string being added to end of key" + "when $UUID is not provided. See the in depth examples and tutorial in the " + "documentation." }, { diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 7e0faac6018..342cb590eaf 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -133,6 +133,7 @@ struct flb_s3 { struct flb_fstore *fs; struct flb_fstore_stream *stream_active; /* default active stream */ struct flb_fstore_stream *stream_upload; /* multipart upload stream */ + struct flb_fstore_stream *stream_metadata; /* s3 metadata stream */ /* * used to track that unset buffers were found on startup that have not @@ -157,6 +158,11 @@ struct flb_s3 { int timer_ms; int key_fmt_has_uuid; + uint64_t seq_index; + int key_fmt_has_seq_index; + flb_sds_t metadata_dir; + flb_sds_t seq_index_file; + struct flb_output_instance *ins; };