Skip to content

Commit

Permalink
out_mongo: add MongoDB as default output plugin
Browse files Browse the repository at this point in the history
MongoDB output plugin as a default plugin to ease the process of
setting up the connection between such databases and Fluent-bit.

I've opended an issue for the missing feature:

  #8846

Signed-off-by: Barnabas Ifkovics <[email protected]>
  • Loading branch information
w4term3loon committed May 26, 2024
1 parent 7de2c45 commit d76404b
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 0 deletions.
8 changes: 8 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ option(FLB_OUT_VIVO_EXPORTER "Enabel Vivo exporter output plugin"
option(FLB_OUT_WEBSOCKET "Enable Websocket output plugin" Yes)
option(FLB_OUT_ORACLE_LOG_ANALYTICS "Enable Oracle Cloud Infrastructure Logging analytics plugin" Yes)
option(FLB_OUT_CHRONICLE "Enable Google Chronicle output plugin" Yes)
option(FLB_OUT_MONGO "Enable MongoDB output plugin" Yes)
option(FLB_FILTER_ALTER_SIZE "Enable alter_size filter" Yes)
option(FLB_FILTER_AWS "Enable aws filter" Yes)
option(FLB_FILTER_ECS "Enable AWS ECS filter" Yes)
Expand Down Expand Up @@ -1048,6 +1049,13 @@ if(FLB_OUT_PGSQL AND (NOT PostgreSQL_FOUND))
FLB_OPTION(FLB_OUT_PGSQL OFF)
endif()

# MongoDB
# =======
if(FLB_OUT_MONGO)
set(ENABLE_MONGOC ON)
add_subdirectory(${FLB_PATH_LIB_MONGO} EXCLUDE_FROM_ALL)
endif()

# Arrow GLib
# ==========
find_package(PkgConfig)
Expand Down
5 changes: 5 additions & 0 deletions cmake/headers.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ include_directories(
${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_CARES}/include
${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_RING_BUFFER}/lwrb/src/include

${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_MONGO}/src/libmongoc/src
${CMAKE_CURRENT_BINARY_DIR}/${FLB_PATH_LIB_MONGO}/src/libmongoc/src/mongoc
${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_MONGO}/src/libbson/src
${CMAKE_CURRENT_BINARY_DIR}/${FLB_PATH_LIB_MONGO}/src/libbson/src

${CMAKE_CURRENT_BINARY_DIR}/${FLB_PATH_LIB_CARES}
${CMAKE_CURRENT_BINARY_DIR}/${FLB_PATH_LIB_JANSSON}/include
${CMAKE_CURRENT_BINARY_DIR}/lib/cmetrics
Expand Down
1 change: 1 addition & 0 deletions cmake/libraries.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ set(FLB_PATH_LIB_SNAPPY "lib/snappy-fef67ac")
set(FLB_PATH_LIB_RDKAFKA "lib/librdkafka-2.3.0")
set(FLB_PATH_LIB_RING_BUFFER "lib/lwrb")
set(FLB_PATH_LIB_WASM_MICRO_RUNTIME "lib/wasm-micro-runtime-WAMR-1.3.0")
set(FLB_PATH_LIB_MONGO "lib/mongo-c-driver")
1 change: 1 addition & 0 deletions plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ REGISTER_OUT_PLUGIN("out_prometheus_remote_write")
REGISTER_OUT_PLUGIN("out_s3")
REGISTER_OUT_PLUGIN("out_vivo_exporter")
REGISTER_OUT_PLUGIN("out_chronicle")
REGISTER_OUT_PLUGIN("out_mongo")

# FILTERS
# =======
Expand Down
1 change: 1 addition & 0 deletions plugins/out_mongo/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
FLB_PLUGIN(out_mongo "mongo.c" "")
154 changes: 154 additions & 0 deletions plugins/out_mongo/mongo.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#include <fluent-bit/flb_http_client.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_utils.h>

#include <mongoc/mongoc.h>

#include "mongo.h"

static int
cb_mongodb_init(struct flb_output_instance *ins, struct flb_config *config, void *data) {

int ret = 0;
int io_flags = 0;
struct flb_mongodb *ctx;
struct flb_upstream *upstream;

/* Set default network configuration */
flb_output_net_default(FLB_MONGODB_HOST, FLB_MONGODB_PORT, ins);

/* Allocate plugin context */
ctx = flb_calloc(1, sizeof(struct flb_mongodb));
if (!ctx) {
flb_errno();
return -1;
}
ctx->instance = ins;

/* Register context with plugin instance */
flb_output_set_context(ins, ctx);

/*
* This plugin instance uses the HTTP client interface, let's register
* it debugging callbacks.
* NOTE: is the macro even set?
*/
flb_output_set_http_debug_callbacks(ins);

/* Load config map */
ret = flb_output_config_map_set(ins, (void *)ctx);
if (ret == -1) {
flb_free(ctx);
return -1;
}

/* Set io properties based on features. */
if (ins->use_tls == FLB_TRUE) {
io_flags = FLB_IO_TLS;
} else {
io_flags = FLB_IO_TCP;
}

if (ins->host.ipv6 == FLB_TRUE) {
io_flags |= FLB_IO_IPV6;
}

/* Prepare an upstream handler */
upstream = flb_upstream_create(config, ins->host.name, ins->host.port, io_flags, ins->tls);
if (NULL != upstream) {
ctx->upstream = upstream;
flb_output_upstream_set(ctx->upstream, ins);
} else {
flb_free(ctx);
return -1;
}

flb_time_zero(&ctx->ts_dupe);
flb_time_zero(&ctx->ts_last);

flb_plg_debug(ctx->instance, "host=%s port=%i", ins->host.name, ins->host.port);
printf("host=%s port=%i\n", ins->host.name, ins->host.port);

return 0;
}

int
mongodb_format(const char *tag, int tag_len, const void *data, size_t event_sz, size_t *out_sz,
struct flb_mongodb *ctx) {

int ret = 0;
struct flb_time time;
struct flb_log_event log_event;
struct flb_log_event_decoder log_decoder;
ret = flb_log_event_decoder_init(&log_decoder, (char *)data, event_sz);

if (ret != FLB_EVENT_DECODER_SUCCESS) {
flb_plg_error(ctx->instance, "Log event decoder initialization error : %d", ret);
return 1;
}

while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) ==
FLB_EVENT_DECODER_SUCCESS) {
flb_time_copy(&time, &log_event.timestamp);
printf("time: %ld %ld\n", time.tm.tv_sec, time.tm.tv_nsec);
}

flb_log_event_decoder_destroy(&log_decoder);

return 0;
}

