Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fstore: hold more than 64 files #15

Open
wants to merge 1 commit into
base: 1.8
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions include/fluent-bit/flb_fstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct flb_fstore_file {
struct cio_chunk *chunk; /* chunk context */
struct cio_stream *stream; /* parent stream that owns this file */
struct mk_list _head; /* link to parent flb_fstore->files */
struct mk_list _cache_head; /* link to flb_fstore->files_up cache */

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use the variable to _files_up_head?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally like _cache_head to remind that the _files_up list is used as a cache of up files, but can switch to _files_up_head.

};

struct flb_fstore_stream {
Expand All @@ -55,6 +56,8 @@ struct flb_fstore {
char *root_path;
struct cio_ctx *cio; /* Chunk I/O context */
struct mk_list streams;
struct mk_list files_up; /* list of files that are up. last is oldest */
int files_up_counter;
};

struct flb_fstore *flb_fstore_create(char *path, int store_type);
Expand All @@ -63,7 +66,9 @@ int flb_fstore_destroy(struct flb_fstore *fs);

struct flb_fstore_stream *flb_fstore_stream_create(struct flb_fstore *fs,
char *stream_name);
void flb_fstore_stream_destroy(struct flb_fstore_stream *stream, int delete);
void flb_fstore_stream_destroy(struct flb_fstore *fs,
struct flb_fstore_stream *stream,
int delete);

int flb_fstore_file_meta_set(struct flb_fstore *fs,
struct flb_fstore_file *fsf,
Expand All @@ -80,7 +85,9 @@ int flb_fstore_file_content_copy(struct flb_fstore *fs,
struct flb_fstore_file *fsf,
void **out_buf, size_t *out_size);

int flb_fstore_file_append(struct flb_fstore_file *fsf, void *data, size_t size);
int flb_fstore_file_append(struct flb_fstore *fs,
struct flb_fstore_file *fsf,
void *data, size_t size);
struct flb_fstore_file *flb_fstore_file_get(struct flb_fstore *fs,
struct flb_fstore_stream *fs_stream,
char *name, size_t size);
Expand Down
2 changes: 1 addition & 1 deletion lib/monkey/include/monkey/mk_core/mk_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ static inline void mk_list_add_after(struct mk_list *_new,
}

next = prev->next;
next->prev = prev;
_new->next = next;
_new->prev = prev;
prev->next = _new;
next->prev = _new; /* how is this a bug not found? is it really? */

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed, you can move this after row 78.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot to change the corresponding PR on Monkey Server... So the solution that got accepted has the weird ordering... Shucks. At least it fixes the bug.

}

static inline void __mk_list_del(struct mk_list *prev, struct mk_list *next)
Expand Down
2 changes: 1 addition & 1 deletion plugins/out_calyptia/calyptia.c
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ static int store_session_set(struct flb_calyptia *ctx, char *buf, size_t size)
}

/* store content */
ret = flb_fstore_file_append(ctx->fs_file, mp_buf, mp_size);
ret = flb_fstore_file_append(ctx->fs, ctx->fs_file, mp_buf, mp_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "could not store session information");
flb_free(mp_buf);
Expand Down
4 changes: 2 additions & 2 deletions plugins/out_s3/s3_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file,
}

/* Append data to the target file */
ret = flb_fstore_file_append(fsf, data, bytes);
ret = flb_fstore_file_append(ctx->fs, fsf, data, bytes);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fsf used to be used as se_file->fsf. Why is the reason for using cts->fs directly here?

Copy link
Owner Author

@matthewfala matthewfala Jan 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fs which is the file store context has the up list which needs to be updated on append. fs is found in the s3 context. The function signature needed to be altered accordingly.

