Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go: proxy: plugin: Implement multi-threaded Golang input plugin mechanism #5056

Merged
merged 1 commit into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/fluent-bit/flb_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@

struct flb_api {
const char *(*output_get_property) (const char *, struct flb_output_instance *);
const char *(*input_get_property) (const char *, struct flb_input_instance *);

void *(*output_get_cmt_instance) (struct flb_output_instance *);
void *(*input_get_cmt_instance) (struct flb_input_instance *);

void (*log_print) (int, const char*, int, const char*, ...);
int (*input_log_check) (struct flb_input_instance *, int);
int (*output_log_check) (struct flb_output_instance *, int);
};

#ifdef FLB_CORE
Expand Down
13 changes: 13 additions & 0 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@

/* Input plugin flag masks */
#define FLB_INPUT_NET 4 /* input address may set host and port */
#define FLB_INPUT_PLUGIN_CORE 0
#define FLB_INPUT_PLUGIN_PROXY 1
#define FLB_INPUT_CORO 128 /* plugin requires a thread on callbacks */
#define FLB_INPUT_PRIVATE 256 /* plugin is not published/exposed */
#define FLB_INPUT_NOTAG 512 /* plugin might don't have tags */
Expand All @@ -67,6 +69,13 @@
struct flb_input_instance;

struct flb_input_plugin {
/*
* The type defines if this is a core-based plugin or it's handled by
* some specific proxy.
*/
int type;
void *proxy;

int flags; /* plugin flags */
int event_type; /* event type to be generated: logs ?, metrics ? */

Expand Down Expand Up @@ -522,6 +531,9 @@ int flb_input_set_property(struct flb_input_instance *ins,
const char *k, const char *v);
const char *flb_input_get_property(const char *key,
struct flb_input_instance *ins);
#ifdef FLB_HAVE_METRICS
void *flb_input_get_cmt_instance(struct flb_input_instance *ins);
#endif

int flb_input_check(struct flb_config *config);
void flb_input_set_context(struct flb_input_instance *ins, void *context);
Expand Down Expand Up @@ -581,6 +593,7 @@ void flb_input_net_default_listener(const char *listen, int port,

int flb_input_event_type_is_metric(struct flb_input_instance *ins);
int flb_input_event_type_is_log(struct flb_input_instance *ins);
int flb_input_log_check(struct flb_input_instance *ins, int l);

struct mk_event_loop *flb_input_event_loop_get(struct flb_input_instance *ins);
int flb_input_upstream_set(struct flb_upstream *u, struct flb_input_instance *ins);
Expand Down
4 changes: 4 additions & 0 deletions include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,9 @@ const char *flb_output_name(struct flb_output_instance *in);
int flb_output_set_property(struct flb_output_instance *out,
const char *k, const char *v);
const char *flb_output_get_property(const char *key, struct flb_output_instance *ins);
#ifdef FLB_HAVE_METRICS
void *flb_output_get_cmt_instance(struct flb_output_instance *ins);
#endif
void flb_output_net_default(const char *host, int port,
struct flb_output_instance *ins);
const char *flb_output_name(struct flb_output_instance *ins);
Expand All @@ -737,6 +740,7 @@ void flb_output_set_context(struct flb_output_instance *ins, void *context);
int flb_output_instance_destroy(struct flb_output_instance *ins);
int flb_output_init_all(struct flb_config *config);
int flb_output_check(struct flb_config *config);
int flb_output_log_check(struct flb_output_instance *ins, int l);

int flb_output_upstream_set(struct flb_upstream *u, struct flb_output_instance *ins);
int flb_output_upstream_ha_set(void *ha, struct flb_output_instance *ins);
Expand Down
13 changes: 10 additions & 3 deletions include/fluent-bit/flb_plugin_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <monkey/mk_core.h>
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_input_thread.h>

/* Plugin Types */
#define FLB_PROXY_INPUT_PLUGIN 1
Expand Down Expand Up @@ -61,12 +62,18 @@ struct flb_plugin_proxy_context {
struct flb_plugin_proxy *proxy;
};

struct flb_plugin_input_proxy_context {
int coll_fd;
/* A proxy ptr is needed to store the proxy type/lang (OUTPUT/GOLANG) */
struct flb_plugin_proxy *proxy;
};

void *flb_plugin_proxy_symbol(struct flb_plugin_proxy *proxy,
const char *symbol);

int flb_plugin_proxy_init(struct flb_plugin_proxy *proxy,
struct flb_output_instance *o_ins,
struct flb_config *config);
int flb_plugin_proxy_output_init(struct flb_plugin_proxy *proxy,
struct flb_output_instance *o_ins,
struct flb_config *config);

int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
struct flb_config *config);
Expand Down
13 changes: 13 additions & 0 deletions src/flb_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_api.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_log.h>

#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_output.h>

struct flb_api *flb_api_create()
Expand All @@ -34,6 +36,17 @@ struct flb_api *flb_api_create()
}

api->output_get_property = flb_output_get_property;
api->input_get_property = flb_input_get_property;

