Skip to content

Commit

Permalink
out_s3: added sequential index feature
Browse files Browse the repository at this point in the history
Specify $INDEX in s3_key_format to add an index that increments every time it uploads a
file. Sequential indexing is compatible with all other s3_key_format features, so $UUID
and $TAG can be specified at the same time as $INDEX. When $INDEX is specified but $UUID
is not, a random string will not be appended to the end of the key name.

If Fluent Bit crashes or fails to upload, the index is preserved on the filesystem
in a metadata file located in ${store_dir}/index_metadata and is reused on startup. This
configuration option uses native C functions, so it is incompatible with multi-threading.

Tested through unit testing and various input plugins (exec, random, etc).

Signed-off-by: Stephen Lee <[email protected]>
  • Loading branch information
Stephen Lee authored and PettitWesley committed Jul 26, 2021
1 parent b0c4efd commit e93fe4d
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 14 deletions.
227 changes: 213 additions & 14 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <fluent-bit/flb_scheduler.h>
#include <fluent-bit/flb_gzip.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <mbedtls/base64.h>
#include <mbedtls/md5.h>
#include <msgpack.h>
Expand Down Expand Up @@ -254,6 +255,132 @@ static flb_sds_t concat_path(char *p1, char *p2)
return dir;
}

/*
 * Reads the sequential index value stored in the metadata file.
 *
 * seq_index_file: path of the metadata file to read.
 * seq_index:      output; receives the parsed value on success.
 *
 * Returns 0 on success, -1 if the file cannot be opened or if an unsigned
 * decimal integer cannot be parsed from its start.
 */
static int read_seq_index(char *seq_index_file, uint64_t *seq_index)
{
    FILE *fp;
    int ret;

    fp = fopen(seq_index_file, "r");
    if (fp == NULL) {
        flb_errno();
        return -1;
    }

    /* SCNu64 is the scanf-family conversion for uint64_t; PRIu64 is printf-only */
    ret = fscanf(fp, "%"SCNu64, seq_index);
    if (ret != 1) {
        /* report errno before fclose() has a chance to overwrite it */
        flb_errno();
        fclose(fp);
        return -1;
    }

    fclose(fp);
    return 0;
}

/*
 * Writes the sequential index value to the metadata file.
 *
 * The file is opened with "w+", so it is created if missing and any previous
 * content is truncated before the new value is written as decimal text.
 *
 * seq_index_file: path of the metadata file to write.
 * seq_index:      value to persist.
 *
 * Returns 0 on success, -1 if the file cannot be opened, written or closed.
 */
static int write_seq_index(char *seq_index_file, uint64_t seq_index)
{
    FILE *fp;
    int ret;

    fp = fopen(seq_index_file, "w+");
    if (fp == NULL) {
        flb_errno();
        return -1;
    }

    ret = fprintf(fp, "%"PRIu64, seq_index);
    if (ret < 0) {
        /* report errno before fclose() has a chance to overwrite it */
        flb_errno();
        fclose(fp);
        return -1;
    }

    /*
     * Buffered output is flushed by fclose(); a failure here means the value
     * may not have reached the file, so it must not be reported as success.
     */
    if (fclose(fp) != 0) {
        flb_errno();
        return -1;
    }

    return 0;
}

/*
 * Sets up sequential indexing for the S3 key format.
 *
 * Marks the context as using a sequential index, creates the "sequence"
 * fstore stream, and builds two paths from that stream's path:
 *
 *   metadata_dir   = <stream path>/index_metadata
 *   seq_index_file = <metadata_dir>/seq_index_<output instance id>
 *
 * The metadata directory is created if it does not exist. If the index file
 * does not exist, the index starts at 0 and is written to the file; otherwise
 * the stored value is read back into ctx->seq_index.
 *
 * context: pointer to the plugin's struct flb_s3.
 *
 * Returns 0 on success, -1 on any failure. If the metadata stream cannot be
 * created, ctx->fs is destroyed and set to NULL before returning.
 */
