Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Signed-off-by: jzajic <[email protected]>
  • Loading branch information
jan-zajic committed Nov 23, 2022
1 parent b984605 commit ec7b332
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 31 deletions.
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ struct flb_input_plugin {
/* Exit */
int (*cb_exit) (void *, struct flb_config *);

/* Destroy */
void (*cb_destroy) (struct flb_input_plugin *);

void *instance;

struct mk_list _head;
Expand Down
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ struct flb_output_plugin {
/* Exit */
int (*cb_exit) (void *, struct flb_config *);

/* Destroy */
void (*cb_destroy) (struct flb_output_plugin *);

/* Default number of worker threads */
int workers;

Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct flb_plugin {
int type; /* plugin type */
flb_sds_t path; /* path for .so file */
void *dso_handle; /* shared object handler */
void *plugin_handler; /* plugin struct handler */
struct mk_list _head; /* link to struct flb_plugins */
};

Expand Down
7 changes: 1 addition & 6 deletions src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -448,12 +448,7 @@ void flb_output_exit(struct flb_config *config)

/* Check a exit callback */
if (p->cb_exit) {
if (!p->proxy) {
p->cb_exit(ins->context, config);
}
else {
p->cb_exit(p, ins->context);
}
p->cb_exit(ins->context, config);
}
flb_output_instance_destroy(ins);
}
Expand Down
31 changes: 29 additions & 2 deletions src/flb_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,30 @@ static void destroy_plugin(struct flb_plugin *plugin)
flb_free(plugin);
}

static void destroy_input_plugin(struct flb_plugin *plugin)
{
struct flb_input_plugin *input;
if(plugin->plugin_handler != NULL) {
input = (struct flb_input_plugin *) plugin->plugin_handler;
if(input->cb_destroy != NULL) {
input->cb_destroy(plugin->plugin_handler);
}
}
destroy_plugin(plugin);
}

static void destroy_output_plugin(struct flb_plugin *plugin)
{
struct flb_output_plugin *output;
if(plugin->plugin_handler != NULL) {
output = (struct flb_output_plugin *) plugin->plugin_handler;
if(output->cb_destroy != NULL) {
output->cb_destroy(plugin->plugin_handler);
}
}
destroy_plugin(plugin);
}

/* Creates the global plugin context for 'dynamic plugins' */
struct flb_plugins *flb_plugin_create()
{
Expand Down Expand Up @@ -281,16 +305,19 @@ int flb_plugin_load(char *path, struct flb_plugins *ctx,
plugin->type = type;
plugin->path = flb_sds_create(path);
plugin->dso_handle = dso_handle;
plugin->plugin_handler = NULL;

/* Link by type to the plugins parent context */
if (type == FLB_PLUGIN_INPUT) {
mk_list_add(&plugin->_head, &ctx->input);
plugin->plugin_handler = input;
}
else if (type == FLB_PLUGIN_FILTER) {
mk_list_add(&plugin->_head, &ctx->filter);
}
else if (type == FLB_PLUGIN_OUTPUT) {
mk_list_add(&plugin->_head, &ctx->output);
plugin->plugin_handler = output;
}

return 0;
Expand Down Expand Up @@ -403,7 +430,7 @@ void flb_plugin_destroy(struct flb_plugins *ctx)

mk_list_foreach_safe(head, tmp, &ctx->input) {
plugin = mk_list_entry(head, struct flb_plugin, _head);
destroy_plugin(plugin);
destroy_input_plugin(plugin);
}

mk_list_foreach_safe(head, tmp, &ctx->filter) {
Expand All @@ -413,7 +440,7 @@ void flb_plugin_destroy(struct flb_plugins *ctx)

mk_list_foreach_safe(head, tmp, &ctx->output) {
plugin = mk_list_entry(head, struct flb_plugin, _head);
destroy_plugin(plugin);
destroy_output_plugin(plugin);
}

flb_free(ctx);
Expand Down
77 changes: 60 additions & 17 deletions src/flb_plugin_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,37 +183,85 @@ static void flb_proxy_input_cb_resume(void *data, struct flb_config *config)
}

static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy);
static int flb_proxy_output_cb_exit(void *data, struct flb_config *config)

static int flb_proxy_output_cb_exit(void *out_context, struct flb_config *config)
{
struct flb_output_plugin *instance = data;
struct flb_plugin_proxy *proxy = (instance->proxy);
struct flb_plugin_proxy_context *ctx = out_context;
struct flb_plugin_proxy *proxy = (ctx->proxy);

if (!out_context) {
return 0;
}

if (proxy->def->proxy == FLB_PROXY_GOLANG) {
proxy_go_output_destroy(proxy->data);
#ifdef FLB_HAVE_PROXY_GO
proxy_go_output_destroy(ctx, proxy->data);
#endif
}
flb_plugin_proxy_destroy(proxy);

flb_free(ctx);
return 0;
}

static void flb_proxy_output_cb_destroy(struct flb_output_plugin *plugin)
{
struct flb_plugin_proxy *proxy = (struct flb_plugin_proxy *) plugin->proxy;
/* cleanup */
void (*cb_unregister)(struct flb_plugin_proxy_def *def);

cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister");
if (cb_unregister != NULL) {
cb_unregister(proxy->def);
}

if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
proxy_go_output_unregister(proxy->data);
#endif
}

flb_plugin_proxy_destroy(proxy);
}

