diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index fd8b275882e..37edd3f3120 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -134,6 +134,9 @@ struct flb_input_plugin { /* Exit */ int (*cb_exit) (void *, struct flb_config *); + /* Destroy */ + void (*cb_destroy) (struct flb_input_plugin *); + void *instance; struct mk_list _head; diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 4ddd937390c..10ca9932440 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -205,6 +205,9 @@ struct flb_output_plugin { /* Exit */ int (*cb_exit) (void *, struct flb_config *); + /* Destroy */ + void (*cb_destroy) (struct flb_output_plugin *); + /* Default number of worker threads */ int workers; diff --git a/include/fluent-bit/flb_plugins.h.in b/include/fluent-bit/flb_plugins.h.in index 37880f113d5..7e921c4a228 100644 --- a/include/fluent-bit/flb_plugins.h.in +++ b/include/fluent-bit/flb_plugins.h.in @@ -64,12 +64,18 @@ void flb_plugins_unregister(struct flb_config *config) mk_list_foreach_safe(head, tmp, &config->in_plugins) { in = mk_list_entry(head, struct flb_input_plugin, _head); + if(in->cb_destroy) { + in->cb_destroy(in); + } mk_list_del(&in->_head); flb_free(in); } mk_list_foreach_safe(head, tmp, &config->out_plugins) { out = mk_list_entry(head, struct flb_output_plugin, _head); + if(out->cb_destroy) { + out->cb_destroy(out); + } mk_list_del(&out->_head); flb_free(out); } diff --git a/src/flb_output.c b/src/flb_output.c index 85af7e8a990..636a5ec1b2b 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -448,12 +448,7 @@ void flb_output_exit(struct flb_config *config) /* Check a exit callback */ if (p->cb_exit) { - if (!p->proxy) { - p->cb_exit(ins->context, config); - } - else { - p->cb_exit(p, ins->context); - } + p->cb_exit(ins->context, config); } flb_output_instance_destroy(ins); } diff --git a/src/flb_plugin_proxy.c b/src/flb_plugin_proxy.c index e7ab2429455..7cebafac868 100644 --- a/src/flb_plugin_proxy.c +++ b/src/flb_plugin_proxy.c @@ -183,37 +183,85 @@ static void flb_proxy_input_cb_resume(void *data, struct flb_config *config) } static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy); -static int flb_proxy_output_cb_exit(void *data, struct flb_config *config) + +static int flb_proxy_output_cb_exit(void *out_context, struct flb_config *config) { - struct flb_output_plugin *instance = data; - struct flb_plugin_proxy *proxy = (instance->proxy); + struct flb_plugin_proxy_context *ctx = out_context; + struct flb_plugin_proxy *proxy = (ctx->proxy); + + if (!out_context) { + return 0; + } if (proxy->def->proxy == FLB_PROXY_GOLANG) { - proxy_go_output_destroy(proxy->data); +#ifdef FLB_HAVE_PROXY_GO + proxy_go_output_destroy(ctx); +#endif } - flb_plugin_proxy_destroy(proxy); + + flb_free(ctx); return 0; } +static void flb_proxy_output_cb_destroy(struct flb_output_plugin *plugin) +{ + struct flb_plugin_proxy *proxy = (struct flb_plugin_proxy *) plugin->proxy; + /* cleanup */ + void (*cb_unregister)(struct flb_plugin_proxy_def *def); + + cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister"); + if (cb_unregister != NULL) { + cb_unregister(proxy->def); + } + + if (proxy->def->proxy == FLB_PROXY_GOLANG) { +#ifdef FLB_HAVE_PROXY_GO + proxy_go_output_unregister(proxy->data); +#endif + } + + flb_plugin_proxy_destroy(proxy); +} + static int flb_proxy_input_cb_exit(void *in_context, struct flb_config *config) { struct flb_plugin_input_proxy_context *ctx = in_context; + struct flb_plugin_proxy *proxy = (ctx->proxy); if (!in_context) { return 0; } - if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) { - proxy_go_input_destroy(ctx->proxy->data); + if (proxy->def->proxy == FLB_PROXY_GOLANG) { +#ifdef FLB_HAVE_PROXY_GO + proxy_go_input_destroy(ctx); +#endif } - flb_plugin_proxy_destroy(ctx->proxy); - flb_free(ctx); - return 0; } +static void flb_proxy_input_cb_destroy(struct flb_input_plugin *plugin) +{ + struct flb_plugin_proxy *proxy = (struct flb_plugin_proxy *) plugin->proxy; + /* cleanup */ + void (*cb_unregister)(struct flb_plugin_proxy_def *def); + + cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister"); + if (cb_unregister != NULL) { + cb_unregister(proxy->def); + } + + if (proxy->def->proxy == FLB_PROXY_GOLANG) { +#ifdef FLB_HAVE_PROXY_GO + proxy_go_input_unregister(proxy->data); +#endif + } + + flb_plugin_proxy_destroy(proxy); +} + static int flb_proxy_register_output(struct flb_plugin_proxy *proxy, struct flb_plugin_proxy_def *def, struct flb_config *config) @@ -241,6 +289,7 @@ static int flb_proxy_register_output(struct flb_plugin_proxy *proxy, */ out->cb_flush = proxy_cb_flush; out->cb_exit = flb_proxy_output_cb_exit; + out->cb_destroy = flb_proxy_output_cb_destroy; return 0; } @@ -273,6 +322,7 @@ static int flb_proxy_register_input(struct flb_plugin_proxy *proxy, in->cb_collect = flb_proxy_input_cb_collect; in->cb_flush_buf = NULL; in->cb_exit = flb_proxy_input_cb_exit; + in->cb_destroy = flb_proxy_input_cb_destroy; in->cb_pause = flb_proxy_input_cb_pause; in->cb_resume = flb_proxy_input_cb_resume; return 0; @@ -427,13 +477,6 @@ struct flb_plugin_proxy *flb_plugin_proxy_create(const char *dso_path, int type, static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy) { - /* cleanup */ - void (*cb_unregister)(struct flb_plugin_proxy_def *def); - - cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister"); - if (cb_unregister != NULL) { - cb_unregister(proxy->def); - } flb_free(proxy->def); flb_api_destroy(proxy->api); dlclose(proxy->dso_handler); diff --git a/src/proxy/go/go.c b/src/proxy/go/go.c index 1568835b221..d03abe35c61 100644 --- a/src/proxy/go/go.c +++ b/src/proxy/go/go.c @@ -145,23 +145,29 @@ int proxy_go_output_flush(struct flb_plugin_proxy_context *ctx, return ret; } -int proxy_go_output_destroy(void *data) +int proxy_go_output_destroy(struct flb_plugin_proxy_context *ctx) { int ret = 0; struct flbgo_output_plugin *plugin; - plugin = (struct flbgo_output_plugin *) data; + plugin = (struct flbgo_output_plugin *) ctx->proxy->data; flb_debug("[GO] running exit callback"); if (plugin->cb_exit_ctx) { - ret = plugin->cb_exit_ctx(plugin->context->remote_context); + ret = plugin->cb_exit_ctx(ctx->remote_context); } else if (plugin->cb_exit) { ret = plugin->cb_exit(); } + return ret; +} + +void proxy_go_output_unregister(void *data) { + struct flbgo_output_plugin *plugin; + + plugin = (struct flbgo_output_plugin *) data; flb_free(plugin->name); flb_free(plugin); - return ret; } int proxy_go_input_register(struct flb_plugin_proxy *proxy, @@ -253,17 +259,24 @@ int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx, return ret; } -int proxy_go_input_destroy(void *data) +int proxy_go_input_destroy(struct flb_plugin_input_proxy_context *ctx) { int ret = 0; struct flbgo_input_plugin *plugin; - plugin = (struct flbgo_input_plugin *) data; + plugin = (struct flbgo_input_plugin *) ctx->proxy->data; flb_debug("[GO] running exit callback"); - ret = plugin->cb_exit(); + if (plugin->cb_exit) { + ret = plugin->cb_exit(); + } + return ret; +} +void proxy_go_input_unregister(void *data) { + struct flbgo_input_plugin *plugin; + + plugin = (struct flbgo_input_plugin *) data; flb_free(plugin->name); flb_free(plugin); - return ret; } diff --git a/src/proxy/go/go.h b/src/proxy/go/go.h index 1cc0cc141e7..4c3fedb23af 100644 --- a/src/proxy/go/go.h +++ b/src/proxy/go/go.h @@ -56,7 +56,8 @@ int proxy_go_output_init(struct flb_plugin_proxy *proxy); int proxy_go_output_flush(struct flb_plugin_proxy_context *ctx, const void *data, size_t size, const char *tag, int tag_len); -int proxy_go_output_destroy(void *data); +int proxy_go_output_destroy(struct flb_plugin_proxy_context *ctx); +void proxy_go_output_unregister(void *data); int proxy_go_input_register(struct flb_plugin_proxy *proxy, struct flb_plugin_proxy_def *def); @@ -66,5 +67,6 @@ int proxy_go_input_collect(struct flb_plugin_proxy *ctx, void **collected_data, size_t *len); int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx, void *allocated_data); -int proxy_go_input_destroy(void *data); +int proxy_go_input_destroy(struct flb_plugin_input_proxy_context *ctx); +void proxy_go_input_unregister(void *data); #endif