Skip to content

Commit

Permalink
go: proxy: plugin: Implement multi-threaded Golang input plugin mecha…
Browse files Browse the repository at this point in the history
…nism

Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Mar 11, 2022
1 parent a031dac commit a7a187e
Show file tree
Hide file tree
Showing 9 changed files with 352 additions and 32 deletions.
1 change: 1 addition & 0 deletions include/fluent-bit/flb_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

struct flb_api {
const char *(*output_get_property) (const char *, struct flb_output_instance *);
const char *(*input_get_property) (const char *, struct flb_input_instance *);
};

#ifdef FLB_CORE
Expand Down
9 changes: 9 additions & 0 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@

/* Input plugin flag masks */
#define FLB_INPUT_NET 4 /* input address may set host and port */
#define FLB_INPUT_PLUGIN_CORE 0
#define FLB_INPUT_PLUGIN_PROXY 1
#define FLB_INPUT_CORO 128 /* plugin requires a thread on callbacks */
#define FLB_INPUT_PRIVATE 256 /* plugin is not published/exposed */
#define FLB_INPUT_NOTAG 512 /* plugin might don't have tags */
Expand All @@ -66,6 +68,13 @@
struct flb_input_instance;

struct flb_input_plugin {
/*
* The type defines if this is a core-based plugin or it's handled by
* some specific proxy.
*/
int type;
void *proxy;

int flags; /* plugin flags */
int event_type; /* event type to be genarated: logs ?, metrics ? */

Expand Down
18 changes: 15 additions & 3 deletions include/fluent-bit/flb_plugin_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <monkey/mk_core.h>
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_input_thread.h>

/* Plugin Types */
#define FLB_PROXY_INPUT_PLUGIN 1
Expand Down Expand Up @@ -61,12 +62,23 @@ struct flb_plugin_proxy_context {
struct flb_plugin_proxy *proxy;
};

struct flb_plugin_input_proxy_thread_config {
struct flb_input_instance *ins;
struct flb_input_thread it;
/* A proxy ptr is needed to store the proxy type/lang (OUTPUT/GOLANG) */
struct flb_plugin_proxy *proxy;
};

void *flb_plugin_proxy_symbol(struct flb_plugin_proxy *proxy,
const char *symbol);

int flb_plugin_proxy_init(struct flb_plugin_proxy *proxy,
struct flb_output_instance *o_ins,
struct flb_config *config);
int flb_plugin_proxy_output_init(struct flb_plugin_proxy *proxy,
struct flb_output_instance *o_ins,
struct flb_config *config);

int flb_plugin_proxy_input_init(struct flb_plugin_proxy *proxy,
struct flb_input_instance *i_ins,
struct flb_config *config);

int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
struct flb_config *config);
Expand Down
1 change: 1 addition & 0 deletions src/flb_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct flb_api *flb_api_create()
}

api->output_get_property = flb_output_get_property;
api->input_get_property = flb_input_get_property;
return api;
}

Expand Down
35 changes: 34 additions & 1 deletion src/flb_input.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_error.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_plugin_proxy.h>
#include <fluent-bit/flb_engine.h>
#include <fluent-bit/flb_metrics.h>
#include <fluent-bit/flb_storage.h>
Expand Down Expand Up @@ -194,6 +195,24 @@ struct flb_input_instance *flb_input_new(struct flb_config *config,
return NULL;
}

if (plugin->type == FLB_INPUT_PLUGIN_CORE) {
instance->context = NULL;
}
else {
struct flb_plugin_proxy_context *ctx;

ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context));
if (!ctx) {
flb_errno();
flb_free(instance);
return NULL;
}

ctx->proxy = plugin->proxy;

instance->context = ctx;
}