if (ret != 0) {
flb_plg_error(ctx->ins, "error writing data to local s3 file");
return -1;
Expand Down Expand Up @@ -489,7 +489,7 @@ int s3_store_file_upload_put(struct flb_s3 *ctx,
}

/* Append data to the target file */
ret = flb_fstore_file_append(fsf, data, flb_sds_len(data));
ret = flb_fstore_file_append(ctx->fs, fsf, data, flb_sds_len(data));
if (ret != 0) {
flb_plg_error(ctx->ins, "error writing data to local s3 file");
return -1;
Expand Down
167 changes: 162 additions & 5 deletions src/flb_fstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,108 @@ static int log_cb(struct cio_ctx *ctx, int level, const char *file, int line,
return 0;
}

/* remove file from files_up list */
static inline void files_up_remove(struct flb_fstore *fs,
struct flb_fstore_file *fsf)
{
if (fsf->_cache_head.next == NULL) {
return;
}

mk_list_del(&fsf->_cache_head);
--fs->files_up_counter;
if (fs->files_up_counter < 0) { /* remove when stable? */
flb_error("[fstore] fs->files_up_counter is negative which should never happen");
}
}

/*
* this function adds files to an fstore files_up list to track files that are
* mapped. the files_up list's order corresponds to how recent add is called on
* the files in the files_up list. recent files first, least recent files are last
*
* max of max_chunks_up files allowed before eviction of least recently added file
*/
static struct flb_fstore_file *files_up_add(struct flb_fstore *fs,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please comments on what is the expected return output and its meaning.

struct flb_fstore_file *fsf)
{
struct flb_fstore_file *evict = NULL;

/* check if already in files_up */
if (fsf->_cache_head.next != NULL) {
/* send to front of list; renew */
files_up_remove(fs, fsf);
files_up_add(fs, fsf);
return NULL;
}

/* files_up cache full */
if (fs->files_up_counter >= fs->cio->max_chunks_up) {
/* evict oldest entry */
evict = mk_list_entry(fs->files_up.prev, struct flb_fstore_file, _cache_head);
files_up_remove(fs, evict);
}

/* add to files_up list, front */
mk_list_add_after(&fsf->_cache_head, &fs->files_up, &fs->files_up);
++fs->files_up_counter;
return evict;
}

/* lets down file if needed, fsf must be brought up soon after by caller */
static int file_up_prep(struct flb_fstore *fs, /* previously files_up_make_room */

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we put more comments on what this function does and the meaning of return value?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Compared to Eduardo's functions my functions already have a lot of comments. Disregarding conformity, this should be fine.

struct flb_fstore_file *fsf)
{
struct flb_fstore_file *evict;
int ret;

evict = files_up_add(fs, fsf);
if (evict != NULL) {
ret = cio_chunk_down(evict->chunk);
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment do describe unmapping

if (ret != CIO_OK) {
flb_error("[fstore] error unmapping file chunk: %s:%s",
fsf->stream->name, fsf->chunk->name);
return -1;
}
}
return 0;
}

/*
* this function memory maps a file if it is unmapped, changing the file's state to up
* if it is down. if fsf->max_chunks_up files are up, a the least recently checked file
* will be evicted
*
* files_up record is reordered to put recently checked files first.
*/
static int file_up_if_down(struct flb_fstore *fs,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we call file_mapped_if_down?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO _up_ should be used preferably to _mapped_, because we are interfacing with the mmap syscall through chunkio api. The chunkio concept of up for files is mmap, but it is probably a good idea for the code to follow chunkio's concept abstraction

struct flb_fstore_file *fsf)
{
int is_up;
int ret;

/* check if already in up_files list */
is_up = fsf->_cache_head.next != NULL;

ret = file_up_prep(fs, fsf);
if (ret == -1) {
flb_error("[fstore] error preparing for file up: %s:%s",
fsf->stream->name, fsf->chunk->name);
return -1;
}

/* memory map chunk */
if (!is_up) {
ret = cio_chunk_up(fsf->chunk);
if (ret != CIO_OK) {
flb_error("[fstore] error mapping file chunk: %s:%s",
fsf->stream->name, fsf->chunk->name);
return -1;
}
}
return 0;
}

/*
* this function sets metadata into a fstore_file structure, note that it makes
* it own copy of the data to set a NULL byte at the end.
Expand Down Expand Up @@ -77,6 +179,13 @@ int flb_fstore_file_meta_set(struct flb_fstore *fs,
{
int ret;

ret = file_up_if_down(fs, fsf);
if (ret == -1) {
flb_error("[fstore] file_meta_set could not bring up file: %s:%s",
fsf->stream->name, fsf->chunk->name);
return -1;
}

ret = cio_meta_write(fsf->chunk, meta, size);
if (ret == -1) {
flb_error("[fstore] could not write metadata to file: %s:%s",
Expand Down Expand Up @@ -153,6 +262,9 @@ struct flb_fstore_file *flb_fstore_file_create(struct flb_fstore *fs,
return NULL;
}

fsf->_cache_head.next = NULL;
fsf->_cache_head.prev = NULL;
file_up_prep(fs, fsf);
chunk = cio_chunk_open(fs->cio, fs_stream->stream, name,
CIO_OPEN, size, &err);
if (!chunk) {
Expand Down Expand Up @@ -198,6 +310,9 @@ struct flb_fstore_file *flb_fstore_file_get(struct flb_fstore *fs,
int flb_fstore_file_inactive(struct flb_fstore *fs,
struct flb_fstore_file *fsf)
{
/* remove from up list */
files_up_remove(fs, fsf);

/* close the Chunk I/O reference, but don't delete the real file */
if (fsf->chunk) {
cio_chunk_close(fsf->chunk, CIO_FALSE);
Expand All @@ -218,6 +333,9 @@ int flb_fstore_file_inactive(struct flb_fstore *fs,
int flb_fstore_file_delete(struct flb_fstore *fs,
struct flb_fstore_file *fsf)
{
/* remove from up list */
files_up_remove(fs, fsf);

/* close the Chunk I/O reference, but don't delete it the real file */
cio_chunk_close(fsf->chunk, CIO_TRUE);

Expand All @@ -242,6 +360,13 @@ int flb_fstore_file_content_copy(struct flb_fstore *fs,
{
int ret;

ret = file_up_if_down(fs, fsf);
if (ret == -1) {
flb_error("[fstore] file_content_copy could not bring up file: %s:%s",
fsf->stream->name, fsf->chunk->name);
return -1;
}

ret = cio_chunk_get_content_copy(fsf->chunk, out_buf, out_size);
if (ret == CIO_OK) {
return 0;
Expand All @@ -251,10 +376,19 @@ int flb_fstore_file_content_copy(struct flb_fstore *fs,
}

/* Append data to an existing file */
int flb_fstore_file_append(struct flb_fstore_file *fsf, void *data, size_t size)
int flb_fstore_file_append(struct flb_fstore *fs,
struct flb_fstore_file *fsf,
void *data, size_t size)
{
int ret;

ret = file_up_if_down(fs, fsf);
if (ret == -1) {
flb_error("[fstore] file_append could not bring up file: %s:%s",
fsf->stream->name, fsf->chunk->name);
return -1;
}

ret = cio_chunk_write(fsf->chunk, data, size);
if (ret != CIO_OK) {
flb_error("[fstore] could not write data to file %s", fsf->name);
Expand Down Expand Up @@ -337,10 +471,21 @@ struct flb_fstore_stream *flb_fstore_stream_create(struct flb_fstore *fs,
return fs_stream;
}

void flb_fstore_stream_destroy(struct flb_fstore_stream *stream, int delete)
{
void flb_fstore_stream_destroy(struct flb_fstore *fs,
struct flb_fstore_stream *stream,
int delete)
{
struct flb_fstore_file *fsf;
struct mk_list *head;

if (delete == FLB_TRUE) {
cio_stream_delete(stream->stream);

/* remove stream files from fstore files_up up list */
mk_list_foreach(head, &stream->files) {
fsf = mk_list_entry(head, struct flb_fstore_file, _head);
files_up_remove(fs, fsf);
}
}

/*
Expand All @@ -358,6 +503,7 @@ static int map_chunks(struct flb_fstore *ctx, struct flb_fstore_stream *fs_strea
struct mk_list *head;
struct cio_chunk *chunk;
struct flb_fstore_file *fsf;
int ret;

mk_list_foreach(head, &stream->chunks) {
chunk = mk_list_entry(head, struct cio_chunk, _head);
Expand All @@ -380,6 +526,12 @@ static int map_chunks(struct flb_fstore *ctx, struct flb_fstore_stream *fs_strea
/* load metadata */
flb_fstore_file_meta_get(ctx, fsf);
mk_list_add(&fsf->_head, &fs_stream->files);

/* add to up list */
ret = cio_chunk_is_up(chunk);
if (ret == CIO_TRUE) {
files_up_add(ctx, fsf);
}
}

return 0;
Expand Down Expand Up @@ -445,6 +597,8 @@ struct flb_fstore *flb_fstore_create(char *path, int store_type)
fs->root_path = cio->root_path;
fs->store_type = store_type;
mk_list_init(&fs->streams);
mk_list_init(&fs->files_up);
fs->files_up_counter = 0;

/* Map Chunk I/O streams and chunks into fstore context */
load_references(fs);
Expand Down Expand Up @@ -481,7 +635,7 @@ int flb_fstore_destroy(struct flb_fstore *fs)
delete = FLB_FALSE;
}

flb_fstore_stream_destroy(fs_stream, delete);
flb_fstore_stream_destroy(fs, fs_stream, delete);
}

if (fs->cio) {
Expand All @@ -497,14 +651,17 @@ void flb_fstore_dump(struct flb_fstore *fs)
struct mk_list *f_head;
struct flb_fstore_stream *fs_stream;
struct flb_fstore_file *fsf;
int is_up;

printf("===== FSTORE DUMP =====\n");
mk_list_foreach(head, &fs->streams) {
fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
printf("- stream: %s\n", fs_stream->name);
mk_list_foreach(f_head, &fs_stream->files) {
fsf = mk_list_entry(f_head, struct flb_fstore_file, _head);
printf(" %s/%s\n", fsf->stream->name, fsf->name);
is_up = cio_chunk_is_up(fsf->chunk);
printf(" %s/%s (%s)\n", fsf->stream->name, fsf->name,
(is_up) ? "up" : "down");
}
}
printf("\n");
Expand Down
2 changes: 1 addition & 1 deletion tests/internal/fstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void cb_all()
ret = stat(FSF_STORE_PATH "/abc/example.txt", &st_data);
TEST_CHECK(ret == 0);

ret = flb_fstore_file_append(fsf, "fluent-bit\n", 11);
ret = flb_fstore_file_append(fs, fsf, "fluent-bit\n", 11);
TEST_CHECK(ret == 0);

ret = flb_fstore_file_content_copy(fs, fsf, &out_buf, &out_size);
Expand Down