From 2960218140632af557ff77bb25715241c55ee81a Mon Sep 17 00:00:00 2001 From: "longhui.li" Date: Mon, 15 Apr 2019 14:30:30 +0800 Subject: [PATCH] add filter_cgolib 1. add filter call go lib filter --- CMakeLists.txt | 1 + plugins/CMakeLists.txt | 1 + plugins/filter_cgolib/CMakeLists.txt | 5 + plugins/filter_cgolib/Readme.md | 111 +++++++++++ plugins/filter_cgolib/cgo.h | 51 +++++ plugins/filter_cgolib/cgolib.c | 272 +++++++++++++++++++++++++++ plugins/filter_cgolib/cgolib.h | 27 +++ 7 files changed, 468 insertions(+) create mode 100644 plugins/filter_cgolib/CMakeLists.txt create mode 100644 plugins/filter_cgolib/Readme.md create mode 100644 plugins/filter_cgolib/cgo.h create mode 100644 plugins/filter_cgolib/cgolib.c create mode 100644 plugins/filter_cgolib/cgolib.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 008b0b59d82..f68d3fdb681 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -124,6 +124,7 @@ option(FLB_FILTER_MODIFY "Enable modify filter" Yes) option(FLB_FILTER_STDOUT "Enable stdout filter" Yes) option(FLB_FILTER_PARSER "Enable parser filter" Yes) option(FLB_FILTER_KUBERNETES "Enable kubernetes filter" Yes) +option(FLB_FILTER_CGOLIB "Enable cgolib filter" Yes) option(FLB_FILTER_THROTTLE "Enable throttle filter" Yes) option(FLB_FILTER_NEST "Enable nest filter" Yes) option(FLB_FILTER_LUA "Enable Lua scripting filter" Yes) diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 17f87c2609b..4cb623aa2dd 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -157,6 +157,7 @@ REGISTER_FILTER_PLUGIN("filter_throttle") if(FLB_REGEX) REGISTER_FILTER_PLUGIN("filter_kubernetes") + REGISTER_FILTER_PLUGIN("filter_cgolib") REGISTER_FILTER_PLUGIN("filter_parser") endif() diff --git a/plugins/filter_cgolib/CMakeLists.txt b/plugins/filter_cgolib/CMakeLists.txt new file mode 100644 index 00000000000..3265f9c3b39 --- /dev/null +++ b/plugins/filter_cgolib/CMakeLists.txt @@ -0,0 +1,5 @@ +set(src + cgolib.c + ) + +FLB_PLUGIN(filter_cgolib "${src}" "") diff --git a/plugins/filter_cgolib/Readme.md b/plugins/filter_cgolib/Readme.md new file mode 100644 index 00000000000..cfa03f50f2d --- /dev/null +++ b/plugins/filter_cgolib/Readme.md @@ -0,0 +1,111 @@ + + +call golib from c + +example go src: + +```go + +package main + +import "C" +import ( + "fmt" +) + +//export Golib_init +func Golib_init(name, value []string) int { + + fmt.Printf("get name %#v. ", name) + fmt.Printf("get value %#v. ", value) + + return 0 +} + +//export Golib_filter +func Golib_filter(srcName, srcValue []string) int { + // go will return the result in the slice which called in. so can't append item that max slice's cap. + // must use cgoAppend instead of append if you want append value. + // use cgoSetSlice if you want change value of slice or just append. + + src := loadCallIn(srcName, srcValue) + + src["myadd"] ="teat" + + return unLoadCallIn(src, srcName, srcValue) +} + +//export Golib_exit +func Golib_exit() int { + + fmt.Println("go exit") + + return 0 +} + +func loadCallIn(name, value[]string) map[string]string { + res := make(map[string]string) + for idx, n := range name { + res[n] = value[idx] + } + return res +} + +func unLoadCallIn(src map[string]string, name, value []string) int { + index := 0 + for k, v := range src { + name = cgoSetSlice(name, index, k) + value = cgoSetSlice(value, index, v) + index++ + } + fmt.Println("unload ok: ", name, value) + return index; +} + + +// can't append parameters that will over slice's cap. +// if over slice's cap, then go will malloc new memory, the c can't get the results. +func cgoAppend(src []string, parameters... string) []string { + if cap(src) - len(src) < len(parameters) { + fmt.Println(" cann't set parameters. slice cap is full. ", len(src) , cap(src), len(parameters)) + return src + } + src = append(src, parameters...) + return src +} + +func cgoSetSlice(src []string, index int, value string) []string { + if index > cap(src) { + fmt.Println(" cann't set parameters. index overflow. ", index, cap(src)) + return src + } + + if index >= len(src) { + src = append(src, value) + } else { + src[index] = value + } + + return src +} + +func main() {} + +``` + +build + +```shell +go build -o cgolib.so -buildmode=c-shared cgolib.go +``` + +config + +```ini +[FILTER] + Name cgolib + Match * + golib_so /usr/local/libs/cgolib.so + url http://12321 + url2 http://12321 +``` diff --git a/plugins/filter_cgolib/cgo.h b/plugins/filter_cgolib/cgo.h new file mode 100644 index 00000000000..03165120560 --- /dev/null +++ b/plugins/filter_cgolib/cgo.h @@ -0,0 +1,51 @@ +/* Created by "go tool cgo" - DO NOT EDIT. */ + +/* package command-line-arguments */ + + +#line 1 "cgo-builtin-prolog" + +#include /* for ptrdiff_t below */ + +#ifndef GO_CGO_EXPORT_PROLOGUE_H +#define GO_CGO_EXPORT_PROLOGUE_H + +typedef struct { const char *p; ptrdiff_t n; } _GoString_; + +#endif + +/* Start of boilerplate cgo prologue. */ +#line 1 "cgo-gcc-export-header-prolog" + +#ifndef GO_CGO_PROLOGUE_H +#define GO_CGO_PROLOGUE_H + +typedef signed char GoInt8; +typedef unsigned char GoUint8; +typedef short GoInt16; +typedef unsigned short GoUint16; +typedef int GoInt32; +typedef unsigned int GoUint32; +typedef long long GoInt64; +typedef unsigned long long GoUint64; +typedef GoInt64 GoInt; +typedef GoUint64 GoUint; +typedef __SIZE_TYPE__ GoUintptr; +typedef float GoFloat32; +typedef double GoFloat64; +typedef float _Complex GoComplex64; +typedef double _Complex GoComplex128; + +/* + static assertion to make sure the file is being used on architecture + at least with matching size of GoInt. +*/ +typedef char _check_for_64_bit_pointer_matching_GoInt[sizeof(void*)==64/8 ? 1:-1]; + +typedef _GoString_ GoString; +typedef void *GoMap; +typedef void *GoChan; +typedef struct { void *t; void *v; } GoInterface; +typedef struct { void *data; GoInt len; GoInt cap; } GoSlice; + +#endif \ No newline at end of file diff --git a/plugins/filter_cgolib/cgolib.c b/plugins/filter_cgolib/cgolib.c new file mode 100644 index 00000000000..1b4510cd0f6 --- /dev/null +++ b/plugins/filter_cgolib/cgolib.c @@ -0,0 +1,272 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include "cgo.h" + +#include "cgolib.h" + +// copy from filter_parser.c +static int msgpackobj2char(msgpack_object *obj, + char **ret_char, int *ret_char_size) +{ + int ret = -1; + + if (obj->type == MSGPACK_OBJECT_STR) { + *ret_char = (char*)obj->via.str.ptr; + *ret_char_size = obj->via.str.size; + ret = 0; + } + else if (obj->type == MSGPACK_OBJECT_BIN) { + *ret_char = (char*)obj->via.bin.ptr; + *ret_char_size = obj->via.bin.size; + ret = 0; + } + + return ret; +} + +/* + * unpack msg, call filter , write result to parameter result + */ +static int callCgoLibFilter(struct cgolib_conf *ctx, msgpack_sbuffer *result, char *tag, int tag_len, void *data, size_t bytes) +{ + int index= 0, i; + char *key_str, *val_str; + int key_len, val_len; + + msgpack_object_kv *kv; + size_t off = 0; + msgpack_object *obj; + int map_num; + struct flb_time tm; + + GoSlice name_slice, value_slice; + _GoString_ name[MAX_FIELD]={0}, value[MAX_FIELD]={0}; + GoInt ret; + + msgpack_unpacked unpackMsg; + msgpack_packer tmp_pck; + + name_slice.data = name; + value_slice.data = value; + name_slice.len = 0; + name_slice.cap = MAX_FIELD; + value_slice.len = 0; + value_slice.cap = MAX_FIELD; + + msgpack_sbuffer_init(result); + msgpack_packer_init(&tmp_pck, result, msgpack_sbuffer_write); + + msgpack_unpacked_init(&unpackMsg); + while (msgpack_unpack_next(&unpackMsg, data, bytes, &off)) { + if (unpackMsg.data.type != MSGPACK_OBJECT_ARRAY) { + continue; + } + flb_time_pop_from_msgpack(&tm, &unpackMsg, &obj); + if (obj->type != MSGPACK_OBJECT_MAP) { + continue; + } + msgpack_pack_array(&tmp_pck, 2); + flb_time_append_to_msgpack(&tm, &tmp_pck, 0); + + map_num = obj->via.map.size; + + // add tag to slice + name[0].p = "tag"; + name[0].n = 3; + value[0].p = tag; + value[0].n = tag_len; + index = 1; + for (i=0;ivia.map.ptr[i]; + if ( msgpackobj2char(&kv->key, &key_str, &key_len) < 0 ) { + /* key is not string */ + continue; + } + if ( msgpackobj2char(&kv->val, &val_str, &val_len) < 0 ) { + /* val is not string */ + continue; + } + name[index].p = key_str; + name[index].n = key_len; + value[index].p= val_str; + value[index].n = val_len; + index++; + } + + name_slice.len = index; + value_slice.len = index; + // call cgo filter + ret = ctx->filter_lib_func(name_slice, value_slice); + if (ret == -1) { + flb_error("cgolib filter_lib_func fail. "); + msgpack_unpacked_destroy(&unpackMsg); + return -1; + } + + // load result + msgpack_pack_map(&tmp_pck, ret); + for (i=0;icontext = ctx; + + name_slice.data = name; + value_slice.data = value; + + para_num = 0; + mk_list_foreach(head, &f_ins->properties) { + p = mk_list_entry(head, struct flb_config_prop, _head); + if (strcasecmp(GOLIB_SO_KEY_NAME, p->key) == 0) { + strcpy(lib_so_name, p->val); + continue; + } + name[para_num].p = p->key; + name[para_num].n = strlen(p->key); + + value[para_num].p = p->val; + value[para_num].n = strlen(p->val); + + para_num++; + } + name_slice.len = para_num; + name_slice.cap = para_num; + value_slice.len = para_num; + value_slice.cap = para_num; + + ctx->handler = dlopen (lib_so_name, RTLD_LAZY); + if (!ctx->handler) { + flb_error("open lib so %s [%s] fail. ", lib_so_name, dlerror()); + return -1; + } + + ctx->init_lib_func = dlsym(ctx->handler, INIT_FUNC_NAME); + if ((error = dlerror()) != NULL) { + flb_error("get init func from lib_so fail %s. ", error); + return -1; + } + ctx->filter_lib_func = dlsym(ctx->handler, FILTER_FUNC_NAME); + if ((error = dlerror()) != NULL) { + flb_error("get filter func from lib_so fail %s. ", error); + return -1; + } + ctx->exit_lib_func = dlsym(ctx->handler, EXIT_FUNC_NAME); + if ((error = dlerror()) != NULL) { + flb_error("get exit func from lib_so fail %s. ", error); + return -1; + } + + ret = ctx->init_lib_func(name_slice, value_slice); + if (ret != 0) { + flb_error("init go lib fail %s. ", error); + return -1; + } + + return 0; +} + + +static int cb_golib_filter(void *data, size_t bytes, + char *tag, int tag_len, + void **out_buf, size_t *out_bytes, + struct flb_filter_instance *f_ins, + void *filter_context, + struct flb_config *config) +{ + int ret; + struct cgolib_conf *ctx = f_ins->context; + + msgpack_sbuffer result; + + ret = callCgoLibFilter(ctx, &result, tag, tag_len, data, bytes); + if (ret == -1) { + flb_error("get_slice_from_msgpacker fail. "); + msgpack_sbuffer_destroy(&result); + return -1; + } + + *out_buf = result.data; + *out_bytes = result.size; + + return FLB_FILTER_MODIFIED; +} + +static int cb_golib_exit(void *data, struct flb_config *config) +{ + struct cgolib_conf *ctx; + + ctx = data; + + flb_trace("get exit cb_golib_exit . "); + ctx->exit_lib_func(); + + dlclose(ctx->handler); + + return 0; +} + +struct flb_filter_plugin filter_cgolib_plugin = { + .name = "cgolib", + .description = "Filter to call cgolib", + .cb_init = cb_golib_init, + .cb_filter = cb_golib_filter, + .cb_exit = cb_golib_exit, + .flags = 0 +}; diff --git a/plugins/filter_cgolib/cgolib.h b/plugins/filter_cgolib/cgolib.h new file mode 100644 index 00000000000..0bc14216aa4 --- /dev/null +++ b/plugins/filter_cgolib/cgolib.h @@ -0,0 +1,27 @@ + +#ifndef FLB_FILTER_CGO_LIB_H +#define FLB_FILTER_CGO_LIB_H + +#include "cgo.h" + +// three func define in go libs + + +#define MAX_PARAMETERS 50 +#define MAX_FIELD 100 + +#define GOLIB_SO_KEY_NAME "golib_so" + +#define INIT_FUNC_NAME "Golib_init" +#define FILTER_FUNC_NAME "Golib_filter" +#define EXIT_FUNC_NAME "Golib_exit" + + +struct cgolib_conf { + void * handler; + GoInt (*init_lib_func)(GoSlice p0, GoSlice p1); + GoInt (*filter_lib_func)(GoSlice p0, GoSlice p1); + GoInt (*exit_lib_func)(); +}; + +#endif /* FLB_FILTER_CGO_LIB_H */ \ No newline at end of file