Skip to content

Commit

Permalink
proxy: fix memory bugs when use same golang plugin multiple times
Browse files Browse the repository at this point in the history
Fix fluent/fluent-bit-go#49

Signed-off-by: jzajic <[email protected]>
  • Loading branch information
jan-zajic committed Nov 28, 2022
1 parent b984605 commit d7a2f00
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 33 deletions.
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ struct flb_input_plugin {
/* Exit */
int (*cb_exit) (void *, struct flb_config *);

/* Destroy */
void (*cb_destroy) (struct flb_input_plugin *);

void *instance;

struct mk_list _head;
Expand Down
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ struct flb_output_plugin {
/* Exit */
int (*cb_exit) (void *, struct flb_config *);

/* Destroy */
void (*cb_destroy) (struct flb_output_plugin *);

/* Default number of worker threads */
int workers;

Expand Down
6 changes: 6 additions & 0 deletions include/fluent-bit/flb_plugins.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,18 @@ void flb_plugins_unregister(struct flb_config *config)

mk_list_foreach_safe(head, tmp, &config->in_plugins) {
in = mk_list_entry(head, struct flb_input_plugin, _head);
if(in->cb_destroy) {
in->cb_destroy(in);
}
mk_list_del(&in->_head);
flb_free(in);
}

mk_list_foreach_safe(head, tmp, &config->out_plugins) {
out = mk_list_entry(head, struct flb_output_plugin, _head);
if(out->cb_destroy) {
out->cb_destroy(out);
}
mk_list_del(&out->_head);
flb_free(out);
}
Expand Down
7 changes: 1 addition & 6 deletions src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -448,12 +448,7 @@ void flb_output_exit(struct flb_config *config)

/* Check a exit callback */
if (p->cb_exit) {
if (!p->proxy) {
p->cb_exit(ins->context, config);
}
else {
p->cb_exit(p, ins->context);
}
p->cb_exit(ins->context, config);
}
flb_output_instance_destroy(ins);
}
Expand Down
77 changes: 60 additions & 17 deletions src/flb_plugin_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,37 +183,85 @@ static void flb_proxy_input_cb_resume(void *data, struct flb_config *config)
}

static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy);
static int flb_proxy_output_cb_exit(void *data, struct flb_config *config)

static int flb_proxy_output_cb_exit(void *out_context, struct flb_config *config)
{
struct flb_output_plugin *instance = data;
struct flb_plugin_proxy *proxy = (instance->proxy);
struct flb_plugin_proxy_context *ctx = out_context;
struct flb_plugin_proxy *proxy = (ctx->proxy);

if (!out_context) {
return 0;
}

if (proxy->def->proxy == FLB_PROXY_GOLANG) {
proxy_go_output_destroy(proxy->data);
#ifdef FLB_HAVE_PROXY_GO
proxy_go_output_destroy(ctx);
#endif
}
flb_plugin_proxy_destroy(proxy);

flb_free(ctx);
return 0;
}

static void flb_proxy_output_cb_destroy(struct flb_output_plugin *plugin)
{
struct flb_plugin_proxy *proxy = (struct flb_plugin_proxy *) plugin->proxy;
/* cleanup */
void (*cb_unregister)(struct flb_plugin_proxy_def *def);

cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister");
if (cb_unregister != NULL) {
cb_unregister(proxy->def);
}

if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
proxy_go_output_unregister(proxy->data);
#endif
}

flb_plugin_proxy_destroy(proxy);
}

static int flb_proxy_input_cb_exit(void *in_context, struct flb_config *config)
{
struct flb_plugin_input_proxy_context *ctx = in_context;
struct flb_plugin_proxy *proxy = (ctx->proxy);

if (!in_context) {
return 0;
}

if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) {
proxy_go_input_destroy(ctx->proxy->data);
if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
proxy_go_output_destroy(ctx);
#endif
}

flb_plugin_proxy_destroy(ctx->proxy);

flb_free(ctx);

return 0;
}

static void flb_proxy_input_cb_destroy(struct flb_input_plugin *plugin)
{
struct flb_plugin_proxy *proxy = (struct flb_plugin_proxy *) plugin->proxy;
/* cleanup */
void (*cb_unregister)(struct flb_plugin_proxy_def *def);

cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister");
if (cb_unregister != NULL) {
cb_unregister(proxy->def);
}

if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
proxy_go_input_unregister(proxy->data);
#endif
}

flb_plugin_proxy_destroy(proxy);
}

static int flb_proxy_register_output(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def,
struct flb_config *config)
Expand Down Expand Up @@ -241,6 +289,7 @@ static int flb_proxy_register_output(struct flb_plugin_proxy *proxy,
*/
out->cb_flush = proxy_cb_flush;
out->cb_exit = flb_proxy_output_cb_exit;
out->cb_destroy = flb_proxy_output_cb_destroy;
return 0;
}