static void cb_mongodb_flush(
struct flb_event_chunk * event_chunk, struct flb_output_flush * out_flush,
struct flb_input_instance * i_ins, void *out_context, struct flb_config *config) {
int ret = 0;
size_t bytes;
struct flb_connection *connection;
struct flb_mongodb *output_ctx = (struct flb_mongodb *)out_context;

/* Convert format: metrics / logs */
if (event_chunk->type == FLB_EVENT_TYPE_METRICS) {
/* format metrics */
printf("metrics are not yet supported.\n");
assert(0 && "TODO");
} else {
/* format logs */
printf("log event\n");
ret = mongodb_format(event_chunk->tag, flb_sds_len(event_chunk->tag), event_chunk->data,
event_chunk->size, &bytes, output_ctx);

if (0 != ret) {
FLB_OUTPUT_RETURN(FLB_ERROR);
}
}

// NOTE: temporary
FLB_OUTPUT_RETURN(FLB_OK);

/* Get upstream connection */
connection = flb_upstream_conn_get(output_ctx->upstream);
if (!connection) {
printf("retry\n");
FLB_OUTPUT_RETURN(FLB_RETRY);
}
printf("connection acquired\n");
}

static int cb_mongodb_exit(void *data, struct flb_config *config) {
printf("Exit ran\n");
return 0;
}

struct flb_output_plugin out_mongo_plugin = {
.name = "mongo",
.description = "MongoDB",
.cb_init = cb_mongodb_init,
.cb_pre_run = NULL,
.cb_flush = cb_mongodb_flush,
.cb_exit = cb_mongodb_exit,
.config_map = NULL,
.flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
.event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS,
};
26 changes: 26 additions & 0 deletions plugins/out_mongo/mongo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#ifndef FLB_OUT_MONGO_H
#define FLB_OUT_MONGO_H

#include <fluent-bit/flb_time.h>

#define FLB_MONGODB_HOST "127.0.0.1"
#define FLB_MONGODB_PORT 27017

struct flb_mongodb {

char uri[1024];

struct flb_upstream* upstream;

flb_sds_t http_user;
flb_sds_t http_passwd;
flb_sds_t http_token;
struct mk_list *headers;

struct flb_time ts_dupe;
struct flb_time ts_last;

struct flb_output_instance *instance;
};

#endif // FLB_OUT_MONGO_H
4 changes: 4 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,10 @@ if(FLB_JEMALLOC)
target_link_libraries(fluent-bit-static libjemalloc)
endif()

if(FLB_OUT_MONGO)
target_link_libraries(fluent-bit-static mongoc_static)
endif()

# Binary / Executable
if(FLB_BINARY)
find_package (Threads)
Expand Down

0 comments on commit d76404b

Please sign in to comment.