static int init_seq_index(void *context)
{
    int ret;
    const char *tmp;
    char tmp_buf[1024];
    struct flb_s3 *ctx = context;

    ctx->key_fmt_has_seq_index = FLB_TRUE;

    ctx->stream_metadata = flb_fstore_stream_create(ctx->fs, "sequence");
    if (!ctx->stream_metadata) {
        flb_plg_error(ctx->ins, "could not initialize metadata stream");
        flb_fstore_destroy(ctx->fs);
        ctx->fs = NULL;
        return -1;
    }

    /* Construct directories and file path names */
    ctx->metadata_dir = flb_sds_create(ctx->stream_metadata->path);
    if (ctx->metadata_dir == NULL) {
        flb_plg_error(ctx->ins, "Failed to create metadata path");
        flb_errno();
        return -1;
    }
    tmp = "/index_metadata";
    ret = flb_sds_cat_safe(&ctx->metadata_dir, tmp, strlen(tmp));
    if (ret < 0) {
        flb_plg_error(ctx->ins, "Failed to create metadata path");
        flb_errno();
        return -1;
    }

    ctx->seq_index_file = flb_sds_create(ctx->metadata_dir);
    if (ctx->seq_index_file == NULL) {
        flb_plg_error(ctx->ins, "Failed to create sequential index file path");
        flb_errno();
        return -1;
    }
    tmp = "/seq_index_";
    ret = flb_sds_cat_safe(&ctx->seq_index_file, tmp, strlen(tmp));
    if (ret < 0) {
        flb_plg_error(ctx->ins, "Failed to create sequential index file path");
        flb_errno();
        return -1;
    }

    /* The instance id is appended as the file name suffix */
    snprintf(tmp_buf, sizeof(tmp_buf), "%d", ctx->ins->id);
    ret = flb_sds_cat_safe(&ctx->seq_index_file, tmp_buf, strlen(tmp_buf));
    if (ret < 0) {
        flb_plg_error(ctx->ins, "Failed to create sequential index file path");
        flb_errno();
        return -1;
    }

    /*
     * Create directory path if it doesn't exist. The owner needs the search
     * (execute) bit on a directory to create or open files inside it, so the
     * mode is 0700 rather than 0600.
     */
    ret = mkdir(ctx->metadata_dir, 0700);
    if (ret < 0 && errno != EEXIST) {
        flb_errno();
        flb_plg_error(ctx->ins, "Failed to create metadata directory");
        return -1;
    }

    /* Check if index file doesn't exist and set index value */
    if (access(ctx->seq_index_file, F_OK) != 0) {
        ctx->seq_index = 0;
        ret = write_seq_index(ctx->seq_index_file, ctx->seq_index);
        if (ret < 0) {
            flb_plg_error(ctx->ins, "Failed to write to sequential index metadata file");
            return -1;
        }
    }
    else {
        ret = read_seq_index(ctx->seq_index_file, &ctx->seq_index);
        if (ret < 0) {
            flb_plg_error(ctx->ins, "Failed to read from sequential index "
                          "metadata file");
            return -1;
        }
        /* seq_index is a uint64_t, so it must be printed with PRIu64 */
        flb_plg_info(ctx->ins, "Successfully recovered index. "
                     "Continuing at index=%"PRIu64, ctx->seq_index);
    }

    return 0;
}

void multipart_upload_destroy(struct multipart_upload *m_upload)
{
int i;
Expand Down Expand Up @@ -326,6 +453,14 @@ static void s3_context_destroy(struct flb_s3 *ctx)
flb_sds_destroy(ctx->buffer_dir);
}

if (ctx->metadata_dir) {
flb_sds_destroy(ctx->metadata_dir);
}

if (ctx->seq_index_file) {
flb_sds_destroy(ctx->seq_index_file);
}

/* Remove uploads */
mk_list_foreach_safe(head, tmp, &ctx->uploads) {
m_upload = mk_list_entry(head, struct multipart_upload, _head);
Expand Down Expand Up @@ -431,12 +566,26 @@ static int cb_s3_init(struct flb_output_instance *ins,
}
ctx->buffer_dir = tmp_sds;

/* Initialize local storage */
ret = s3_store_init(ctx);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to initialize S3 storage: %s",
ctx->store_dir);
return -1;
}

tmp = flb_output_get_property("s3_key_format", ins);
if (tmp) {
if (tmp[0] != '/') {
flb_plg_error(ctx->ins, "'s3_key_format' must start with a '/'");
return -1;
}
if (strstr((char *) tmp, "$INDEX")) {
ret = init_seq_index(ctx);
if (ret < 0) {
return -1;
}
}
if (strstr((char *) tmp, "$UUID")) {
ctx->key_fmt_has_uuid = FLB_TRUE;
}
Expand Down Expand Up @@ -637,14 +786,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
}
}

/* Initialize local storage */
ret = s3_store_init(ctx);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to initialize S3 storage: %s",
ctx->store_dir);
return -1;
}

/* read any remaining buffers from previous (failed) executions */
ctx->has_old_buffers = s3_store_has_data(ctx);
ctx->has_old_uploads = s3_store_has_uploads(ctx);
Expand Down Expand Up @@ -871,6 +1012,15 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
s3_store_file_unlock(chunk);
chunk->failures += 1;
}
if (ctx->key_fmt_has_seq_index) {
ctx->seq_index--;

ret = write_seq_index(ctx->seq_index_file, ctx->seq_index);
if (ret < 0) {
flb_plg_error(ctx->ins, "Failed to decrement index after request error");
return -1;
}
}
return FLB_RETRY;
}
m_upload->part_number += 1;
Expand Down Expand Up @@ -933,6 +1083,10 @@ static int put_all_chunks(struct flb_s3 *ctx)
if (fs_stream == ctx->stream_upload) {
continue;
}
/* skip metadata stream */
if (fs_stream == ctx->stream_metadata) {
continue;
}