/* initialize remaining vars */
instance->alias = NULL;
instance->id = id;
Expand All @@ -202,7 +221,6 @@ struct flb_input_instance *flb_input_new(struct flb_config *config,
instance->tag = NULL;
instance->tag_len = 0;
instance->routable = FLB_TRUE;
instance->context = NULL;
instance->data = data;
instance->storage = NULL;
instance->storage_type = -1;
Expand Down Expand Up @@ -623,6 +641,20 @@ int flb_input_instance_init(struct flb_input_instance *ins,
}
#endif


#ifdef FLB_HAVE_PROXY_GO
/* Proxy plugins have their own initialization */
if (p->type == FLB_INPUT_PLUGIN_PROXY) {
ret = flb_plugin_proxy_input_init(p->proxy, ins, config);
if (ret == -1) {
flb_input_instance_destroy(ins);
return -1;
}

return 0;
}
#endif

/*
* Before to call the initialization callback, make sure that the received
* configuration parameters are valid if the plugin is registering a config map.
Expand Down Expand Up @@ -734,6 +766,7 @@ void flb_input_instance_exit(struct flb_input_instance *ins,

p = ins->p;
if (p->cb_exit && ins->context) {
/* Multi-threaded input plugins use the same function signature for exit callbacks. */
p->cb_exit(ins->context, config);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ int flb_output_init_all(struct flb_config *config)
#ifdef FLB_HAVE_PROXY_GO
/* Proxy plugins have their own initialization */
if (p->type == FLB_OUTPUT_PLUGIN_PROXY) {
ret = flb_plugin_proxy_init(p->proxy, ins, config);
ret = flb_plugin_proxy_output_init(p->proxy, ins, config);
if (ret == -1) {
flb_output_instance_destroy(ins);
return -1;
Expand Down
181 changes: 168 additions & 13 deletions src/flb_plugin_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ static void proxy_cb_flush(struct flb_event_chunk *event_chunk,
#ifdef FLB_HAVE_PROXY_GO
if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) {
flb_trace("[GO] entering go_flush()");
ret = proxy_go_flush(ctx,
event_chunk->data,
event_chunk->size,
event_chunk->tag,
flb_sds_len(event_chunk->tag));
ret = proxy_go_output_flush(ctx,
event_chunk->data,
event_chunk->size,
event_chunk->tag,
flb_sds_len(event_chunk->tag));
}
#else
(void) ctx;
Expand All @@ -68,19 +68,71 @@ static void proxy_cb_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(ret);
}

static void proxy_cb_in_thread_callback(int write_fd, void *data)
{
int ret = FLB_OK;
size_t len = 0;
struct flb_input_thread *it = data;
mpack_writer_t *writer = &it->writer;
struct flb_plugin_input_proxy_thread_config *ctx;

#ifdef FLB_HAVE_PROXY_GO
ctx = container_of(it, struct flb_plugin_input_proxy_thread_config, it);

if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) {
while(!flb_input_thread_exited(it)) {
flb_trace("[GO] entering go_collect()");
ret = proxy_go_input_collect(ctx->proxy, &it->data, &len);

if (ret == -1) {
flb_errno();
break;
}

mpack_write_object_bytes(writer, it->data, len);
mpack_writer_flush_message(writer);
fflush(it->write_file);
}
}
#endif
}

static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy);
static int flb_proxy_cb_exit(void *data, struct flb_config *config)
static int flb_proxy_output_cb_exit(void *data, struct flb_config *config)
{
struct flb_output_plugin *instance = data;
struct flb_plugin_proxy *proxy = (instance->proxy);

if (proxy->def->proxy == FLB_PROXY_GOLANG) {
proxy_go_destroy(proxy->data);
proxy_go_output_destroy(proxy->data);
}
flb_plugin_proxy_destroy(proxy);
return 0;
}

static int flb_proxy_input_cb_exit(void *in_context, struct flb_config *config)
{
struct flb_input_thread *it;
struct flb_plugin_input_proxy_thread_config *ctx;

if (!in_context) {
return 0;
}

it = in_context;
ctx = container_of(it, struct flb_plugin_input_proxy_thread_config, it);

if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) {
proxy_go_input_destroy(ctx->proxy->data);
}
flb_plugin_proxy_destroy(ctx->proxy);

flb_input_thread_destroy(it, ctx->ins);
flb_free(ctx);

return 0;
}

static int flb_proxy_register_output(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def,
struct flb_config *config)
Expand All @@ -107,7 +159,37 @@ static int flb_proxy_register_output(struct flb_plugin_proxy *proxy,
* we put our proxy-middle callbacks to do the translation properly.
*/
out->cb_flush = proxy_cb_flush;
out->cb_exit = flb_proxy_cb_exit;
out->cb_exit = flb_proxy_output_cb_exit;
return 0;
}

