Skip to content

Commit

Permalink
filter_lua: add support for Log metadata handling
Browse files Browse the repository at this point in the history
The current Lua filter only supports the processing of the log body and timestamp per
record. Metadata support in logs was added recently and this patch extends the filter
with a new function prototype and return values to provide metadata manipulation
capabilities.

The new option called 'enable_metadata', boolean (default: off) allows to use a new
prototype for the Lua script which in a new argument receives the metadata as a Lua
table, similar concept as the log body is received. The following is an example of
the use of this new functionality:

  pipeline:
    inputs:
      - name: dummy
        processors:
          logs:
            - name: lua
              enable_metadata: true
              call: test_v2
              code: |
                function test_v2(tag, timestamp, metadata, body)
                  metadata['meta_test'] = 'ok'
                  body['body_test'] = 'ok'
                  return 2, timestamp, metadata, body
                end
    outputs:
      - name : stdout
        match: '*'

For this type of function, is mandatory to return the metadata table, either a new
one or an updated version.

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Jan 11, 2025
1 parent 412d3ea commit 7f74d7b
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 47 deletions.
166 changes: 119 additions & 47 deletions plugins/filter_lua/lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -428,13 +428,11 @@ static int pack_record(struct lua_filter *ctx,
}

if (ret == FLB_EVENT_ENCODER_SUCCESS && metadata != NULL) {
ret = flb_log_event_encoder_set_metadata_from_msgpack_object(
log_encoder, metadata);
ret = flb_log_event_encoder_set_metadata_from_msgpack_object(log_encoder, metadata);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_set_body_from_msgpack_object(
log_encoder, body);
ret = flb_log_event_encoder_set_body_from_msgpack_object(log_encoder, body);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
Expand All @@ -444,67 +442,90 @@ static int pack_record(struct lua_filter *ctx,
return ret;
}

static int pack_result (struct lua_filter *ctx, struct flb_time *ts,
msgpack_object *metadata,
struct flb_log_event_encoder *log_encoder,
char *data, size_t bytes)
static int pack_result(struct lua_filter *ctx, struct flb_time *ts,
//msgpack_object *metadata,
struct flb_log_event *log_event,
struct flb_log_event_encoder *log_encoder,
char *meta_buf, size_t meta_size,
char *body_buf, size_t body_size)
{
int ret;
size_t index = 0;
size_t off = 0;
msgpack_object *entry;
msgpack_unpacked result;

msgpack_unpacked_init(&result);

ret = msgpack_unpack_next(&result, data, bytes, &off);
msgpack_unpacked result_meta;
msgpack_unpacked result_body;

/* unpack metadata if set */
if (meta_buf) {
msgpack_unpacked_init(&result_meta);
ret = msgpack_unpack_next(&result_meta, meta_buf, meta_size, &off);
if (ret != MSGPACK_UNPACK_SUCCESS) {
msgpack_unpacked_destroy(&result_meta);
return FLB_FALSE;
}
}

/* Pack record */
msgpack_unpacked_init(&result_body);
off = 0;
ret = msgpack_unpack_next(&result_body, body_buf, body_size, &off);
if (ret != MSGPACK_UNPACK_SUCCESS) {
msgpack_unpacked_destroy(&result);

msgpack_unpacked_destroy(&result_body);
if (meta_buf) {
msgpack_unpacked_destroy(&result_meta);
}
return FLB_FALSE;
}

if (result.data.type == MSGPACK_OBJECT_MAP) {
ret = pack_record(ctx, log_encoder,
ts, metadata, &result.data);
if (result_body.data.type == MSGPACK_OBJECT_MAP) {
if (meta_buf) {
ret = pack_record(ctx, log_encoder, ts, &result_meta.data, &result_body.data);
}
else {
ret = pack_record(ctx, log_encoder, ts, NULL, &result_body.data);
}
msgpack_unpacked_destroy(&result_body);

msgpack_unpacked_destroy(&result);
if (meta_buf) {
msgpack_unpacked_destroy(&result_meta);
}

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
return FLB_FALSE;
}

return FLB_TRUE;
}
else if (result.data.type == MSGPACK_OBJECT_ARRAY) {
for (index = 0 ; index < result.data.via.array.size ; index++) {
entry = &result.data.via.array.ptr[index];
else if (result_body.data.type == MSGPACK_OBJECT_ARRAY) {
for (index = 0 ; index < result_body.data.via.array.size ; index++) {
entry = &result_body.data.via.array.ptr[index];

if (entry->type == MSGPACK_OBJECT_MAP) {
ret = pack_record(ctx, log_encoder,
ts, metadata, entry);
if (meta_buf) {
ret = pack_record(ctx, log_encoder, ts, &result_meta.data, entry);
}
else {
ret = pack_record(ctx, log_encoder, ts, NULL, entry);
}

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
msgpack_unpacked_destroy(&result);

msgpack_unpacked_destroy(&result_body);
return FLB_FALSE;
}
}
else {
msgpack_unpacked_destroy(&result);
msgpack_unpacked_destroy(&result_body);

return FLB_FALSE;
}
}

msgpack_unpacked_destroy(&result);

msgpack_unpacked_destroy(&result_body);
return FLB_TRUE;
}

msgpack_unpacked_destroy(&result);

msgpack_unpacked_destroy(&result_body);
return FLB_FALSE;
}

Expand All @@ -524,8 +545,10 @@ static int cb_lua_filter(const void *data, size_t bytes,
/* Lua return values */
int l_code;
double l_timestamp;
msgpack_packer data_pck;
msgpack_sbuffer data_sbuf;
msgpack_packer body_pck = {0};
msgpack_sbuffer body_sbuf = {0};
msgpack_packer meta_pck = {0};
msgpack_sbuffer meta_sbuf = {0};
struct flb_log_event_encoder log_encoder;
struct flb_log_event_decoder log_decoder;
struct flb_log_event log_event;
Expand Down Expand Up @@ -555,11 +578,9 @@ static int cb_lua_filter(const void *data, size_t bytes,
return FLB_FILTER_NOTOUCH;
}

while ((ret = flb_log_event_decoder_next(
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
msgpack_sbuffer_init(&data_sbuf);
msgpack_packer_init(&data_pck, &data_sbuf, msgpack_sbuffer_write);
while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
msgpack_sbuffer_init(&body_sbuf);
msgpack_packer_init(&body_pck, &body_sbuf, msgpack_sbuffer_write);

/* Get timestamp */
flb_time_copy(&t, &log_event.timestamp);
Expand All @@ -578,32 +599,66 @@ static int cb_lua_filter(const void *data, size_t bytes,
lua_pushnumber(ctx->lua->state, ts);
}

/* Metadata: on v1, logs metadata is not set, however in v2 it is */
if (ctx->enable_metadata) {
flb_lua_pushmsgpack(ctx->lua->state, log_event.metadata);
}

flb_lua_pushmsgpack(ctx->lua->state, log_event.body);
if (ctx->protected_mode) {
ret = lua_pcall(ctx->lua->state, 3, 3, 0);
if (!ctx->enable_metadata) {
ret = lua_pcall(ctx->lua->state, 3, 3, 0);
}
else {
ret = lua_pcall(ctx->lua->state, 4, 4, 0);
}
if (ret != 0) {
flb_plg_error(ctx->ins, "error code %d: %s",
ret, lua_tostring(ctx->lua->state, -1));
lua_pop(ctx->lua->state, 1);

msgpack_sbuffer_destroy(&data_sbuf);
msgpack_sbuffer_destroy(&body_sbuf);
flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);

return FLB_FILTER_NOTOUCH;
}
}
else {
lua_call(ctx->lua->state, 3, 3);
if (!ctx->enable_metadata) {
lua_call(ctx->lua->state, 3, 3);
}
else {
lua_call(ctx->lua->state, 4, 4);
}
}

/* Initialize Return values */
l_code = 0;
l_timestamp = ts;

flb_lua_tomsgpack(ctx->lua->state, &data_pck, 0, &ctx->l2cc);
/* log body */
flb_lua_tomsgpack(ctx->lua->state, &body_pck, 0, &ctx->l2cc);
lua_pop(ctx->lua->state, 1);

if (ctx->enable_metadata) {
/* initialize msgpack buffer for metadata */
msgpack_sbuffer_init(&meta_sbuf);
msgpack_packer_init(&meta_pck, &meta_sbuf, msgpack_sbuffer_write);

/* Check for metadata (third return value) */
if (lua_istable(ctx->lua->state, -1)) {
/* Metadata is present */
flb_lua_tomsgpack(ctx->lua->state, &meta_pck, 0, &ctx->l2cc);
lua_pop(ctx->lua->state, 1);
}
else {
/* Metadata not modified */
lua_pop(ctx->lua->state, 1);
msgpack_sbuffer_destroy(&meta_sbuf);
}
}

/* Lua table */
if (ctx->time_as_table == FLB_TRUE) {
if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) {
Expand All @@ -627,11 +682,15 @@ static int cb_lua_filter(const void *data, size_t bytes,
lua_pop(ctx->lua->state, 1);
}

/* return value from Lua */
l_code = (int) lua_tointeger(ctx->lua->state, -1);
lua_pop(ctx->lua->state, 1);

if (l_code == -1) { /* Skip record */
msgpack_sbuffer_destroy(&data_sbuf);
if (ctx->enable_metadata) {
msgpack_sbuffer_destroy(&meta_sbuf);
}
msgpack_sbuffer_destroy(&body_sbuf);
continue;
}
else if (l_code == 1 || l_code == 2) { /* Modified, pack new data */
Expand All @@ -645,13 +704,21 @@ static int cb_lua_filter(const void *data, size_t bytes,
t = t_orig;
}

ret = pack_result(ctx, &t, log_event.metadata, &log_encoder,
data_sbuf.data, data_sbuf.size);
if (ctx->enable_metadata) {
ret = pack_result(ctx, &t, &log_event, &log_encoder,
meta_sbuf.data, meta_sbuf.size,
body_sbuf.data, body_sbuf.size);
}
else {
ret = pack_result(ctx, &t, &log_event, &log_encoder,
NULL, 0,
body_sbuf.data, body_sbuf.size);
}

if (ret == FLB_FALSE) {
flb_plg_error(ctx->ins, "invalid table returned at %s(), %s",
ctx->call, ctx->script);
msgpack_sbuffer_destroy(&data_sbuf);
msgpack_sbuffer_destroy(&body_sbuf);

flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);
Expand All @@ -678,7 +745,7 @@ static int cb_lua_filter(const void *data, size_t bytes,
}
}

msgpack_sbuffer_destroy(&data_sbuf);
msgpack_sbuffer_destroy(&body_sbuf);
}

if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA) {
Expand Down Expand Up @@ -765,6 +832,11 @@ static struct flb_config_map config_map[] = {
"It is useful to prevent removing key/value "
"since nil is a special value to remove key value from map in Lua."
},
{
FLB_CONFIG_MAP_BOOL, "enable_metadata", "false",
0, FLB_TRUE, offsetof(struct lua_filter, enable_metadata),
"If enabled, Fluent-bit will pass the metadata as a Lua table to the Lua script."
},

{0}
};
Expand Down
3 changes: 3 additions & 0 deletions plugins/filter_lua/lua_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@
#include <fluent-bit/flb_lua.h>

#define LUA_BUFFER_CHUNK 1024 * 8 /* 8K should be enough to get started */
#define LUA_API_V1 1
#define LUA_API_V2 2

struct lua_filter {
flb_sds_t code; /* lua script source code */
flb_sds_t script; /* lua script path */
flb_sds_t call; /* function name */
flb_sds_t buffer; /* json dec buffer */
int enable_metadata; /* enable metadata */
int protected_mode; /* exec lua function in protected mode */
int time_as_table; /* timestamp as a Lua table */
int enable_flb_null; /* Use flb_null in Lua */
Expand Down

0 comments on commit 7f74d7b

Please sign in to comment.