mk_list_foreach_safe(f_head, tmp, &fs_stream->files) {
fsf = mk_list_entry(f_head, struct flb_fstore_file, _head);
Expand Down Expand Up @@ -1063,14 +1217,16 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
size_t final_body_size;
char final_body_md5[25];

s3_key = flb_get_s3_key(ctx->s3_key_format, create_time, tag, ctx->tag_delimiters);
s3_key = flb_get_s3_key(ctx->s3_key_format, create_time, tag, ctx->tag_delimiters,
ctx->seq_index);
if (!s3_key) {
flb_plg_error(ctx->ins, "Failed to construct S3 Object Key for %s", tag);
return -1;
}

len = strlen(s3_key);
if ((len + 16) <= 1024 && !ctx->key_fmt_has_uuid) {
if ((len + 16) <= 1024 && !ctx->key_fmt_has_uuid &&
!ctx->key_fmt_has_seq_index) {
append_random = FLB_TRUE;
len += 16;
}
Expand Down Expand Up @@ -1128,6 +1284,19 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
return -1;
}
}

/* Update file and increment index value right before request */
if (ctx->key_fmt_has_seq_index) {
ctx->seq_index++;

ret = write_seq_index(ctx->seq_index_file, ctx->seq_index);
if (ret < 0 && access(ctx->seq_index_file, F_OK) == 0) {
ctx->seq_index--;
flb_sds_destroy(s3_key);
flb_plg_error(ctx->ins, "Failed to update sequential index metadata file");
return -1;
}
}

s3_client = ctx->s3_client;
if (s3_plugin_under_test() == FLB_TRUE) {
Expand All @@ -1138,7 +1307,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to create headers");
flb_sds_destroy(uri);
return -1;
goto decrement_index;
}
c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT,
uri, final_body, final_body_size,
Expand All @@ -1159,6 +1328,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
flb_plg_info(ctx->ins, "Successfully uploaded object %s", final_key);
flb_sds_destroy(uri);
flb_http_client_destroy(c);

return 0;
}
flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size,
Expand All @@ -1171,6 +1341,18 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time

flb_plg_error(ctx->ins, "PutObject request failed");
flb_sds_destroy(uri);
goto decrement_index;

decrement_index:
if (ctx->key_fmt_has_seq_index) {
ctx->seq_index--;

ret = write_seq_index(ctx->seq_index_file, ctx->seq_index);
if (ret < 0) {
flb_plg_error(ctx->ins, "Failed to decrement index after request error");
return -1;
}
}
return -1;
}

Expand Down Expand Up @@ -1226,6 +1408,7 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx,
static struct multipart_upload *create_upload(struct flb_s3 *ctx,
const char *tag, int tag_len)
{
int ret;
struct multipart_upload *m_upload = NULL;
flb_sds_t s3_key = NULL;
flb_sds_t tmp_sds = NULL;
Expand All @@ -1236,7 +1419,8 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx,
flb_errno();
return NULL;
}
s3_key = flb_get_s3_key(ctx->s3_key_format, time(NULL), tag, ctx->tag_delimiters);
s3_key = flb_get_s3_key(ctx->s3_key_format, time(NULL), tag, ctx->tag_delimiters,
ctx->seq_index);
if (!s3_key) {
flb_plg_error(ctx->ins, "Failed to construct S3 Object Key for %s", tag);
flb_free(m_upload);
Expand All @@ -1255,6 +1439,19 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx,
m_upload->init_time = time(NULL);
mk_list_add(&m_upload->_head, &ctx->uploads);

/* Update file and increment index value right before request */
if (ctx->key_fmt_has_seq_index) {
ctx->seq_index++;

ret = write_seq_index(ctx->seq_index_file, ctx->seq_index);
if (ret < 0) {
ctx->seq_index--;
flb_sds_destroy(s3_key);
flb_plg_error(ctx->ins, "Failed to write to sequential index metadata file");
return NULL;
}
}

return m_upload;
}

Expand Down Expand Up @@ -2043,8 +2240,10 @@ static struct flb_config_map config_map[] = {
"by the rewrite_tag filter. Add $TAG in the format string to insert the full "
"log tag; add $TAG[0] to insert the first part of the tag in the s3 key. "
"The tag is split into “parts” using the characters specified with the "
"s3_key_format_tag_delimiters option. See the in depth examples and tutorial"
" in the documentation."
"s3_key_format_tag_delimiters option. Add $INDEX to enable sequential indexing "
"for file names. Adding $INDEX will prevent random string being added to end of key"
"when $UUID is not provided. See the in depth examples and tutorial in the "
"documentation."
},

{
Expand Down
6 changes: 6 additions & 0 deletions plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ struct flb_s3 {
struct flb_fstore *fs;
struct flb_fstore_stream *stream_active; /* default active stream */
struct flb_fstore_stream *stream_upload; /* multipart upload stream */
struct flb_fstore_stream *stream_metadata; /* s3 metadata stream */

/*
* used to track that unset buffers were found on startup that have not
Expand All @@ -157,6 +158,11 @@ struct flb_s3 {
int timer_ms;
int key_fmt_has_uuid;

uint64_t seq_index;
int key_fmt_has_seq_index;
flb_sds_t metadata_dir;
flb_sds_t seq_index_file;

struct flb_output_instance *ins;
};

Expand Down

0 comments on commit e93fe4d

Please sign in to comment.