diff --git a/CMakeLists.txt b/CMakeLists.txt index 4960b533aa8..29517c9a21b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -258,6 +258,7 @@ option(FLB_OUT_VIVO_EXPORTER "Enabel Vivo exporter output plugin" option(FLB_OUT_WEBSOCKET "Enable Websocket output plugin" Yes) option(FLB_OUT_ORACLE_LOG_ANALYTICS "Enable Oracle Cloud Infrastructure Logging analytics plugin" Yes) option(FLB_OUT_CHRONICLE "Enable Google Chronicle output plugin" Yes) +option(FLB_OUT_MONGO "Enable MongoDB output plugin" Yes) option(FLB_FILTER_ALTER_SIZE "Enable alter_size filter" Yes) option(FLB_FILTER_AWS "Enable aws filter" Yes) option(FLB_FILTER_ECS "Enable AWS ECS filter" Yes) @@ -1048,6 +1049,13 @@ if(FLB_OUT_PGSQL AND (NOT PostgreSQL_FOUND)) FLB_OPTION(FLB_OUT_PGSQL OFF) endif() +# MongoDB +# ======= +if(FLB_OUT_MONGO) + set(ENABLE_MONGOC ON) + add_subdirectory(${FLB_PATH_LIB_MONGO} EXCLUDE_FROM_ALL) +endif() + # Arrow GLib # ========== find_package(PkgConfig) diff --git a/cmake/headers.cmake b/cmake/headers.cmake index 45a1394ca7f..612510da2a1 100755 --- a/cmake/headers.cmake +++ b/cmake/headers.cmake @@ -39,6 +39,11 @@ include_directories( ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_CARES}/include ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_RING_BUFFER}/lwrb/src/include + ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_MONGO}/src/libmongoc/src + ${CMAKE_CURRENT_BINARY_DIR}/${FLB_PATH_LIB_MONGO}/src/libmongoc/src/mongoc + ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_MONGO}/src/libbson/src + ${CMAKE_CURRENT_BINARY_DIR}/${FLB_PATH_LIB_MONGO}/src/libbson/src + ${CMAKE_CURRENT_BINARY_DIR}/${FLB_PATH_LIB_CARES} ${CMAKE_CURRENT_BINARY_DIR}/${FLB_PATH_LIB_JANSSON}/include ${CMAKE_CURRENT_BINARY_DIR}/lib/cmetrics diff --git a/cmake/libraries.cmake b/cmake/libraries.cmake index 47325cbda01..d8ef2a52fba 100644 --- a/cmake/libraries.cmake +++ b/cmake/libraries.cmake @@ -24,3 +24,4 @@ set(FLB_PATH_LIB_SNAPPY "lib/snappy-fef67ac") set(FLB_PATH_LIB_RDKAFKA "lib/librdkafka-2.3.0") set(FLB_PATH_LIB_RING_BUFFER "lib/lwrb") set(FLB_PATH_LIB_WASM_MICRO_RUNTIME "lib/wasm-micro-runtime-WAMR-1.3.0") +set(FLB_PATH_LIB_MONGO "lib/mongo-c-driver") diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 9006ef6d823..f3d7b13be84 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -341,6 +341,7 @@ REGISTER_OUT_PLUGIN("out_prometheus_remote_write") REGISTER_OUT_PLUGIN("out_s3") REGISTER_OUT_PLUGIN("out_vivo_exporter") REGISTER_OUT_PLUGIN("out_chronicle") +REGISTER_OUT_PLUGIN("out_mongo") # FILTERS # ======= diff --git a/plugins/out_mongo/CMakeLists.txt b/plugins/out_mongo/CMakeLists.txt new file mode 100644 index 00000000000..2ce402396d9 --- /dev/null +++ b/plugins/out_mongo/CMakeLists.txt @@ -0,0 +1 @@ +FLB_PLUGIN(out_mongo "mongo.c" "") diff --git a/plugins/out_mongo/mongo.c b/plugins/out_mongo/mongo.c new file mode 100644 index 00000000000..b15b3cbb4a3 --- /dev/null +++ b/plugins/out_mongo/mongo.c @@ -0,0 +1,154 @@ +#include +#include +#include +#include +#include + +#include + +#include "mongo.h" + +static int +cb_mongodb_init(struct flb_output_instance *ins, struct flb_config *config, void *data) { + + int ret = 0; + int io_flags = 0; + struct flb_mongodb *ctx; + struct flb_upstream *upstream; + + /* Set default network configuration */ + flb_output_net_default(FLB_MONGODB_HOST, FLB_MONGODB_PORT, ins); + + /* Allocate plugin context */ + ctx = flb_calloc(1, sizeof(struct flb_mongodb)); + if (!ctx) { + flb_errno(); + return -1; + } + ctx->instance = ins; + + /* Register context with plugin instance */ + flb_output_set_context(ins, ctx); + + /* + * This plugin instance uses the HTTP client interface, let's register + * it debugging callbacks. + * NOTE: is the macro even set? + */ + flb_output_set_http_debug_callbacks(ins); + + /* Load config map */ + ret = flb_output_config_map_set(ins, (void *)ctx); + if (ret == -1) { + flb_free(ctx); + return -1; + } + + /* Set io properties based on features. */ + if (ins->use_tls == FLB_TRUE) { + io_flags = FLB_IO_TLS; + } else { + io_flags = FLB_IO_TCP; + } + + if (ins->host.ipv6 == FLB_TRUE) { + io_flags |= FLB_IO_IPV6; + } + + /* Prepare an upstream handler */ + upstream = flb_upstream_create(config, ins->host.name, ins->host.port, io_flags, ins->tls); + if (NULL != upstream) { + ctx->upstream = upstream; + flb_output_upstream_set(ctx->upstream, ins); + } else { + flb_free(ctx); + return -1; + } + + flb_time_zero(&ctx->ts_dupe); + flb_time_zero(&ctx->ts_last); + + flb_plg_debug(ctx->instance, "host=%s port=%i", ins->host.name, ins->host.port); + printf("host=%s port=%i\n", ins->host.name, ins->host.port); + + return 0; +} + +int +mongodb_format(const char *tag, int tag_len, const void *data, size_t event_sz, size_t *out_sz, + struct flb_mongodb *ctx) { + + int ret = 0; + struct flb_time time; + struct flb_log_event log_event; + struct flb_log_event_decoder log_decoder; + ret = flb_log_event_decoder_init(&log_decoder, (char *)data, event_sz); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->instance, "Log event decoder initialization error : %d", ret); + return 1; + } + + while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) == + FLB_EVENT_DECODER_SUCCESS) { + flb_time_copy(&time, &log_event.timestamp); + printf("time: %ld %ld\n", time.tm.tv_sec, time.tm.tv_nsec); + } + + flb_log_event_decoder_destroy(&log_decoder); + + return 0; + } + + static void cb_mongodb_flush( + struct flb_event_chunk * event_chunk, struct flb_output_flush * out_flush, + struct flb_input_instance * i_ins, void *out_context, struct flb_config *config) { + int ret = 0; + size_t bytes; + struct flb_connection *connection; + struct flb_mongodb *output_ctx = (struct flb_mongodb *)out_context; + + /* Convert format: metrics / logs */ + if (event_chunk->type == FLB_EVENT_TYPE_METRICS) { + /* format metrics */ + printf("metrics are not yet supported.\n"); + assert(0 && "TODO"); + } else { + /* format logs */ + printf("log event\n"); + ret = mongodb_format(event_chunk->tag, flb_sds_len(event_chunk->tag), event_chunk->data, + event_chunk->size, &bytes, output_ctx); + + if (0 != ret) { + FLB_OUTPUT_RETURN(FLB_ERROR); + } + } + + // NOTE: temporary + FLB_OUTPUT_RETURN(FLB_OK); + + /* Get upstream connection */ + connection = flb_upstream_conn_get(output_ctx->upstream); + if (!connection) { + printf("retry\n"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + printf("connection acquired\n"); + } + + static int cb_mongodb_exit(void *data, struct flb_config *config) { + printf("Exit ran\n"); + return 0; + } + + struct flb_output_plugin out_mongo_plugin = { + .name = "mongo", + .description = "MongoDB", + .cb_init = cb_mongodb_init, + .cb_pre_run = NULL, + .cb_flush = cb_mongodb_flush, + .cb_exit = cb_mongodb_exit, + .config_map = NULL, + .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, + .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS, + }; diff --git a/plugins/out_mongo/mongo.h b/plugins/out_mongo/mongo.h new file mode 100644 index 00000000000..202af487156 --- /dev/null +++ b/plugins/out_mongo/mongo.h @@ -0,0 +1,26 @@ +#ifndef FLB_OUT_MONGO_H +#define FLB_OUT_MONGO_H + +#include + +#define FLB_MONGODB_HOST "127.0.0.1" +#define FLB_MONGODB_PORT 27017 + +struct flb_mongodb { + + char uri[1024]; + + struct flb_upstream* upstream; + + flb_sds_t http_user; + flb_sds_t http_passwd; + flb_sds_t http_token; + struct mk_list *headers; + + struct flb_time ts_dupe; + struct flb_time ts_last; + + struct flb_output_instance *instance; +}; + +#endif // FLB_OUT_MONGO_H diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4f64613ecf0..025a30e4d78 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -474,6 +474,10 @@ if(FLB_JEMALLOC) target_link_libraries(fluent-bit-static libjemalloc) endif() +if(FLB_OUT_MONGO) + target_link_libraries(fluent-bit-static mongoc_static) +endif() + # Binary / Executable if(FLB_BINARY) find_package (Threads)