Skip to content

Commit

Permalink
go: proxy: plugin: Implement multi-threaded Golang input plugin mecha…
Browse files Browse the repository at this point in the history
…nism

Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Mar 11, 2022
1 parent a031dac commit cf1b75e
Show file tree
Hide file tree
Showing 9 changed files with 353 additions and 32 deletions.
1 change: 1 addition & 0 deletions include/fluent-bit/flb_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

struct flb_api {
const char *(*output_get_property) (const char *, struct flb_output_instance *);
const char *(*input_get_property) (const char *, struct flb_input_instance *);
};

#ifdef FLB_CORE
Expand Down
9 changes: 9 additions & 0 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@

/* Input plugin flag masks */
#define FLB_INPUT_NET 4 /* input address may set host and port */
#define FLB_INPUT_PLUGIN_CORE 0
#define FLB_INPUT_PLUGIN_PROXY 1
#define FLB_INPUT_CORO 128 /* plugin requires a thread on callbacks */
#define FLB_INPUT_PRIVATE 256 /* plugin is not published/exposed */
#define FLB_INPUT_NOTAG 512 /* plugin might don't have tags */
Expand All @@ -66,6 +68,13 @@
struct flb_input_instance;

struct flb_input_plugin {
/*
* The type defines if this is a core-based plugin or it's handled by
* some specific proxy.
*/
int type;
void *proxy;

int flags; /* plugin flags */
int event_type; /* event type to be genarated: logs ?, metrics ? */

Expand Down
18 changes: 15 additions & 3 deletions include/fluent-bit/flb_plugin_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <monkey/mk_core.h>
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_input_thread.h>

/* Plugin Types */
#define FLB_PROXY_INPUT_PLUGIN 1
Expand Down Expand Up @@ -61,12 +62,23 @@ struct flb_plugin_proxy_context {
struct flb_plugin_proxy *proxy;
};

struct flb_plugin_input_proxy_thread_config {
struct flb_input_instance *ins;
struct flb_input_thread it;
/* A proxy ptr is needed to store the proxy type/lang (OUTPUT/GOLANG) */
struct flb_plugin_proxy *proxy;
};

void *flb_plugin_proxy_symbol(struct flb_plugin_proxy *proxy,
const char *symbol);

int flb_plugin_proxy_init(struct flb_plugin_proxy *proxy,
struct flb_output_instance *o_ins,
struct flb_config *config);
int flb_plugin_proxy_output_init(struct flb_plugin_proxy *proxy,
struct flb_output_instance *o_ins,
struct flb_config *config);

int flb_plugin_proxy_input_init(struct flb_plugin_proxy *proxy,
struct flb_input_instance *i_ins,
struct flb_config *config);

int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
struct flb_config *config);
Expand Down
1 change: 1 addition & 0 deletions src/flb_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct flb_api *flb_api_create()
}

api->output_get_property = flb_output_get_property;
api->input_get_property = flb_input_get_property;
return api;
}

Expand Down
35 changes: 34 additions & 1 deletion src/flb_input.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_error.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_plugin_proxy.h>
#include <fluent-bit/flb_engine.h>
#include <fluent-bit/flb_metrics.h>
#include <fluent-bit/flb_storage.h>
Expand Down Expand Up @@ -194,6 +195,24 @@ struct flb_input_instance *flb_input_new(struct flb_config *config,
return NULL;
}

if (plugin->type == FLB_INPUT_PLUGIN_CORE) {
instance->context = NULL;
}
else {
struct flb_plugin_proxy_context *ctx;

ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context));
if (!ctx) {
flb_errno();
flb_free(instance);
return NULL;
}

ctx->proxy = plugin->proxy;

instance->context = ctx;
}

/* initialize remaining vars */
instance->alias = NULL;
instance->id = id;
Expand All @@ -202,7 +221,6 @@ struct flb_input_instance *flb_input_new(struct flb_config *config,
instance->tag = NULL;
instance->tag_len = 0;
instance->routable = FLB_TRUE;
instance->context = NULL;
instance->data = data;
instance->storage = NULL;
instance->storage_type = -1;
Expand Down Expand Up @@ -623,6 +641,20 @@ int flb_input_instance_init(struct flb_input_instance *ins,
}
#endif


#ifdef FLB_HAVE_PROXY_GO
/* Proxy plugins have their own initialization */
if (p->type == FLB_INPUT_PLUGIN_PROXY) {
ret = flb_plugin_proxy_input_init(p->proxy, ins, config);
if (ret == -1) {
flb_input_instance_destroy(ins);
return -1;
}

return 0;
}
#endif

/*
* Before to call the initialization callback, make sure that the received
* configuration parameters are valid if the plugin is registering a config map.
Expand Down Expand Up @@ -734,6 +766,7 @@ void flb_input_instance_exit(struct flb_input_instance *ins,

p = ins->p;
if (p->cb_exit && ins->context) {
/* Multi-threaded input plugins use the same function signature for exit callbacks. */
p->cb_exit(ins->context, config);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ int flb_output_init_all(struct flb_config *config)
#ifdef FLB_HAVE_PROXY_GO
/* Proxy plugins have their own initialization */
if (p->type == FLB_OUTPUT_PLUGIN_PROXY) {
ret = flb_plugin_proxy_init(p->proxy, ins, config);
ret = flb_plugin_proxy_output_init(p->proxy, ins, config);
if (ret == -1) {
flb_output_instance_destroy(ins);
return -1;
Expand Down
Loading

0 comments on commit cf1b75e

Please sign in to comment.