diff --git a/include/fluent-bit/flb_fstore.h b/include/fluent-bit/flb_fstore.h index f2db1a5d264..5d96104c432 100644 --- a/include/fluent-bit/flb_fstore.h +++ b/include/fluent-bit/flb_fstore.h @@ -40,6 +40,7 @@ struct flb_fstore_file { struct cio_chunk *chunk; /* chunk context */ struct cio_stream *stream; /* parent stream that owns this file */ struct mk_list _head; /* link to parent flb_fstore->files */ + struct mk_list _cache_head; /* link to flb_fstore->files_up cache */ }; struct flb_fstore_stream { @@ -55,6 +56,8 @@ struct flb_fstore { char *root_path; struct cio_ctx *cio; /* Chunk I/O context */ struct mk_list streams; + struct mk_list files_up; /* list of files that are up. last is oldest */ + int files_up_counter; }; struct flb_fstore *flb_fstore_create(char *path, int store_type); @@ -63,7 +66,9 @@ int flb_fstore_destroy(struct flb_fstore *fs); struct flb_fstore_stream *flb_fstore_stream_create(struct flb_fstore *fs, char *stream_name); -void flb_fstore_stream_destroy(struct flb_fstore_stream *stream, int delete); +void flb_fstore_stream_destroy(struct flb_fstore *fs, + struct flb_fstore_stream *stream, + int delete); int flb_fstore_file_meta_set(struct flb_fstore *fs, struct flb_fstore_file *fsf, @@ -80,7 +85,9 @@ int flb_fstore_file_content_copy(struct flb_fstore *fs, struct flb_fstore_file *fsf, void **out_buf, size_t *out_size); -int flb_fstore_file_append(struct flb_fstore_file *fsf, void *data, size_t size); +int flb_fstore_file_append(struct flb_fstore *fs, + struct flb_fstore_file *fsf, + void *data, size_t size); struct flb_fstore_file *flb_fstore_file_get(struct flb_fstore *fs, struct flb_fstore_stream *fs_stream, char *name, size_t size); diff --git a/lib/monkey/include/monkey/mk_core/mk_list.h b/lib/monkey/include/monkey/mk_core/mk_list.h index a83be12e71c..9e789842266 100644 --- a/lib/monkey/include/monkey/mk_core/mk_list.h +++ b/lib/monkey/include/monkey/mk_core/mk_list.h @@ -76,10 +76,10 @@ static inline void mk_list_add_after(struct mk_list *_new, } next = prev->next; - next->prev = prev; _new->next = next; _new->prev = prev; prev->next = _new; + next->prev = _new; /* how is this a bug not found? is it really? */ } static inline void __mk_list_del(struct mk_list *prev, struct mk_list *next) diff --git a/plugins/out_calyptia/calyptia.c b/plugins/out_calyptia/calyptia.c index e945d135b8e..4f46eec0333 100644 --- a/plugins/out_calyptia/calyptia.c +++ b/plugins/out_calyptia/calyptia.c @@ -480,7 +480,7 @@ static int store_session_set(struct flb_calyptia *ctx, char *buf, size_t size) } /* store content */ - ret = flb_fstore_file_append(ctx->fs_file, mp_buf, mp_size); + ret = flb_fstore_file_append(ctx->fs, ctx->fs_file, mp_buf, mp_size); if (ret == -1) { flb_plg_error(ctx->ins, "could not store session information"); flb_free(mp_buf); diff --git a/plugins/out_s3/s3_store.c b/plugins/out_s3/s3_store.c index 41278ea4807..a6f901de85b 100644 --- a/plugins/out_s3/s3_store.c +++ b/plugins/out_s3/s3_store.c @@ -179,7 +179,7 @@ int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file, } /* Append data to the target file */ - ret = flb_fstore_file_append(fsf, data, bytes); + ret = flb_fstore_file_append(ctx->fs, fsf, data, bytes); if (ret != 0) { flb_plg_error(ctx->ins, "error writing data to local s3 file"); return -1; @@ -489,7 +489,7 @@ int s3_store_file_upload_put(struct flb_s3 *ctx, } /* Append data to the target file */ - ret = flb_fstore_file_append(fsf, data, flb_sds_len(data)); + ret = flb_fstore_file_append(ctx->fs, fsf, data, flb_sds_len(data)); if (ret != 0) { flb_plg_error(ctx->ins, "error writing data to local s3 file"); return -1; diff --git a/src/flb_fstore.c b/src/flb_fstore.c index 68d825a4b95..c4b05f4d089 100644 --- a/src/flb_fstore.c +++ b/src/flb_fstore.c @@ -43,6 +43,108 @@ static int log_cb(struct cio_ctx *ctx, int level, const char *file, int line, return 0; } +/* remove file from files_up list */ +static inline void files_up_remove(struct flb_fstore *fs, + struct flb_fstore_file *fsf) +{ + if (fsf->_cache_head.next == NULL) { + return; + } + + mk_list_del(&fsf->_cache_head); + --fs->files_up_counter; + if (fs->files_up_counter < 0) { /* remove when stable? */ + flb_error("[fstore] fs->files_up_counter is negative which should never happen"); + } +} + +/* + * this function adds files to an fstore files_up list to track files that are + * mapped. the files_up list's order corresponds to how recent add is called on + * the files in the files_up list. recent files first, least recent files are last + * + * max of max_chunks_up files allowed before eviction of least recently added file + */ +static struct flb_fstore_file *files_up_add(struct flb_fstore *fs, + struct flb_fstore_file *fsf) +{ + struct flb_fstore_file *evict = NULL; + + /* check if already in files_up */ + if (fsf->_cache_head.next != NULL) { + /* send to front of list; renew */ + files_up_remove(fs, fsf); + files_up_add(fs, fsf); + return NULL; + } + + /* files_up cache full */ + if (fs->files_up_counter >= fs->cio->max_chunks_up) { + /* evict oldest entry */ + evict = mk_list_entry(fs->files_up.prev, struct flb_fstore_file, _cache_head); + files_up_remove(fs, evict); + } + + /* add to files_up list, front */ + mk_list_add_after(&fsf->_cache_head, &fs->files_up, &fs->files_up); + ++fs->files_up_counter; + return evict; +} + +/* lets down file if needed, fsf must be brought up soon after by caller */ +static int file_up_prep(struct flb_fstore *fs, /* previously files_up_make_room */ + struct flb_fstore_file *fsf) +{ + struct flb_fstore_file *evict; + int ret; + + evict = files_up_add(fs, fsf); + if (evict != NULL) { + ret = cio_chunk_down(evict->chunk); + if (ret != CIO_OK) { + flb_error("[fstore] error unmapping file chunk: %s:%s", + fsf->stream->name, fsf->chunk->name); + return -1; + } + } + return 0; +} + +/* + * this function memory maps a file if it is unmapped, changing the file's state to up + * if it is down. if fsf->max_chunks_up files are up, a the least recently checked file + * will be evicted + * + * files_up record is reordered to put recently checked files first. + */ +static int file_up_if_down(struct flb_fstore *fs, + struct flb_fstore_file *fsf) +{ + int is_up; + int ret; + + /* check if already in up_files list */ + is_up = fsf->_cache_head.next != NULL; + + ret = file_up_prep(fs, fsf); + if (ret == -1) { + flb_error("[fstore] error preparing for file up: %s:%s", + fsf->stream->name, fsf->chunk->name); + return -1; + } + + /* memory map chunk */ + if (!is_up) { + ret = cio_chunk_up(fsf->chunk); + if (ret != CIO_OK) { + flb_error("[fstore] error mapping file chunk: %s:%s", + fsf->stream->name, fsf->chunk->name); + return -1; + } + } + return 0; +} + /* * this function sets metadata into a fstore_file structure, note that it makes * it own copy of the data to set a NULL byte at the end. @@ -77,6 +179,13 @@ int flb_fstore_file_meta_set(struct flb_fstore *fs, { int ret; + ret = file_up_if_down(fs, fsf); + if (ret == -1) { + flb_error("[fstore] file_meta_set could not bring up file: %s:%s", + fsf->stream->name, fsf->chunk->name); + return -1; + } + ret = cio_meta_write(fsf->chunk, meta, size); if (ret == -1) { flb_error("[fstore] could not write metadata to file: %s:%s", @@ -153,6 +262,9 @@ struct flb_fstore_file *flb_fstore_file_create(struct flb_fstore *fs, return NULL; } + fsf->_cache_head.next = NULL; + fsf->_cache_head.prev = NULL; + file_up_prep(fs, fsf); chunk = cio_chunk_open(fs->cio, fs_stream->stream, name, CIO_OPEN, size, &err); if (!chunk) { @@ -198,6 +310,9 @@ struct flb_fstore_file *flb_fstore_file_get(struct flb_fstore *fs, int flb_fstore_file_inactive(struct flb_fstore *fs, struct flb_fstore_file *fsf) { + /* remove from up list */ + files_up_remove(fs, fsf); + /* close the Chunk I/O reference, but don't delete the real file */ if (fsf->chunk) { cio_chunk_close(fsf->chunk, CIO_FALSE); @@ -218,6 +333,9 @@ int flb_fstore_file_inactive(struct flb_fstore *fs, int flb_fstore_file_delete(struct flb_fstore *fs, struct flb_fstore_file *fsf) { + /* remove from up list */ + files_up_remove(fs, fsf); + /* close the Chunk I/O reference, but don't delete it the real file */ cio_chunk_close(fsf->chunk, CIO_TRUE); @@ -242,6 +360,13 @@ int flb_fstore_file_content_copy(struct flb_fstore *fs, { int ret; + ret = file_up_if_down(fs, fsf); + if (ret == -1) { + flb_error("[fstore] file_content_copy could not bring up file: %s:%s", + fsf->stream->name, fsf->chunk->name); + return -1; + } + ret = cio_chunk_get_content_copy(fsf->chunk, out_buf, out_size); if (ret == CIO_OK) { return 0; @@ -251,10 +376,19 @@ int flb_fstore_file_content_copy(struct flb_fstore *fs, } /* Append data to an existing file */ -int flb_fstore_file_append(struct flb_fstore_file *fsf, void *data, size_t size) +int flb_fstore_file_append(struct flb_fstore *fs, + struct flb_fstore_file *fsf, + void *data, size_t size) { int ret; + ret = file_up_if_down(fs, fsf); + if (ret == -1) { + flb_error("[fstore] file_append could not bring up file: %s:%s", + fsf->stream->name, fsf->chunk->name); + return -1; + } + ret = cio_chunk_write(fsf->chunk, data, size); if (ret != CIO_OK) { flb_error("[fstore] could not write data to file %s", fsf->name); @@ -337,10 +471,21 @@ struct flb_fstore_stream *flb_fstore_stream_create(struct flb_fstore *fs, return fs_stream; } -void flb_fstore_stream_destroy(struct flb_fstore_stream *stream, int delete) -{ +void flb_fstore_stream_destroy(struct flb_fstore *fs, + struct flb_fstore_stream *stream, + int delete) +{ + struct flb_fstore_file *fsf; + struct mk_list *head; + if (delete == FLB_TRUE) { cio_stream_delete(stream->stream); + + /* remove stream files from fstore files_up up list */ + mk_list_foreach(head, &stream->files) { + fsf = mk_list_entry(head, struct flb_fstore_file, _head); + files_up_remove(fs, fsf); + } } /* @@ -358,6 +503,7 @@ static int map_chunks(struct flb_fstore *ctx, struct flb_fstore_stream *fs_strea struct mk_list *head; struct cio_chunk *chunk; struct flb_fstore_file *fsf; + int ret; mk_list_foreach(head, &stream->chunks) { chunk = mk_list_entry(head, struct cio_chunk, _head); @@ -380,6 +526,12 @@ static int map_chunks(struct flb_fstore *ctx, struct flb_fstore_stream *fs_strea /* load metadata */ flb_fstore_file_meta_get(ctx, fsf); mk_list_add(&fsf->_head, &fs_stream->files); + + /* add to up list */ + ret = cio_chunk_is_up(chunk); + if (ret == CIO_TRUE) { + files_up_add(ctx, fsf); + } } return 0; @@ -445,6 +597,8 @@ struct flb_fstore *flb_fstore_create(char *path, int store_type) fs->root_path = cio->root_path; fs->store_type = store_type; mk_list_init(&fs->streams); + mk_list_init(&fs->files_up); + fs->files_up_counter = 0; /* Map Chunk I/O streams and chunks into fstore context */ load_references(fs); @@ -481,7 +635,7 @@ int flb_fstore_destroy(struct flb_fstore *fs) delete = FLB_FALSE; } - flb_fstore_stream_destroy(fs_stream, delete); + flb_fstore_stream_destroy(fs, fs_stream, delete); } if (fs->cio) { @@ -497,6 +651,7 @@ void flb_fstore_dump(struct flb_fstore *fs) struct mk_list *f_head; struct flb_fstore_stream *fs_stream; struct flb_fstore_file *fsf; + int is_up; printf("===== FSTORE DUMP =====\n"); mk_list_foreach(head, &fs->streams) { @@ -504,7 +659,9 @@ void flb_fstore_dump(struct flb_fstore *fs) printf("- stream: %s\n", fs_stream->name); mk_list_foreach(f_head, &fs_stream->files) { fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); - printf(" %s/%s\n", fsf->stream->name, fsf->name); + is_up = cio_chunk_is_up(fsf->chunk); + printf(" %s/%s (%s)\n", fsf->stream->name, fsf->name, + (is_up) ? "up" : "down"); } } printf("\n"); diff --git a/tests/internal/fstore.c b/tests/internal/fstore.c index 1f4de61b567..5650dc96e1b 100644 --- a/tests/internal/fstore.c +++ b/tests/internal/fstore.c @@ -64,7 +64,7 @@ void cb_all() ret = stat(FSF_STORE_PATH "/abc/example.txt", &st_data); TEST_CHECK(ret == 0); - ret = flb_fstore_file_append(fsf, "fluent-bit\n", 11); + ret = flb_fstore_file_append(fs, fsf, "fluent-bit\n", 11); TEST_CHECK(ret == 0); ret = flb_fstore_file_content_copy(fs, fsf, &out_buf, &out_size);