Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_mongo: add MongoDB as default output plugin #8857

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ option(FLB_OUT_VIVO_EXPORTER "Enabel Vivo exporter output plugin"
option(FLB_OUT_WEBSOCKET "Enable Websocket output plugin" Yes)
option(FLB_OUT_ORACLE_LOG_ANALYTICS "Enable Oracle Cloud Infrastructure Logging analytics plugin" Yes)
option(FLB_OUT_CHRONICLE "Enable Google Chronicle output plugin" Yes)
option(FLB_OUT_MONGO "Enable MongoDB output plugin" Yes)
option(FLB_FILTER_ALTER_SIZE "Enable alter_size filter" Yes)
option(FLB_FILTER_AWS "Enable aws filter" Yes)
option(FLB_FILTER_ECS "Enable AWS ECS filter" Yes)
Expand Down Expand Up @@ -1048,6 +1049,13 @@ if(FLB_OUT_PGSQL AND (NOT PostgreSQL_FOUND))
FLB_OPTION(FLB_OUT_PGSQL OFF)
endif()

# MongoDB
# =======
if(FLB_OUT_MONGO)
set(ENABLE_MONGOC ON)
add_subdirectory(${FLB_PATH_LIB_MONGO} EXCLUDE_FROM_ALL)
endif()

# Arrow GLib
# ==========
find_package(PkgConfig)
Expand Down
5 changes: 5 additions & 0 deletions cmake/headers.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ include_directories(
${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_CARES}/include
${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_RING_BUFFER}/lwrb/src/include

${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_MONGO}/src/libmongoc/src
${CMAKE_CURRENT_BINARY_DIR}/${FLB_PATH_LIB_MONGO}/src/libmongoc/src/mongoc
${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_MONGO}/src/libbson/src
${CMAKE_CURRENT_BINARY_DIR}/${FLB_PATH_LIB_MONGO}/src/libbson/src

${CMAKE_CURRENT_BINARY_DIR}/${FLB_PATH_LIB_CARES}
${CMAKE_CURRENT_BINARY_DIR}/${FLB_PATH_LIB_JANSSON}/include
${CMAKE_CURRENT_BINARY_DIR}/lib/cmetrics
Expand Down
1 change: 1 addition & 0 deletions cmake/libraries.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ set(FLB_PATH_LIB_SNAPPY "lib/snappy-fef67ac")
set(FLB_PATH_LIB_RDKAFKA "lib/librdkafka-2.3.0")
set(FLB_PATH_LIB_RING_BUFFER "lib/lwrb")
set(FLB_PATH_LIB_WASM_MICRO_RUNTIME "lib/wasm-micro-runtime-WAMR-1.3.0")
set(FLB_PATH_LIB_MONGO "lib/mongo-c-driver")
1 change: 1 addition & 0 deletions plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ REGISTER_OUT_PLUGIN("out_prometheus_remote_write")
REGISTER_OUT_PLUGIN("out_s3")
REGISTER_OUT_PLUGIN("out_vivo_exporter")
REGISTER_OUT_PLUGIN("out_chronicle")
REGISTER_OUT_PLUGIN("out_mongo")

# FILTERS
# =======
Expand Down
1 change: 1 addition & 0 deletions plugins/out_mongo/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
FLB_PLUGIN(out_mongo "mongo.c" "")
154 changes: 154 additions & 0 deletions plugins/out_mongo/mongo.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#include <fluent-bit/flb_http_client.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_utils.h>

#include <mongoc/mongoc.h>

#include "mongo.h"

static int
cb_mongodb_init(struct flb_output_instance *ins, struct flb_config *config, void *data) {

int ret = 0;
int io_flags = 0;
struct flb_mongodb *ctx;
struct flb_upstream *upstream;

/* Set default network configuration */
flb_output_net_default(FLB_MONGODB_HOST, FLB_MONGODB_PORT, ins);

/* Allocate plugin context */
ctx = flb_calloc(1, sizeof(struct flb_mongodb));
if (!ctx) {
flb_errno();
return -1;
}
ctx->instance = ins;

/* Register context with plugin instance */
flb_output_set_context(ins, ctx);

/*
* This plugin instance uses the HTTP client interface, let's register
* it debugging callbacks.
* NOTE: is the macro even set?
*/
flb_output_set_http_debug_callbacks(ins);

/* Load config map */
ret = flb_output_config_map_set(ins, (void *)ctx);
if (ret == -1) {
flb_free(ctx);
return -1;
}

/* Set io properties based on features. */
if (ins->use_tls == FLB_TRUE) {
io_flags = FLB_IO_TLS;
} else {
io_flags = FLB_IO_TCP;
}

if (ins->host.ipv6 == FLB_TRUE) {
io_flags |= FLB_IO_IPV6;
}

/* Prepare an upstream handler */
upstream = flb_upstream_create(config, ins->host.name, ins->host.port, io_flags, ins->tls);
if (NULL != upstream) {
ctx->upstream = upstream;
flb_output_upstream_set(ctx->upstream, ins);
} else {
flb_free(ctx);
return -1;
}

flb_time_zero(&ctx->ts_dupe);
flb_time_zero(&ctx->ts_last);

flb_plg_debug(ctx->instance, "host=%s port=%i", ins->host.name, ins->host.port);
printf("host=%s port=%i\n", ins->host.name, ins->host.port);

return 0;
}

int
mongodb_format(const char *tag, int tag_len, const void *data, size_t event_sz, size_t *out_sz,
struct flb_mongodb *ctx) {

int ret = 0;
struct flb_time time;
struct flb_log_event log_event;
struct flb_log_event_decoder log_decoder;
ret = flb_log_event_decoder_init(&log_decoder, (char *)data, event_sz);

if (ret != FLB_EVENT_DECODER_SUCCESS) {
flb_plg_error(ctx->instance, "Log event decoder initialization error : %d", ret);
return 1;
}

while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) ==
FLB_EVENT_DECODER_SUCCESS) {
flb_time_copy(&time, &log_event.timestamp);
printf("time: %ld %ld\n", time.tm.tv_sec, time.tm.tv_nsec);
}

flb_log_event_decoder_destroy(&log_decoder);

return 0;
}

