From a269dd156f39cc79f0879e29c57dc3965f6568fd Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 17 Feb 2022 16:56:57 +0900 Subject: [PATCH] go: proxy: plugin: Implement multi-threaded Golang input plugin mechanism Signed-off-by: Hiroshi Hatake --- include/fluent-bit/flb_api.h | 8 + include/fluent-bit/flb_input.h | 13 ++ include/fluent-bit/flb_output.h | 4 + include/fluent-bit/flb_plugin_proxy.h | 13 +- src/flb_api.c | 13 ++ src/flb_input.c | 40 ++++- src/flb_input_thread.c | 4 + src/flb_output.c | 21 ++- src/flb_plugin_proxy.c | 201 ++++++++++++++++++++++++-- src/proxy/go/go.c | 118 ++++++++++++++- src/proxy/go/go.h | 36 ++++- 11 files changed, 439 insertions(+), 32 deletions(-) diff --git a/include/fluent-bit/flb_api.h b/include/fluent-bit/flb_api.h index 346490dc44a..e1fce20dbc8 100644 --- a/include/fluent-bit/flb_api.h +++ b/include/fluent-bit/flb_api.h @@ -25,6 +25,14 @@ struct flb_api { const char *(*output_get_property) (const char *, struct flb_output_instance *); + const char *(*input_get_property) (const char *, struct flb_input_instance *); + + void *(*output_get_cmt_instance) (struct flb_output_instance *); + void *(*input_get_cmt_instance) (struct flb_input_instance *); + + void (*log_print) (int, const char*, int, const char*, ...); + int (*input_log_check) (struct flb_input_instance *, int); + int (*output_log_check) (struct flb_output_instance *, int); }; #ifdef FLB_CORE diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index b0327d5819f..4ff1862fe99 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -51,6 +51,8 @@ /* Input plugin flag masks */ #define FLB_INPUT_NET 4 /* input address may set host and port */ +#define FLB_INPUT_PLUGIN_CORE 0 +#define FLB_INPUT_PLUGIN_PROXY 1 #define FLB_INPUT_CORO 128 /* plugin requires a thread on callbacks */ #define FLB_INPUT_PRIVATE 256 /* plugin is not published/exposed */ #define FLB_INPUT_NOTAG 512 /* plugin might don't have tags */ @@ -67,6 +69,13 @@ struct flb_input_instance; struct flb_input_plugin { + /* + * The type defines if this is a core-based plugin or it's handled by + * some specific proxy. + */ + int type; + void *proxy; + int flags; /* plugin flags */ int event_type; /* event type to be generated: logs ?, metrics ? */ @@ -522,6 +531,9 @@ int flb_input_set_property(struct flb_input_instance *ins, const char *k, const char *v); const char *flb_input_get_property(const char *key, struct flb_input_instance *ins); +#ifdef FLB_HAVE_METRICS +void *flb_input_get_cmt_instance(struct flb_input_instance *ins); +#endif int flb_input_check(struct flb_config *config); void flb_input_set_context(struct flb_input_instance *ins, void *context); @@ -581,6 +593,7 @@ void flb_input_net_default_listener(const char *listen, int port, int flb_input_event_type_is_metric(struct flb_input_instance *ins); int flb_input_event_type_is_log(struct flb_input_instance *ins); +int flb_input_log_check(struct flb_input_instance *ins, int l); struct mk_event_loop *flb_input_event_loop_get(struct flb_input_instance *ins); int flb_input_upstream_set(struct flb_upstream *u, struct flb_input_instance *ins); diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 9a892d84c79..845d2a220c0 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -728,6 +728,9 @@ const char *flb_output_name(struct flb_output_instance *in); int flb_output_set_property(struct flb_output_instance *out, const char *k, const char *v); const char *flb_output_get_property(const char *key, struct flb_output_instance *ins); +#ifdef FLB_HAVE_METRICS +void *flb_output_get_cmt_instance(struct flb_output_instance *ins); +#endif void flb_output_net_default(const char *host, int port, struct flb_output_instance *ins); const char *flb_output_name(struct flb_output_instance *ins); @@ -737,6 +740,7 @@ void flb_output_set_context(struct flb_output_instance *ins, void *context); int flb_output_instance_destroy(struct flb_output_instance *ins); int flb_output_init_all(struct flb_config *config); int flb_output_check(struct flb_config *config); +int flb_output_log_check(struct flb_output_instance *ins, int l); int flb_output_upstream_set(struct flb_upstream *u, struct flb_output_instance *ins); int flb_output_upstream_ha_set(void *ha, struct flb_output_instance *ins); diff --git a/include/fluent-bit/flb_plugin_proxy.h b/include/fluent-bit/flb_plugin_proxy.h index 008552c8dc7..081821417f9 100644 --- a/include/fluent-bit/flb_plugin_proxy.h +++ b/include/fluent-bit/flb_plugin_proxy.h @@ -23,6 +23,7 @@ #include #include #include +#include /* Plugin Types */ #define FLB_PROXY_INPUT_PLUGIN 1 @@ -61,12 +62,18 @@ struct flb_plugin_proxy_context { struct flb_plugin_proxy *proxy; }; +struct flb_plugin_input_proxy_context { + int coll_fd; + /* A proxy ptr is needed to store the proxy type/lang (OUTPUT/GOLANG) */ + struct flb_plugin_proxy *proxy; +}; + void *flb_plugin_proxy_symbol(struct flb_plugin_proxy *proxy, const char *symbol); -int flb_plugin_proxy_init(struct flb_plugin_proxy *proxy, - struct flb_output_instance *o_ins, - struct flb_config *config); +int flb_plugin_proxy_output_init(struct flb_plugin_proxy *proxy, + struct flb_output_instance *o_ins, + struct flb_config *config); int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy, struct flb_config *config); diff --git a/src/flb_api.c b/src/flb_api.c index 9fcb826cd76..6c6a60ea608 100644 --- a/src/flb_api.c +++ b/src/flb_api.c @@ -20,7 +20,9 @@ #include #include #include +#include +#include #include struct flb_api *flb_api_create() @@ -34,6 +36,17 @@ struct flb_api *flb_api_create() } api->output_get_property = flb_output_get_property; + api->input_get_property = flb_input_get_property; + +#ifdef FLB_HAVE_METRICS + api->output_get_cmt_instance = flb_output_get_cmt_instance; + api->input_get_cmt_instance = flb_input_get_cmt_instance; +#endif + + api->log_print = flb_log_print; + api->input_log_check = flb_input_log_check; + api->output_log_check = flb_output_log_check; + return api; } diff --git a/src/flb_input.c b/src/flb_input.c index f30490eecb0..4763d02e4a7 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -135,6 +136,18 @@ int flb_input_event_type_is_log(struct flb_input_instance *ins) return FLB_FALSE; } +/* Check input plugin's log level. + * Not for core plugins but for Golang plugins. + * Golang plugins do not have thread-local flb_worker_ctx information. */ +int flb_input_log_check(struct flb_input_instance *ins, int l) +{ + if (ins->log_level < l) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + /* Create an input plugin instance */ struct flb_input_instance *flb_input_new(struct flb_config *config, const char *input, void *data, @@ -211,6 +224,24 @@ struct flb_input_instance *flb_input_new(struct flb_config *config, return NULL; } + if (plugin->type == FLB_INPUT_PLUGIN_CORE) { + instance->context = NULL; + } + else { + struct flb_plugin_proxy_context *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context)); + if (!ctx) { + flb_errno(); + flb_free(instance); + return NULL; + } + + ctx->proxy = plugin->proxy; + + instance->context = ctx; + } + /* initialize remaining vars */ instance->alias = NULL; instance->id = id; @@ -219,7 +250,6 @@ struct flb_input_instance *flb_input_new(struct flb_config *config, instance->tag = NULL; instance->tag_len = 0; instance->routable = FLB_TRUE; - instance->context = NULL; instance->data = data; instance->storage = NULL; instance->storage_type = -1; @@ -513,6 +543,13 @@ const char *flb_input_get_property(const char *key, return flb_config_prop_get(key, &ins->properties); } +#ifdef FLB_HAVE_METRICS +void *flb_input_get_cmt_instance(struct flb_input_instance *ins) +{ + return (void *)ins->cmt; +} +#endif + /* Return an instance name or alias */ const char *flb_input_name(struct flb_input_instance *ins) { @@ -884,6 +921,7 @@ void flb_input_instance_exit(struct flb_input_instance *ins, p = ins->p; if (p->cb_exit && ins->context) { + /* Multi-threaded input plugins use the same function signature for exit callbacks. */ p->cb_exit(ins->context, config); } } diff --git a/src/flb_input_thread.c b/src/flb_input_thread.c index 218eba5f2e5..e0224fa0573 100644 --- a/src/flb_input_thread.c +++ b/src/flb_input_thread.c @@ -775,6 +775,10 @@ int flb_input_thread_destroy(struct flb_input_thread *it, struct flb_input_insta { int ret; flb_input_thread_exit(it, ins); + /* On Darwin, we must call pthread_cancel here to ensure worker + * thread termination. Otherwise, worker thread termination will + * be blocked. */ + pthread_cancel(it->thread); ret = pthread_join(it->thread, NULL); mpack_writer_destroy(&it->writer); pthread_mutex_destroy(&it->mutex); diff --git a/src/flb_output.c b/src/flb_output.c index 43549cd70b9..863ad4232f3 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -807,6 +807,13 @@ const char *flb_output_get_property(const char *key, struct flb_output_instance return flb_config_prop_get(key, &ins->properties); } +#ifdef FLB_HAVE_METRICS +void *flb_output_get_cmt_instance(struct flb_output_instance *ins) +{ + return (void *)ins->cmt; +} +#endif + /* Trigger the output plugins setup callbacks to prepare them. */ int flb_output_init_all(struct flb_config *config) { @@ -948,7 +955,7 @@ int flb_output_init_all(struct flb_config *config) #ifdef FLB_HAVE_PROXY_GO /* Proxy plugins have their own initialization */ if (p->type == FLB_OUTPUT_PLUGIN_PROXY) { - ret = flb_plugin_proxy_init(p->proxy, ins, config); + ret = flb_plugin_proxy_output_init(p->proxy, ins, config); if (ret == -1) { flb_output_instance_destroy(ins); return -1; @@ -1098,6 +1105,18 @@ int flb_output_check(struct flb_config *config) return 0; } +/* Check output plugin's log level. + * Not for core plugins but for Golang plugins. + * Golang plugins do not have thread-local flb_worker_ctx information. */ +int flb_output_log_check(struct flb_output_instance *ins, int l) +{ + if (ins->log_level < l) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + /* * Output plugins might have enabled certain features that have not been passed * directly to the upstream context. In order to avoid let plugins validate specific diff --git a/src/flb_plugin_proxy.c b/src/flb_plugin_proxy.c index a1cdd32490b..b18862a01a9 100644 --- a/src/flb_plugin_proxy.c +++ b/src/flb_plugin_proxy.c @@ -36,6 +36,8 @@ /* Proxies */ #include "proxy/go/go.h" +#define PROXY_CALLBACK_TIME 1 /* 1 seconds */ + static void proxy_cb_flush(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, struct flb_input_instance *i_ins, @@ -51,11 +53,11 @@ static void proxy_cb_flush(struct flb_event_chunk *event_chunk, #ifdef FLB_HAVE_PROXY_GO if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) { flb_trace("[GO] entering go_flush()"); - ret = proxy_go_flush(ctx, - event_chunk->data, - event_chunk->size, - event_chunk->tag, - flb_sds_len(event_chunk->tag)); + ret = proxy_go_output_flush(ctx, + event_chunk->data, + event_chunk->size, + event_chunk->tag, + flb_sds_len(event_chunk->tag)); } #else (void) ctx; @@ -68,19 +70,150 @@ static void proxy_cb_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(ret); } +static int flb_proxy_input_cb_collect(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + int ret = FLB_OK; + size_t len = 0; + void *data = NULL; + struct flb_plugin_input_proxy_context *ctx = (struct flb_plugin_input_proxy_context *) in_context; + +#ifdef FLB_HAVE_PROXY_GO + if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) { + flb_trace("[GO] entering go_collect()"); + ret = proxy_go_input_collect(ctx->proxy, &data, &len); + + if (ret == -1) { + flb_errno(); + return -1; + } + + flb_input_chunk_append_raw(ins, NULL, 0, data, len); + + if (!data) { + free(data); + } + + ret = proxy_go_input_cleanup(ctx->proxy, data); + if (ret == -1) { + flb_errno(); + return -1; + } + } +#endif + + return 0; +} + +static int flb_proxy_input_cb_init(struct flb_input_instance *ins, + struct flb_config *config, void *data) +{ + int ret = -1; + struct flb_plugin_input_proxy_context *ctx; + struct flb_plugin_proxy_context *pc; + + /* Allocate space for the configuration context */ + ctx = flb_malloc(sizeof(struct flb_plugin_input_proxy_context)); + if (!ctx) { + flb_errno(); + return -1; + } + + /* Before to initialize for proxy, set the proxy instance reference */ + pc = (struct flb_plugin_proxy_context *)(ins->context); + ctx->proxy = pc->proxy; + + /* Before to initialize, set the instance reference */ + pc->proxy->instance = ins; + + /* Based on 'proxy', use the proper handler */ + if (pc->proxy->def->proxy == FLB_PROXY_GOLANG) { +#ifdef FLB_HAVE_PROXY_GO + ret = proxy_go_input_init(pc->proxy); + + if (ret == -1) { + flb_error("Could not initialize proxy for threaded input plugin"); + goto init_error; + } +#else + flb_error("Could not find initializing function on proxy for threaded input plugin"); + goto init_error; +#endif + } + else { + fprintf(stderr, "[proxy] unrecognized input proxy handler %i\n", + pc->proxy->def->proxy); + } + + /* Set the context */ + flb_input_set_context(ins, ctx); + + /* Collect upon data available on timer */ + ret = flb_input_set_collector_time(ins, + flb_proxy_input_cb_collect, + PROXY_CALLBACK_TIME, 0, + config); + + if (ret == -1) { + flb_error("Could not set collector for threaded proxy input plugin"); + goto init_error; + } + ctx->coll_fd = ret; + + return ret; + +init_error: + flb_free(ctx); + + return -1; +} + +static void flb_proxy_input_cb_pause(void *data, struct flb_config *config) +{ + struct flb_plugin_input_proxy_context *ctx = data; + + flb_input_collector_pause(ctx->coll_fd, ctx->proxy->instance); +} + +static void flb_proxy_input_cb_resume(void *data, struct flb_config *config) +{ + struct flb_plugin_input_proxy_context *ctx = data; + + flb_input_collector_resume(ctx->coll_fd, ctx->proxy->instance); +} + static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy); -static int flb_proxy_cb_exit(void *data, struct flb_config *config) +static int flb_proxy_output_cb_exit(void *data, struct flb_config *config) { struct flb_output_plugin *instance = data; struct flb_plugin_proxy *proxy = (instance->proxy); if (proxy->def->proxy == FLB_PROXY_GOLANG) { - proxy_go_destroy(proxy->data); + proxy_go_output_destroy(proxy->data); } flb_plugin_proxy_destroy(proxy); return 0; } +static int flb_proxy_input_cb_exit(void *in_context, struct flb_config *config) +{ + struct flb_plugin_input_proxy_context *ctx = in_context; + + if (!in_context) { + return 0; + } + + if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) { + proxy_go_input_destroy(ctx->proxy->data); + } + + flb_plugin_proxy_destroy(ctx->proxy); + + flb_free(ctx); + + return 0; +} + static int flb_proxy_register_output(struct flb_plugin_proxy *proxy, struct flb_plugin_proxy_def *def, struct flb_config *config) @@ -107,7 +240,41 @@ static int flb_proxy_register_output(struct flb_plugin_proxy *proxy, * we put our proxy-middle callbacks to do the translation properly. */ out->cb_flush = proxy_cb_flush; - out->cb_exit = flb_proxy_cb_exit; + out->cb_exit = flb_proxy_output_cb_exit; + return 0; +} + +static int flb_proxy_register_input(struct flb_plugin_proxy *proxy, + struct flb_plugin_proxy_def *def, + struct flb_config *config) +{ + struct flb_input_plugin *in; + + in = flb_calloc(1, sizeof(struct flb_input_plugin)); + if (!in) { + flb_errno(); + return -1; + } + + /* Plugin registration */ + in->type = FLB_INPUT_PLUGIN_PROXY; + in->proxy = proxy; + in->flags = def->flags | FLB_INPUT_THREADED; + in->name = flb_strdup(def->name); + in->description = def->description; + mk_list_add(&in->_head, &config->in_plugins); + + /* + * Set proxy callbacks: external plugins which are not following + * the core plugins specs, have a different callback approach, so + * we put our proxy-middle callbacks to do the translation properly. + */ + in->cb_init = flb_proxy_input_cb_init; + in->cb_collect = flb_proxy_input_cb_collect; + in->cb_flush_buf = NULL; + in->cb_exit = flb_proxy_input_cb_exit; + in->cb_pause = flb_proxy_input_cb_pause; + in->cb_resume = flb_proxy_input_cb_resume; return 0; } @@ -160,7 +327,12 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy, ret = -1; if (def->proxy == FLB_PROXY_GOLANG) { #ifdef FLB_HAVE_PROXY_GO - ret = proxy_go_register(proxy, def); + if (def->type == FLB_PROXY_OUTPUT_PLUGIN) { + ret = proxy_go_output_register(proxy, def); + } + else if (def->type == FLB_PROXY_INPUT_PLUGIN) { + ret = proxy_go_input_register(proxy, def); + } #endif } if (ret == 0) { @@ -171,14 +343,17 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy, if (def->type == FLB_PROXY_OUTPUT_PLUGIN) { flb_proxy_register_output(proxy, def, config); } + else if (def->type == FLB_PROXY_INPUT_PLUGIN) { + flb_proxy_register_input(proxy, def, config); + } } return 0; } -int flb_plugin_proxy_init(struct flb_plugin_proxy *proxy, - struct flb_output_instance *o_ins, - struct flb_config *config) +int flb_plugin_proxy_output_init(struct flb_plugin_proxy *proxy, + struct flb_output_instance *o_ins, + struct flb_config *config) { int ret = -1; @@ -188,7 +363,7 @@ int flb_plugin_proxy_init(struct flb_plugin_proxy *proxy, /* Based on 'proxy', use the proper handler */ if (proxy->def->proxy == FLB_PROXY_GOLANG) { #ifdef FLB_HAVE_PROXY_GO - ret = proxy_go_init(proxy); + ret = proxy_go_output_init(proxy); #endif } else { diff --git a/src/proxy/go/go.c b/src/proxy/go/go.c index 0e71f59b54c..19600a0d943 100644 --- a/src/proxy/go/go.c +++ b/src/proxy/go/go.c @@ -53,8 +53,8 @@ */ /*------------------------EOF------------------------------------------------*/ -int proxy_go_register(struct flb_plugin_proxy *proxy, - struct flb_plugin_proxy_def *def) +int proxy_go_output_register(struct flb_plugin_proxy *proxy, + struct flb_plugin_proxy_def *def) { struct flbgo_output_plugin *plugin; @@ -94,7 +94,7 @@ int proxy_go_register(struct flb_plugin_proxy *proxy, return 0; } -int proxy_go_init(struct flb_plugin_proxy *proxy) +int proxy_go_output_init(struct flb_plugin_proxy *proxy) { int ret; struct flbgo_output_plugin *plugin = proxy->data; @@ -117,9 +117,9 @@ int proxy_go_init(struct flb_plugin_proxy *proxy) return ret; } -int proxy_go_flush(struct flb_plugin_proxy_context *ctx, - const void *data, size_t size, - const char *tag, int tag_len) +int proxy_go_output_flush(struct flb_plugin_proxy_context *ctx, + const void *data, size_t size, + const char *tag, int tag_len) { int ret; char *buf; @@ -145,7 +145,7 @@ int proxy_go_flush(struct flb_plugin_proxy_context *ctx, return ret; } -int proxy_go_destroy(void *data) +int proxy_go_output_destroy(void *data) { int ret = 0; struct flbgo_output_plugin *plugin; @@ -163,3 +163,107 @@ int proxy_go_destroy(void *data) flb_free(plugin); return ret; } + +int proxy_go_input_register(struct flb_plugin_proxy *proxy, + struct flb_plugin_proxy_def *def) +{ + struct flbgo_input_plugin *plugin; + + plugin = flb_malloc(sizeof(struct flbgo_input_plugin)); + if (!plugin) { + return -1; + } + + /* + * Lookup the entry point function: + * + * - FLBPluginInit + * - FLBPluginInputCallback + * - FLBPluginExit + * + * note: registration callback FLBPluginRegister() is resolved by the + * parent proxy interface. + */ + + plugin->cb_init = flb_plugin_proxy_symbol(proxy, "FLBPluginInit"); + if (!plugin->cb_init) { + fprintf(stderr, "[go proxy]: could not load FLBPluginInit symbol\n"); + flb_free(plugin); + return -1; + } + + plugin->cb_collect = flb_plugin_proxy_symbol(proxy, "FLBPluginInputCallback"); + plugin->cb_cleanup = flb_plugin_proxy_symbol(proxy, "FLBPluginInputCleanupCallback"); + plugin->cb_exit = flb_plugin_proxy_symbol(proxy, "FLBPluginExit"); + plugin->name = flb_strdup(def->name); + + /* This Go plugin context is an opaque data for the parent proxy */ + proxy->data = plugin; + + return 0; +} + +int proxy_go_input_init(struct flb_plugin_proxy *proxy) +{ + int ret; + struct flbgo_input_plugin *plugin = proxy->data; + + /* set the API */ + plugin->api = proxy->api; + plugin->i_ins = proxy->instance; + // In order to avoid having the whole instance as part of the ABI we + // copy the context pointer into the plugin. + plugin->context = ((struct flb_input_instance *)proxy->instance)->context; + + ret = plugin->cb_init(plugin); + if (ret <= 0) { + flb_error("[go proxy]: plugin '%s' failed to initialize", + plugin->name); + flb_free(plugin); + return -1; + } + + return ret; +} + +int proxy_go_input_collect(struct flb_plugin_proxy *ctx, + void **collected_data, size_t *len) +{ + int ret; + void *data = NULL; + struct flbgo_input_plugin *plugin = ctx->data; + + ret = plugin->cb_collect(&data, len); + + *collected_data = data; + + return ret; +} + +int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx, + void *allocated_data) +{ + int ret = 0; + struct flbgo_input_plugin *plugin = ctx->data; + + if (plugin->cb_cleanup) { + ret = plugin->cb_cleanup(allocated_data); + } + + return ret; +} + +int proxy_go_input_destroy(void *data) +{ + int ret = 0; + struct flbgo_input_plugin *plugin; + + plugin = (struct flbgo_input_plugin *) data; + flb_debug("[GO] running exit callback"); + + ret = plugin->cb_exit(); + + flb_free(plugin->name); + flb_free(plugin); + return ret; +} diff --git a/src/proxy/go/go.h b/src/proxy/go/go.h index 882184a7f6a..1cc0cc141e7 100644 --- a/src/proxy/go/go.h +++ b/src/proxy/go/go.h @@ -36,13 +36,35 @@ struct flbgo_output_plugin { int (*cb_exit_ctx)(void *); }; -int proxy_go_register(struct flb_plugin_proxy *proxy, - struct flb_plugin_proxy_def *def); +struct flbgo_input_plugin { + char *name; + void *api; + void *i_ins; + struct flb_plugin_proxy_context *context; + + int (*cb_init)(); + int (*cb_collect)(void **, size_t *); + int (*cb_cleanup)(void *); + int (*cb_exit)(); +}; + +int proxy_go_output_register(struct flb_plugin_proxy *proxy, + struct flb_plugin_proxy_def *def); + +int proxy_go_output_init(struct flb_plugin_proxy *proxy); + +int proxy_go_output_flush(struct flb_plugin_proxy_context *ctx, + const void *data, size_t size, + const char *tag, int tag_len); +int proxy_go_output_destroy(void *data); -int proxy_go_init(struct flb_plugin_proxy *proxy); +int proxy_go_input_register(struct flb_plugin_proxy *proxy, + struct flb_plugin_proxy_def *def); -int proxy_go_flush(struct flb_plugin_proxy_context *ctx, - const void *data, size_t size, - const char *tag, int tag_len); -int proxy_go_destroy(void *data); +int proxy_go_input_init(struct flb_plugin_proxy *proxy); +int proxy_go_input_collect(struct flb_plugin_proxy *ctx, + void **collected_data, size_t *len); +int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx, + void *allocated_data); +int proxy_go_input_destroy(void *data); #endif