diff --git a/include/fluent-bit/flb_api.h b/include/fluent-bit/flb_api.h index 346490dc44a..abaa953341d 100644 --- a/include/fluent-bit/flb_api.h +++ b/include/fluent-bit/flb_api.h @@ -25,6 +25,7 @@ struct flb_api { const char *(*output_get_property) (const char *, struct flb_output_instance *); + const char *(*input_get_property) (const char *, struct flb_input_instance *); }; #ifdef FLB_CORE diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index 6a8babeae01..fed5bbe48db 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -51,6 +51,8 @@ /* Input plugin flag masks */ #define FLB_INPUT_NET 4 /* input address may set host and port */ +#define FLB_INPUT_PLUGIN_CORE 0 +#define FLB_INPUT_PLUGIN_PROXY 1 #define FLB_INPUT_CORO 128 /* plugin requires a thread on callbacks */ #define FLB_INPUT_PRIVATE 256 /* plugin is not published/exposed */ #define FLB_INPUT_NOTAG 512 /* plugin might don't have tags */ @@ -66,6 +68,13 @@ struct flb_input_instance; struct flb_input_plugin { + /* + * The type defines if this is a core-based plugin or it's handled by + * some specific proxy. + */ + int type; + void *proxy; + int flags; /* plugin flags */ int event_type; /* event type to be genarated: logs ?, metrics ? */ diff --git a/include/fluent-bit/flb_plugin_proxy.h b/include/fluent-bit/flb_plugin_proxy.h index 008552c8dc7..a850cbe31af 100644 --- a/include/fluent-bit/flb_plugin_proxy.h +++ b/include/fluent-bit/flb_plugin_proxy.h @@ -23,6 +23,7 @@ #include #include #include +#include /* Plugin Types */ #define FLB_PROXY_INPUT_PLUGIN 1 @@ -61,12 +62,23 @@ struct flb_plugin_proxy_context { struct flb_plugin_proxy *proxy; }; +struct flb_plugin_input_proxy_thread_config { + struct flb_input_instance *ins; + struct flb_input_thread it; + /* A proxy ptr is needed to store the proxy type/lang (OUTPUT/GOLANG) */ + struct flb_plugin_proxy *proxy; +}; + void *flb_plugin_proxy_symbol(struct flb_plugin_proxy *proxy, const char *symbol); -int flb_plugin_proxy_init(struct flb_plugin_proxy *proxy, - struct flb_output_instance *o_ins, - struct flb_config *config); +int flb_plugin_proxy_output_init(struct flb_plugin_proxy *proxy, + struct flb_output_instance *o_ins, + struct flb_config *config); + +int flb_plugin_proxy_input_init(struct flb_plugin_proxy *proxy, + struct flb_input_instance *i_ins, + struct flb_config *config); int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy, struct flb_config *config); diff --git a/src/flb_api.c b/src/flb_api.c index 9fcb826cd76..dd25b91b8de 100644 --- a/src/flb_api.c +++ b/src/flb_api.c @@ -34,6 +34,7 @@ struct flb_api *flb_api_create() } api->output_get_property = flb_output_get_property; + api->input_get_property = flb_input_get_property; return api; } diff --git a/src/flb_input.c b/src/flb_input.c index 26df79d5fbe..a9a4c28e42d 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -194,6 +195,24 @@ struct flb_input_instance *flb_input_new(struct flb_config *config, return NULL; } + if (plugin->type == FLB_INPUT_PLUGIN_CORE) { + instance->context = NULL; + } + else { + struct flb_plugin_proxy_context *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context)); + if (!ctx) { + flb_errno(); + flb_free(instance); + return NULL; + } + + ctx->proxy = plugin->proxy; + + instance->context = ctx; + } + /* initialize remaining vars */ instance->alias = NULL; instance->id = id; @@ -202,7 +221,6 @@ struct flb_input_instance *flb_input_new(struct flb_config *config, instance->tag = NULL; instance->tag_len = 0; instance->routable = FLB_TRUE; - instance->context = NULL; instance->data = data; instance->storage = NULL; instance->storage_type = -1; @@ -623,6 +641,20 @@ int flb_input_instance_init(struct flb_input_instance *ins, } #endif + +#ifdef FLB_HAVE_PROXY_GO + /* Proxy plugins have their own initialization */ + if (p->type == FLB_INPUT_PLUGIN_PROXY) { + ret = flb_plugin_proxy_input_init(p->proxy, ins, config); + if (ret == -1) { + flb_input_instance_destroy(ins); + return -1; + } + + return 0; + } +#endif + /* * Before to call the initialization callback, make sure that the received * configuration parameters are valid if the plugin is registering a config map. @@ -734,6 +766,7 @@ void flb_input_instance_exit(struct flb_input_instance *ins, p = ins->p; if (p->cb_exit && ins->context) { + /* Multi-threaded input plugins use the same function signature for exit callbacks. */ p->cb_exit(ins->context, config); } } diff --git a/src/flb_output.c b/src/flb_output.c index 3806549d974..ac706eb6421 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -948,7 +948,7 @@ int flb_output_init_all(struct flb_config *config) #ifdef FLB_HAVE_PROXY_GO /* Proxy plugins have their own initialization */ if (p->type == FLB_OUTPUT_PLUGIN_PROXY) { - ret = flb_plugin_proxy_init(p->proxy, ins, config); + ret = flb_plugin_proxy_output_init(p->proxy, ins, config); if (ret == -1) { flb_output_instance_destroy(ins); return -1; diff --git a/src/flb_plugin_proxy.c b/src/flb_plugin_proxy.c index a1cdd32490b..b0ef8a46d8d 100644 --- a/src/flb_plugin_proxy.c +++ b/src/flb_plugin_proxy.c @@ -51,11 +51,11 @@ static void proxy_cb_flush(struct flb_event_chunk *event_chunk, #ifdef FLB_HAVE_PROXY_GO if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) { flb_trace("[GO] entering go_flush()"); - ret = proxy_go_flush(ctx, - event_chunk->data, - event_chunk->size, - event_chunk->tag, - flb_sds_len(event_chunk->tag)); + ret = proxy_go_output_flush(ctx, + event_chunk->data, + event_chunk->size, + event_chunk->tag, + flb_sds_len(event_chunk->tag)); } #else (void) ctx; @@ -68,19 +68,71 @@ static void proxy_cb_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(ret); } +static void proxy_cb_in_thread_callback(int write_fd, void *data) +{ + int ret = FLB_OK; + size_t len = 0; + struct flb_input_thread *it = data; + mpack_writer_t *writer = &it->writer; + struct flb_plugin_input_proxy_thread_config *ctx; + +#ifdef FLB_HAVE_PROXY_GO + ctx = container_of(it, struct flb_plugin_input_proxy_thread_config, it); + + if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) { + while(!flb_input_thread_exited(it)) { + flb_trace("[GO] entering go_collect()"); + ret = proxy_go_input_collect(ctx->proxy, &it->data, &len); + + if (ret == -1) { + flb_errno(); + break; + } + + mpack_write_object_bytes(writer, it->data, len); + mpack_writer_flush_message(writer); + fflush(it->write_file); + } + } +#endif +} + static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy); -static int flb_proxy_cb_exit(void *data, struct flb_config *config) +static int flb_proxy_output_cb_exit(void *data, struct flb_config *config) { struct flb_output_plugin *instance = data; struct flb_plugin_proxy *proxy = (instance->proxy); if (proxy->def->proxy == FLB_PROXY_GOLANG) { - proxy_go_destroy(proxy->data); + proxy_go_output_destroy(proxy->data); } flb_plugin_proxy_destroy(proxy); return 0; } +static int flb_proxy_input_cb_exit(void *in_context, struct flb_config *config) +{ + struct flb_input_thread *it; + struct flb_plugin_input_proxy_thread_config *ctx; + + if (!in_context) { + return 0; + } + + it = in_context; + ctx = container_of(it, struct flb_plugin_input_proxy_thread_config, it); + + if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) { + proxy_go_input_destroy(ctx->proxy->data); + } + flb_plugin_proxy_destroy(ctx->proxy); + + flb_input_thread_destroy(it, ctx->ins); + flb_free(ctx); + + return 0; +} + static int flb_proxy_register_output(struct flb_plugin_proxy *proxy, struct flb_plugin_proxy_def *def, struct flb_config *config) @@ -107,7 +159,37 @@ static int flb_proxy_register_output(struct flb_plugin_proxy *proxy, * we put our proxy-middle callbacks to do the translation properly. */ out->cb_flush = proxy_cb_flush; - out->cb_exit = flb_proxy_cb_exit; + out->cb_exit = flb_proxy_output_cb_exit; + return 0; +} + +static int flb_proxy_register_input(struct flb_plugin_proxy *proxy, + struct flb_plugin_proxy_def *def, + struct flb_config *config) +{ + struct flb_input_plugin *in; + + in = flb_calloc(1, sizeof(struct flb_input_plugin)); + if (!in) { + flb_errno(); + return -1; + } + + /* Plugin registration */ + in->type = FLB_INPUT_PLUGIN_PROXY; + in->proxy = proxy; + in->flags = def->flags; + in->name = def->name; + in->description = def->description; + mk_list_add(&in->_head, &config->in_plugins); + + /* + * Set proxy callbacks: external plugins which are not following + * the core plugins specs, have a different callback approach, so + * we put our proxy-middle callbacks to do the translation properly. + */ + in->cb_collect = flb_input_thread_collect; + in->cb_exit = flb_proxy_input_cb_exit; return 0; } @@ -160,7 +242,12 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy, ret = -1; if (def->proxy == FLB_PROXY_GOLANG) { #ifdef FLB_HAVE_PROXY_GO - ret = proxy_go_register(proxy, def); + if (def->type == FLB_PROXY_OUTPUT_PLUGIN) { + ret = proxy_go_output_register(proxy, def); + } + else if (def->type == FLB_PROXY_INPUT_PLUGIN) { + ret = proxy_go_input_register(proxy, def); + } #endif } if (ret == 0) { @@ -171,14 +258,17 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy, if (def->type == FLB_PROXY_OUTPUT_PLUGIN) { flb_proxy_register_output(proxy, def, config); } + else if (def->type == FLB_PROXY_INPUT_PLUGIN) { + flb_proxy_register_input(proxy, def, config); + } } return 0; } -int flb_plugin_proxy_init(struct flb_plugin_proxy *proxy, - struct flb_output_instance *o_ins, - struct flb_config *config) +int flb_plugin_proxy_output_init(struct flb_plugin_proxy *proxy, + struct flb_output_instance *o_ins, + struct flb_config *config) { int ret = -1; @@ -188,7 +278,7 @@ int flb_plugin_proxy_init(struct flb_plugin_proxy *proxy, /* Based on 'proxy', use the proper handler */ if (proxy->def->proxy == FLB_PROXY_GOLANG) { #ifdef FLB_HAVE_PROXY_GO - ret = proxy_go_init(proxy); + ret = proxy_go_output_init(proxy); #endif } else { @@ -199,6 +289,71 @@ int flb_plugin_proxy_init(struct flb_plugin_proxy *proxy, return ret; } +int flb_plugin_proxy_input_init(struct flb_plugin_proxy *proxy, + struct flb_input_instance *i_ins, + struct flb_config *config) +{ + int ret = -1; + struct flb_plugin_input_proxy_thread_config *ctx; + + /* Allocate space for the configuration context */ + ctx = flb_malloc(sizeof(struct flb_plugin_input_proxy_thread_config)); + if (!ctx) { + flb_errno(); + return -1; + } + + /* create worker thread */ + ret = flb_input_thread_init(&ctx->it, proxy_cb_in_thread_callback, &ctx->it); + if (ret) { + flb_errno(); + flb_error("Could not initialize worker thread"); + goto init_error; + } + + /* Set the context */ + flb_input_set_context(i_ins, &ctx->it); + + /* Collect upon data available on the pipe read fd */ + ret = flb_input_set_collector_event(i_ins, + flb_input_thread_collect, + ctx->it.read, + config); + if (ret == -1) { + flb_error("Could not set collector for threaded proxy input plugin"); + goto init_error; + } + ctx->it.coll_fd = ret; + /* Before to initialize for proxy, set the proxy instance reference */ + ctx->proxy = proxy; + + /* Before to initialize, set the instance reference */ + proxy->instance = i_ins; + + /* Based on 'proxy', use the proper handler */ + if (proxy->def->proxy == FLB_PROXY_GOLANG) { +#ifdef FLB_HAVE_PROXY_GO + ret = proxy_go_input_init(proxy); + + if (ret == -1) { + flb_error("Could not initialize proxy for threaded input plugin"); + goto init_error; + } +#endif + } + else { + fprintf(stderr, "[proxy] unrecognized input proxy handler %i\n", + proxy->def->proxy); + } + + return ret; + +init_error: + flb_free(ctx); + + return -1; +} + struct flb_plugin_proxy *flb_plugin_proxy_create(const char *dso_path, int type, struct flb_config *config) { diff --git a/src/proxy/go/go.c b/src/proxy/go/go.c index 0e71f59b54c..658ba0f6338 100644 --- a/src/proxy/go/go.c +++ b/src/proxy/go/go.c @@ -53,8 +53,8 @@ */ /*------------------------EOF------------------------------------------------*/ -int proxy_go_register(struct flb_plugin_proxy *proxy, - struct flb_plugin_proxy_def *def) +int proxy_go_output_register(struct flb_plugin_proxy *proxy, + struct flb_plugin_proxy_def *def) { struct flbgo_output_plugin *plugin; @@ -94,7 +94,7 @@ int proxy_go_register(struct flb_plugin_proxy *proxy, return 0; } -int proxy_go_init(struct flb_plugin_proxy *proxy) +int proxy_go_output_init(struct flb_plugin_proxy *proxy) { int ret; struct flbgo_output_plugin *plugin = proxy->data; @@ -117,9 +117,9 @@ int proxy_go_init(struct flb_plugin_proxy *proxy) return ret; } -int proxy_go_flush(struct flb_plugin_proxy_context *ctx, - const void *data, size_t size, - const char *tag, int tag_len) +int proxy_go_output_flush(struct flb_plugin_proxy_context *ctx, + const void *data, size_t size, + const char *tag, int tag_len) { int ret; char *buf; @@ -145,7 +145,7 @@ int proxy_go_flush(struct flb_plugin_proxy_context *ctx, return ret; } -int proxy_go_destroy(void *data) +int proxy_go_output_destroy(void *data) { int ret = 0; struct flbgo_output_plugin *plugin; @@ -163,3 +163,93 @@ int proxy_go_destroy(void *data) flb_free(plugin); return ret; } + +int proxy_go_input_register(struct flb_plugin_proxy *proxy, + struct flb_plugin_proxy_def *def) +{ + struct flbgo_input_plugin *plugin; + + plugin = flb_malloc(sizeof(struct flbgo_input_plugin)); + if (!plugin) { + return -1; + } + + /* + * Lookup the entry point function: + * + * - FLBPluginInit + * - FLBPluginInputCallback + * - FLBPluginExit + * + * note: registration callback FLBPluginRegister() is resolved by the + * parent proxy interface. + */ + + plugin->cb_init = flb_plugin_proxy_symbol(proxy, "FLBPluginInit"); + if (!plugin->cb_init) { + fprintf(stderr, "[go proxy]: could not load FLBPluginInit symbol\n"); + flb_free(plugin); + return -1; + } + + plugin->cb_collect = flb_plugin_proxy_symbol(proxy, "FLBPluginInputCallback"); + plugin->cb_exit = flb_plugin_proxy_symbol(proxy, "FLBPluginExit"); + plugin->name = flb_strdup(def->name); + + /* This Go plugin context is an opaque data for the parent proxy */ + proxy->data = plugin; + + return 0; +} + +int proxy_go_input_init(struct flb_plugin_proxy *proxy) +{ + int ret; + struct flbgo_input_plugin *plugin = proxy->data; + + /* set the API */ + plugin->api = proxy->api; + plugin->i_ins = proxy->instance; + // In order to avoid having the whole instance as part of the ABI we + // copy the context pointer into the plugin. + plugin->context = ((struct flb_input_instance *)proxy->instance)->context; + + ret = plugin->cb_init(plugin); + if (ret <= 0) { + flb_error("[go proxy]: plugin '%s' failed to initialize", + plugin->name); + flb_free(plugin); + return -1; + } + + return ret; +} + +int proxy_go_input_collect(struct flb_plugin_proxy *ctx, + void **collected_data, size_t *len) +{ + int ret; + void *data = NULL; + struct flbgo_input_plugin *plugin = ctx->data; + + ret = plugin->cb_collect(&data, len); + + *collected_data = data; + + return ret; +} + +int proxy_go_input_destroy(void *data) +{ + int ret = 0; + struct flbgo_input_plugin *plugin; + + plugin = (struct flbgo_input_plugin *) data; + flb_debug("[GO] running exit callback"); + + ret = plugin->cb_exit(); + + flb_free(plugin->name); + flb_free(plugin); + return ret; +} diff --git a/src/proxy/go/go.h b/src/proxy/go/go.h index 882184a7f6a..d3f02942671 100644 --- a/src/proxy/go/go.h +++ b/src/proxy/go/go.h @@ -36,13 +36,32 @@ struct flbgo_output_plugin { int (*cb_exit_ctx)(void *); }; -int proxy_go_register(struct flb_plugin_proxy *proxy, - struct flb_plugin_proxy_def *def); +struct flbgo_input_plugin { + char *name; + void *api; + void *i_ins; + struct flb_plugin_proxy_context *context; + + int (*cb_init)(); + int (*cb_collect)(void **, size_t *); + int (*cb_exit)(); +}; + +int proxy_go_output_register(struct flb_plugin_proxy *proxy, + struct flb_plugin_proxy_def *def); + +int proxy_go_output_init(struct flb_plugin_proxy *proxy); + +int proxy_go_output_flush(struct flb_plugin_proxy_context *ctx, + const void *data, size_t size, + const char *tag, int tag_len); +int proxy_go_output_destroy(void *data); -int proxy_go_init(struct flb_plugin_proxy *proxy); +int proxy_go_input_register(struct flb_plugin_proxy *proxy, + struct flb_plugin_proxy_def *def); -int proxy_go_flush(struct flb_plugin_proxy_context *ctx, - const void *data, size_t size, - const char *tag, int tag_len); -int proxy_go_destroy(void *data); +int proxy_go_input_init(struct flb_plugin_proxy *proxy); +int proxy_go_input_collect(struct flb_plugin_proxy *ctx, + void **collected_data, size_t *len); +int proxy_go_input_destroy(void *data); #endif