static void cb_mongodb_flush(
struct flb_event_chunk * event_chunk, struct flb_output_flush * out_flush,
struct flb_input_instance * i_ins, void *out_context, struct flb_config *config) {
int ret = 0;
size_t bytes;
struct flb_connection *connection;
struct flb_mongodb *output_ctx = (struct flb_mongodb *)out_context;

/* Convert format: metrics / logs */
if (event_chunk->type == FLB_EVENT_TYPE_METRICS) {
/* format metrics */
printf("metrics are not yet supported.\n");
assert(0 && "TODO");
} else {
/* format logs */
printf("log event\n");
ret = mongodb_format(event_chunk->tag, flb_sds_len(event_chunk->tag), event_chunk->data,
event_chunk->size, &bytes, output_ctx);

if (0 != ret) {
FLB_OUTPUT_RETURN(FLB_ERROR);
}
}

// NOTE: temporary
FLB_OUTPUT_RETURN(FLB_OK);

/* Get upstream connection */
connection = flb_upstream_conn_get(output_ctx->upstream);
if (!connection) {
printf("retry\n");
FLB_OUTPUT_RETURN(FLB_RETRY);
}
printf("connection acquired\n");
}

static int cb_mongodb_exit(void *data, struct flb_config *config) {
printf("Exit ran\n");
return 0;
}

struct flb_output_plugin out_mongo_plugin = {
.name = "mongo",
.description = "MongoDB",
.cb_init = cb_mongodb_init,
.cb_pre_run = NULL,
.cb_flush = cb_mongodb_flush,
.cb_exit = cb_mongodb_exit,
.config_map = NULL,
.flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
.event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS,
};
26 changes: 26 additions & 0 deletions plugins/out_mongo/mongo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#ifndef FLB_OUT_MONGO_H
#define FLB_OUT_MONGO_H

#include <fluent-bit/flb_time.h>

#define FLB_MONGODB_HOST "127.0.0.1"
#define FLB_MONGODB_PORT 27017

struct flb_mongodb {

char uri[1024];

struct flb_upstream* upstream;

flb_sds_t http_user;
flb_sds_t http_passwd;
flb_sds_t http_token;
struct mk_list *headers;

struct flb_time ts_dupe;
struct flb_time ts_last;

struct flb_output_instance *instance;
};

#endif // FLB_OUT_MONGO_H
4 changes: 4 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,10 @@ if(FLB_JEMALLOC)
target_link_libraries(fluent-bit-static libjemalloc)
endif()

if(FLB_OUT_MONGO)
target_link_libraries(fluent-bit-static mongoc_static)
endif()

# Binary / Executable
if(FLB_BINARY)
find_package (Threads)
Expand Down