From ec7b33276b101fd5d9e304a8cab4a6c0b1b7b34f Mon Sep 17 00:00:00 2001 From: jzajic Date: Wed, 23 Nov 2022 22:29:08 +0100 Subject: [PATCH] Fix https://github.com/fluent/fluent-bit-go/issues/49 Signed-off-by: jzajic --- include/fluent-bit/flb_input.h | 3 ++ include/fluent-bit/flb_output.h | 3 ++ include/fluent-bit/flb_plugin.h | 1 + src/flb_output.c | 7 +-- src/flb_plugin.c | 31 ++++++++++++- src/flb_plugin_proxy.c | 77 +++++++++++++++++++++++++-------- src/proxy/go/go.c | 21 ++++++--- src/proxy/go/go.h | 4 +- 8 files changed, 116 insertions(+), 31 deletions(-) diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index fd8b275882e..37edd3f3120 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -134,6 +134,9 @@ struct flb_input_plugin { /* Exit */ int (*cb_exit) (void *, struct flb_config *); + /* Destroy */ + void (*cb_destroy) (struct flb_input_plugin *); + void *instance; struct mk_list _head; diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 4ddd937390c..10ca9932440 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -205,6 +205,9 @@ struct flb_output_plugin { /* Exit */ int (*cb_exit) (void *, struct flb_config *); + /* Destroy */ + void (*cb_destroy) (struct flb_output_plugin *); + /* Default number of worker threads */ int workers; diff --git a/include/fluent-bit/flb_plugin.h b/include/fluent-bit/flb_plugin.h index 00997334897..9f2bab3cccd 100644 --- a/include/fluent-bit/flb_plugin.h +++ b/include/fluent-bit/flb_plugin.h @@ -33,6 +33,7 @@ struct flb_plugin { int type; /* plugin type */ flb_sds_t path; /* path for .so file */ void *dso_handle; /* shared object handler */ + void *plugin_handler; /* plugin struct handler */ struct mk_list _head; /* link to struct flb_plugins */ }; diff --git a/src/flb_output.c b/src/flb_output.c index 85af7e8a990..636a5ec1b2b 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -448,12 +448,7 @@ void flb_output_exit(struct flb_config *config) /* Check a exit callback */ if (p->cb_exit) { - if (!p->proxy) { - p->cb_exit(ins->context, config); - } - else { - p->cb_exit(p, ins->context); - } + p->cb_exit(ins->context, config); } flb_output_instance_destroy(ins); } diff --git a/src/flb_plugin.c b/src/flb_plugin.c index 37dcb026f42..02d369e2cef 100644 --- a/src/flb_plugin.c +++ b/src/flb_plugin.c @@ -168,6 +168,30 @@ static void destroy_plugin(struct flb_plugin *plugin) flb_free(plugin); } +static void destroy_input_plugin(struct flb_plugin *plugin) +{ + struct flb_input_plugin *input; + if(plugin->plugin_handler != NULL) { + input = (struct flb_input_plugin *) plugin->plugin_handler; + if(input->cb_destroy != NULL) { + input->cb_destroy(plugin->plugin_handler); + } + } + destroy_plugin(plugin); +} + +static void destroy_output_plugin(struct flb_plugin *plugin) +{ + struct flb_output_plugin *output; + if(plugin->plugin_handler != NULL) { + output = (struct flb_output_plugin *) plugin->plugin_handler; + if(output->cb_destroy != NULL) { + output->cb_destroy(plugin->plugin_handler); + } + } + destroy_plugin(plugin); +} + /* Creates the global plugin context for 'dynamic plugins' */ struct flb_plugins *flb_plugin_create() { @@ -281,16 +305,19 @@ int flb_plugin_load(char *path, struct flb_plugins *ctx, plugin->type = type; plugin->path = flb_sds_create(path); plugin->dso_handle = dso_handle; + plugin->plugin_handler = NULL; /* Link by type to the plugins parent context */ if (type == FLB_PLUGIN_INPUT) { mk_list_add(&plugin->_head, &ctx->input); + plugin->plugin_handler = input; } else if (type == FLB_PLUGIN_FILTER) { mk_list_add(&plugin->_head, &ctx->filter); } else if (type == FLB_PLUGIN_OUTPUT) { mk_list_add(&plugin->_head, &ctx->output); + plugin->plugin_handler = output; } return 0; @@ -403,7 +430,7 @@ void flb_plugin_destroy(struct flb_plugins *ctx) mk_list_foreach_safe(head, tmp, &ctx->input) { plugin = mk_list_entry(head, struct flb_plugin, _head); - destroy_plugin(plugin); + destroy_input_plugin(plugin); } mk_list_foreach_safe(head, tmp, &ctx->filter) { @@ -413,7 +440,7 @@ void flb_plugin_destroy(struct flb_plugins *ctx) mk_list_foreach_safe(head, tmp, &ctx->output) { plugin = mk_list_entry(head, struct flb_plugin, _head); - destroy_plugin(plugin); + destroy_output_plugin(plugin); } flb_free(ctx); diff --git a/src/flb_plugin_proxy.c b/src/flb_plugin_proxy.c index e7ab2429455..59c50a1b6ea 100644 --- a/src/flb_plugin_proxy.c +++ b/src/flb_plugin_proxy.c @@ -183,37 +183,85 @@ static void flb_proxy_input_cb_resume(void *data, struct flb_config *config) } static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy); -static int flb_proxy_output_cb_exit(void *data, struct flb_config *config) + +static int flb_proxy_output_cb_exit(void *out_context, struct flb_config *config) { - struct flb_output_plugin *instance = data; - struct flb_plugin_proxy *proxy = (instance->proxy); + struct flb_plugin_proxy_context *ctx = out_context; + struct flb_plugin_proxy *proxy = (ctx->proxy); + + if (!out_context) { + return 0; + } if (proxy->def->proxy == FLB_PROXY_GOLANG) { - proxy_go_output_destroy(proxy->data); +#ifdef FLB_HAVE_PROXY_GO + proxy_go_output_destroy(ctx, proxy->data); +#endif } - flb_plugin_proxy_destroy(proxy); + + flb_free(ctx); return 0; } +static void flb_proxy_output_cb_destroy(struct flb_output_plugin *plugin) +{ + struct flb_plugin_proxy *proxy = (struct flb_plugin_proxy *) plugin->proxy; + /* cleanup */ + void (*cb_unregister)(struct flb_plugin_proxy_def *def); + + cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister"); + if (cb_unregister != NULL) { + cb_unregister(proxy->def); + } + + if (proxy->def->proxy == FLB_PROXY_GOLANG) { +#ifdef FLB_HAVE_PROXY_GO + proxy_go_output_unregister(proxy->data); +#endif + } + + flb_plugin_proxy_destroy(proxy); +} + static int flb_proxy_input_cb_exit(void *in_context, struct flb_config *config) { struct flb_plugin_input_proxy_context *ctx = in_context; + struct flb_plugin_proxy *proxy = (ctx->proxy); if (!in_context) { return 0; } - if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) { - proxy_go_input_destroy(ctx->proxy->data); + if (proxy->def->proxy == FLB_PROXY_GOLANG) { +#ifdef FLB_HAVE_PROXY_GO + proxy_go_input_destroy(proxy->data); +#endif } - flb_plugin_proxy_destroy(ctx->proxy); - flb_free(ctx); - return 0; } +static void flb_proxy_input_cb_destroy(struct flb_input_plugin *plugin) +{ + struct flb_plugin_proxy *proxy = (struct flb_plugin_proxy *) plugin->proxy; + /* cleanup */ + void (*cb_unregister)(struct flb_plugin_proxy_def *def); + + cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister"); + if (cb_unregister != NULL) { + cb_unregister(proxy->def); + } + + if (proxy->def->proxy == FLB_PROXY_GOLANG) { +#ifdef FLB_HAVE_PROXY_GO + proxy_go_input_unregister(proxy->data); +#endif + } + + flb_plugin_proxy_destroy(proxy); +} + static int flb_proxy_register_output(struct flb_plugin_proxy *proxy, struct flb_plugin_proxy_def *def, struct flb_config *config) @@ -241,6 +289,7 @@ static int flb_proxy_register_output(struct flb_plugin_proxy *proxy, */ out->cb_flush = proxy_cb_flush; out->cb_exit = flb_proxy_output_cb_exit; + out->cb_destroy = flb_proxy_output_cb_destroy; return 0; } @@ -273,6 +322,7 @@ static int flb_proxy_register_input(struct flb_plugin_proxy *proxy, in->cb_collect = flb_proxy_input_cb_collect; in->cb_flush_buf = NULL; in->cb_exit = flb_proxy_input_cb_exit; + in->cb_destroy = flb_proxy_input_cb_destroy; in->cb_pause = flb_proxy_input_cb_pause; in->cb_resume = flb_proxy_input_cb_resume; return 0; @@ -427,13 +477,6 @@ struct flb_plugin_proxy *flb_plugin_proxy_create(const char *dso_path, int type, static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy) { - /* cleanup */ - void (*cb_unregister)(struct flb_plugin_proxy_def *def); - - cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister"); - if (cb_unregister != NULL) { - cb_unregister(proxy->def); - } flb_free(proxy->def); flb_api_destroy(proxy->api); dlclose(proxy->dso_handler); diff --git a/src/proxy/go/go.c b/src/proxy/go/go.c index 1568835b221..a1fab95ff95 100644 --- a/src/proxy/go/go.c +++ b/src/proxy/go/go.c @@ -145,7 +145,7 @@ int proxy_go_output_flush(struct flb_plugin_proxy_context *ctx, return ret; } -int proxy_go_output_destroy(void *data) +int proxy_go_output_destroy(struct flb_plugin_proxy_context *ctx, void *data) { int ret = 0; struct flbgo_output_plugin *plugin; @@ -154,14 +154,20 @@ int proxy_go_output_destroy(void *data) flb_debug("[GO] running exit callback"); if (plugin->cb_exit_ctx) { - ret = plugin->cb_exit_ctx(plugin->context->remote_context); + ret = plugin->cb_exit_ctx(ctx->remote_context); } else if (plugin->cb_exit) { ret = plugin->cb_exit(); } + return ret; +} + +void proxy_go_output_unregister(void *data) { + struct flbgo_output_plugin *plugin; + + plugin = (struct flbgo_output_plugin *) data; flb_free(plugin->name); flb_free(plugin); - return ret; } int proxy_go_input_register(struct flb_plugin_proxy *proxy, @@ -262,8 +268,13 @@ int proxy_go_input_destroy(void *data) flb_debug("[GO] running exit callback"); ret = plugin->cb_exit(); + return ret; +} + +void proxy_go_input_unregister(void *data) { + struct flbgo_input_plugin *plugin; + plugin = (struct flbgo_input_plugin *) data; flb_free(plugin->name); flb_free(plugin); - return ret; -} +} \ No newline at end of file diff --git a/src/proxy/go/go.h b/src/proxy/go/go.h index 1cc0cc141e7..a2b61744c51 100644 --- a/src/proxy/go/go.h +++ b/src/proxy/go/go.h @@ -56,7 +56,8 @@ int proxy_go_output_init(struct flb_plugin_proxy *proxy); int proxy_go_output_flush(struct flb_plugin_proxy_context *ctx, const void *data, size_t size, const char *tag, int tag_len); -int proxy_go_output_destroy(void *data); +int proxy_go_output_destroy(struct flb_plugin_proxy_context *ctx, void *data); +void proxy_go_output_unregister(void *data); int proxy_go_input_register(struct flb_plugin_proxy *proxy, struct flb_plugin_proxy_def *def); @@ -67,4 +68,5 @@ int proxy_go_input_collect(struct flb_plugin_proxy *ctx, int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx, void *allocated_data); int proxy_go_input_destroy(void *data); +void proxy_go_input_unregister(void *data); #endif