#ifdef FLB_HAVE_METRICS
api->output_get_cmt_instance = flb_output_get_cmt_instance;
api->input_get_cmt_instance = flb_input_get_cmt_instance;
#endif

api->log_print = flb_log_print;
api->input_log_check = flb_input_log_check;
api->output_log_check = flb_output_log_check;

return api;
}

Expand Down
40 changes: 39 additions & 1 deletion src/flb_input.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <fluent-bit/flb_input_thread.h>
#include <fluent-bit/flb_error.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_plugin_proxy.h>
#include <fluent-bit/flb_engine.h>
#include <fluent-bit/flb_metrics.h>
#include <fluent-bit/flb_storage.h>
Expand Down Expand Up @@ -135,6 +136,18 @@ int flb_input_event_type_is_log(struct flb_input_instance *ins)
return FLB_FALSE;
}

/* Check input plugin's log level.
* Not for core plugins but for Golang plugins.
* Golang plugins do not have thread-local flb_worker_ctx information. */
int flb_input_log_check(struct flb_input_instance *ins, int l)
{
if (ins->log_level < l) {
return FLB_FALSE;
}

return FLB_TRUE;
}

/* Create an input plugin instance */
struct flb_input_instance *flb_input_new(struct flb_config *config,
const char *input, void *data,
Expand Down Expand Up @@ -211,6 +224,24 @@ struct flb_input_instance *flb_input_new(struct flb_config *config,
return NULL;
}

if (plugin->type == FLB_INPUT_PLUGIN_CORE) {
instance->context = NULL;
}
else {
struct flb_plugin_proxy_context *ctx;

ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context));
if (!ctx) {
flb_errno();
flb_free(instance);
return NULL;
}

ctx->proxy = plugin->proxy;

instance->context = ctx;
}

/* initialize remaining vars */
instance->alias = NULL;
instance->id = id;
Expand All @@ -219,7 +250,6 @@ struct flb_input_instance *flb_input_new(struct flb_config *config,
instance->tag = NULL;
instance->tag_len = 0;
instance->routable = FLB_TRUE;
instance->context = NULL;
instance->data = data;
instance->storage = NULL;
instance->storage_type = -1;
Expand Down Expand Up @@ -513,6 +543,13 @@ const char *flb_input_get_property(const char *key,
return flb_config_prop_get(key, &ins->properties);
}

#ifdef FLB_HAVE_METRICS
void *flb_input_get_cmt_instance(struct flb_input_instance *ins)
{
return (void *)ins->cmt;
}
#endif

/* Return an instance name or alias */
const char *flb_input_name(struct flb_input_instance *ins)
{
Expand Down Expand Up @@ -884,6 +921,7 @@ void flb_input_instance_exit(struct flb_input_instance *ins,

p = ins->p;
if (p->cb_exit && ins->context) {
/* Multi-threaded input plugins use the same function signature for exit callbacks. */
p->cb_exit(ins->context, config);
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/flb_input_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,10 @@ int flb_input_thread_destroy(struct flb_input_thread *it, struct flb_input_insta
{
int ret;
flb_input_thread_exit(it, ins);
/* On Darwin, we must call pthread_cancel here to ensure worker
* thread termination. Otherwise, worker thread termination will
* be blocked. */
pthread_cancel(it->thread);
niedbalski marked this conversation as resolved.
Show resolved Hide resolved
ret = pthread_join(it->thread, NULL);
mpack_writer_destroy(&it->writer);
pthread_mutex_destroy(&it->mutex);
Expand Down
21 changes: 20 additions & 1 deletion src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,13 @@ const char *flb_output_get_property(const char *key, struct flb_output_instance
return flb_config_prop_get(key, &ins->properties);
}

#ifdef FLB_HAVE_METRICS
void *flb_output_get_cmt_instance(struct flb_output_instance *ins)
{
return (void *)ins->cmt;
}
#endif

/* Trigger the output plugins setup callbacks to prepare them. */
int flb_output_init_all(struct flb_config *config)
{
Expand Down Expand Up @@ -948,7 +955,7 @@ int flb_output_init_all(struct flb_config *config)
#ifdef FLB_HAVE_PROXY_GO
/* Proxy plugins have their own initialization */
if (p->type == FLB_OUTPUT_PLUGIN_PROXY) {
ret = flb_plugin_proxy_init(p->proxy, ins, config);
ret = flb_plugin_proxy_output_init(p->proxy, ins, config);
if (ret == -1) {
flb_output_instance_destroy(ins);
return -1;
Expand Down Expand Up @@ -1098,6 +1105,18 @@ int flb_output_check(struct flb_config *config)
return 0;
}

/* Check output plugin's log level.
* Not for core plugins but for Golang plugins.
* Golang plugins do not have thread-local flb_worker_ctx information. */
int flb_output_log_check(struct flb_output_instance *ins, int l)
{
if (ins->log_level < l) {
return FLB_FALSE;
}

return FLB_TRUE;
}

/*
* Output plugins might have enabled certain features that have not been passed
* directly to the upstream context. In order to avoid let plugins validate specific
Expand Down
Loading