static int flb_proxy_input_cb_exit(void *in_context, struct flb_config *config)
{
struct flb_plugin_input_proxy_context *ctx = in_context;
struct flb_plugin_proxy *proxy = (ctx->proxy);

if (!in_context) {
return 0;
}

if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) {
proxy_go_input_destroy(ctx->proxy->data);
if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
proxy_go_input_destroy(proxy->data);
#endif
}

flb_plugin_proxy_destroy(ctx->proxy);

flb_free(ctx);

return 0;
}

static void flb_proxy_input_cb_destroy(struct flb_input_plugin *plugin)
{
struct flb_plugin_proxy *proxy = (struct flb_plugin_proxy *) plugin->proxy;
/* cleanup */
void (*cb_unregister)(struct flb_plugin_proxy_def *def);

cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister");
if (cb_unregister != NULL) {
cb_unregister(proxy->def);
}

if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
proxy_go_input_unregister(proxy->data);
#endif
}

flb_plugin_proxy_destroy(proxy);
}

static int flb_proxy_register_output(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def,
struct flb_config *config)
Expand Down Expand Up @@ -241,6 +289,7 @@ static int flb_proxy_register_output(struct flb_plugin_proxy *proxy,
*/
out->cb_flush = proxy_cb_flush;
out->cb_exit = flb_proxy_output_cb_exit;
out->cb_destroy = flb_proxy_output_cb_destroy;
return 0;
}

Expand Down Expand Up @@ -273,6 +322,7 @@ static int flb_proxy_register_input(struct flb_plugin_proxy *proxy,
in->cb_collect = flb_proxy_input_cb_collect;
in->cb_flush_buf = NULL;
in->cb_exit = flb_proxy_input_cb_exit;
in->cb_destroy = flb_proxy_input_cb_destroy;
in->cb_pause = flb_proxy_input_cb_pause;
in->cb_resume = flb_proxy_input_cb_resume;
return 0;
Expand Down Expand Up @@ -427,13 +477,6 @@ struct flb_plugin_proxy *flb_plugin_proxy_create(const char *dso_path, int type,

static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy)
{
/* cleanup */
void (*cb_unregister)(struct flb_plugin_proxy_def *def);

cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister");
if (cb_unregister != NULL) {
cb_unregister(proxy->def);
}
flb_free(proxy->def);
flb_api_destroy(proxy->api);
dlclose(proxy->dso_handler);
Expand Down
21 changes: 16 additions & 5 deletions src/proxy/go/go.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ int proxy_go_output_flush(struct flb_plugin_proxy_context *ctx,
return ret;
}

int proxy_go_output_destroy(void *data)
int proxy_go_output_destroy(struct flb_plugin_proxy_context *ctx, void *data)
{
int ret = 0;
struct flbgo_output_plugin *plugin;
Expand All @@ -154,14 +154,20 @@ int proxy_go_output_destroy(void *data)
flb_debug("[GO] running exit callback");

if (plugin->cb_exit_ctx) {
ret = plugin->cb_exit_ctx(plugin->context->remote_context);
ret = plugin->cb_exit_ctx(ctx->remote_context);
}
else if (plugin->cb_exit) {
ret = plugin->cb_exit();
}
return ret;
}

void proxy_go_output_unregister(void *data) {
struct flbgo_output_plugin *plugin;

plugin = (struct flbgo_output_plugin *) data;
flb_free(plugin->name);
flb_free(plugin);
return ret;
}

int proxy_go_input_register(struct flb_plugin_proxy *proxy,
Expand Down Expand Up @@ -262,8 +268,13 @@ int proxy_go_input_destroy(void *data)
flb_debug("[GO] running exit callback");

ret = plugin->cb_exit();
return ret;
}

void proxy_go_input_unregister(void *data) {
struct flbgo_input_plugin *plugin;

plugin = (struct flbgo_input_plugin *) data;
flb_free(plugin->name);
flb_free(plugin);
return ret;
}
}
4 changes: 3 additions & 1 deletion src/proxy/go/go.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ int proxy_go_output_init(struct flb_plugin_proxy *proxy);
int proxy_go_output_flush(struct flb_plugin_proxy_context *ctx,
const void *data, size_t size,
const char *tag, int tag_len);
int proxy_go_output_destroy(void *data);
int proxy_go_output_destroy(struct flb_plugin_proxy_context *ctx, void *data);
void proxy_go_output_unregister(void *data);

int proxy_go_input_register(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def);
Expand All @@ -67,4 +68,5 @@ int proxy_go_input_collect(struct flb_plugin_proxy *ctx,
int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx,
void *allocated_data);
int proxy_go_input_destroy(void *data);
void proxy_go_input_unregister(void *data);
#endif

0 comments on commit ec7b332

Please sign in to comment.