static int flb_proxy_register_input(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def,
struct flb_config *config)
{
struct flb_input_plugin *in;

in = flb_calloc(1, sizeof(struct flb_input_plugin));
if (!in) {
flb_errno();
return -1;
}

/* Plugin registration */
in->type = FLB_INPUT_PLUGIN_PROXY;
in->proxy = proxy;
in->flags = def->flags;
in->name = def->name;
in->description = def->description;
mk_list_add(&in->_head, &config->in_plugins);

/*
* Set proxy callbacks: external plugins which are not following
* the core plugins specs, have a different callback approach, so
* we put our proxy-middle callbacks to do the translation properly.
*/
in->cb_collect = flb_input_thread_collect;
in->cb_exit = flb_proxy_input_cb_exit;
return 0;
}

Expand Down Expand Up @@ -160,7 +242,12 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
ret = -1;
if (def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
ret = proxy_go_register(proxy, def);
if (def->type == FLB_PROXY_OUTPUT_PLUGIN) {
ret = proxy_go_output_register(proxy, def);
}
else if (def->type == FLB_PROXY_INPUT_PLUGIN) {
ret = proxy_go_input_register(proxy, def);
}
#endif
}
if (ret == 0) {
Expand All @@ -171,14 +258,17 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
if (def->type == FLB_PROXY_OUTPUT_PLUGIN) {
flb_proxy_register_output(proxy, def, config);
}
else if (def->type == FLB_PROXY_INPUT_PLUGIN) {
flb_proxy_register_input(proxy, def, config);
}
}

return 0;
}

int flb_plugin_proxy_init(struct flb_plugin_proxy *proxy,
struct flb_output_instance *o_ins,
struct flb_config *config)
int flb_plugin_proxy_output_init(struct flb_plugin_proxy *proxy,
struct flb_output_instance *o_ins,
struct flb_config *config)
{
int ret = -1;

Expand All @@ -188,7 +278,7 @@ int flb_plugin_proxy_init(struct flb_plugin_proxy *proxy,
/* Based on 'proxy', use the proper handler */
if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
ret = proxy_go_init(proxy);
ret = proxy_go_output_init(proxy);
#endif
}
else {
Expand All @@ -199,6 +289,71 @@ int flb_plugin_proxy_init(struct flb_plugin_proxy *proxy,
return ret;
}

int flb_plugin_proxy_input_init(struct flb_plugin_proxy *proxy,
struct flb_input_instance *i_ins,
struct flb_config *config)
{
int ret = -1;
struct flb_plugin_input_proxy_thread_config *ctx;

/* Allocate space for the configuration context */
ctx = flb_malloc(sizeof(struct flb_plugin_input_proxy_thread_config));
if (!ctx) {
flb_errno();
return -1;
}

/* create worker thread */
ret = flb_input_thread_init(&ctx->it, proxy_cb_in_thread_callback, &ctx->it);
if (ret) {
flb_errno();
flb_error("Could not initialize worker thread");
goto init_error;
}

/* Set the context */
flb_input_set_context(i_ins, &ctx->it);

/* Collect upon data available on the pipe read fd */
ret = flb_input_set_collector_event(i_ins,
flb_input_thread_collect,
ctx->it.read,
config);
if (ret == -1) {
flb_error("Could not set collector for threaded proxy input plugin");
goto init_error;
}
ctx->it.coll_fd = ret;
/* Before to initialize for proxy, set the proxy instance reference */
ctx->proxy = proxy;

/* Before to initialize, set the instance reference */
proxy->instance = i_ins;

/* Based on 'proxy', use the proper handler */
if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
ret = proxy_go_input_init(proxy);

if (ret == -1) {
flb_error("Could not initialize proxy for threaded input plugin");
goto init_error;
}
#endif
}
else {
fprintf(stderr, "[proxy] unrecognized input proxy handler %i\n",
proxy->def->proxy);
}

return ret;

init_error:
flb_free(ctx);

return -1;
}

struct flb_plugin_proxy *flb_plugin_proxy_create(const char *dso_path, int type,
struct flb_config *config)
{
Expand Down
Loading

0 comments on commit a7a187e

Please sign in to comment.