Expand Down Expand Up @@ -273,6 +322,7 @@ static int flb_proxy_register_input(struct flb_plugin_proxy *proxy,
in->cb_collect = flb_proxy_input_cb_collect;
in->cb_flush_buf = NULL;
in->cb_exit = flb_proxy_input_cb_exit;
in->cb_destroy = flb_proxy_input_cb_destroy;
in->cb_pause = flb_proxy_input_cb_pause;
in->cb_resume = flb_proxy_input_cb_resume;
return 0;
Expand Down Expand Up @@ -427,13 +477,6 @@ struct flb_plugin_proxy *flb_plugin_proxy_create(const char *dso_path, int type,

static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy)
{
/* cleanup */
void (*cb_unregister)(struct flb_plugin_proxy_def *def);

cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister");
if (cb_unregister != NULL) {
cb_unregister(proxy->def);
}
flb_free(proxy->def);
flb_api_destroy(proxy->api);
dlclose(proxy->dso_handler);
Expand Down
30 changes: 22 additions & 8 deletions src/proxy/go/go.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,23 +145,29 @@ int proxy_go_output_flush(struct flb_plugin_proxy_context *ctx,
return ret;
}

int proxy_go_output_destroy(void *data)
int proxy_go_output_destroy(struct flb_plugin_proxy_context *ctx)
{
int ret = 0;
struct flbgo_output_plugin *plugin;

plugin = (struct flbgo_output_plugin *) data;
plugin = (struct flbgo_output_plugin *) ctx->proxy->instance;
flb_debug("[GO] running exit callback");

if (plugin->cb_exit_ctx) {
ret = plugin->cb_exit_ctx(plugin->context->remote_context);
ret = plugin->cb_exit_ctx(ctx->remote_context);
}
else if (plugin->cb_exit) {
ret = plugin->cb_exit();
}
return ret;
}

void proxy_go_output_unregister(void *data) {
struct flbgo_output_plugin *plugin;

plugin = (struct flbgo_output_plugin *) data;
flb_free(plugin->name);
flb_free(plugin);
return ret;
}

int proxy_go_input_register(struct flb_plugin_proxy *proxy,
Expand Down Expand Up @@ -195,6 +201,7 @@ int proxy_go_input_register(struct flb_plugin_proxy *proxy,
plugin->cb_collect = flb_plugin_proxy_symbol(proxy, "FLBPluginInputCallback");
plugin->cb_cleanup = flb_plugin_proxy_symbol(proxy, "FLBPluginInputCleanupCallback");
plugin->cb_exit = flb_plugin_proxy_symbol(proxy, "FLBPluginExit");
plugin->cb_exit_ctx = flb_plugin_proxy_symbol(proxy, "FLBPluginExitCtx");
plugin->name = flb_strdup(def->name);

/* This Go plugin context is an opaque data for the parent proxy */
Expand Down Expand Up @@ -253,17 +260,24 @@ int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx,
return ret;
}

int proxy_go_input_destroy(void *data)
int proxy_go_input_destroy(struct flb_plugin_input_proxy_context *ctx)
{
int ret = 0;
struct flbgo_input_plugin *plugin;

plugin = (struct flbgo_input_plugin *) data;
plugin = (struct flbgo_input_plugin *) ctx->proxy->data;
flb_debug("[GO] running exit callback");

ret = plugin->cb_exit();
if (plugin->cb_exit) {
ret = plugin->cb_exit();
}
return ret;
}

void proxy_go_input_unregister(void *data) {
struct flbgo_input_plugin *plugin;

plugin = (struct flbgo_input_plugin *) data;
flb_free(plugin->name);
flb_free(plugin);
return ret;
}
6 changes: 4 additions & 2 deletions src/proxy/go/go.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ int proxy_go_output_init(struct flb_plugin_proxy *proxy);
int proxy_go_output_flush(struct flb_plugin_proxy_context *ctx,
const void *data, size_t size,
const char *tag, int tag_len);
int proxy_go_output_destroy(void *data);
int proxy_go_output_destroy(struct flb_plugin_proxy_context *ctx);
void proxy_go_output_unregister(void *data);

int proxy_go_input_register(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def);
Expand All @@ -66,5 +67,6 @@ int proxy_go_input_collect(struct flb_plugin_proxy *ctx,
void **collected_data, size_t *len);
int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx,
void *allocated_data);
int proxy_go_input_destroy(void *data);
int proxy_go_input_destroy(struct flb_plugin_input_proxy_context *ctx);
void proxy_go_input_unregister(void *data);
#endif

0 comments on commit d7a2f00

Please sign in to comment.