Skip to content

Commit

Permalink
Core: several improvements and cleanups.
Browse files Browse the repository at this point in the history
- Improved output plugins handling.
- Fluent bit now can read a configuration file (used by some plugins).
- Output plugins define now a flush callback.
- Config: fixed mk_config library routines (memory handlers).
- Output plugins can work without an URI
- Core config: map direct output plugin after list discovery.
- Doc: updated README.

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed May 18, 2015
1 parent 040152b commit 81aa331
Show file tree
Hide file tree
Showing 16 changed files with 136 additions and 48 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ option(WITH_IN_XBEE "Enable XBee input plugin" No)
option(WITH_IN_CPU "Enable CPU input plugin" Yes)
option(WITH_IN_KMSG "Enable Kernel log input plugin" Yes)
option(WITH_OUT_FLUENTD "Enable Fluentd output plugin" Yes)
option(WITH_OUT_TD "Enable Treasure Data output plugin" No)
option(WITH_OUT_TD "Enable Treasure Data output plugin" Yes)

# Enable all features
if(WITH_ALL)
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Fluent Bit

_Fluent-Bit_ is an events collector for Embedded Linux and is part of the [Fluentd](http://fluentd.org) project ecosystem. It allows to collects information from different sources, package and dispatch them to different outputs such as [Fluentd](http://fluentd.org).
[Fluent-Bit](http://fluentbit.io) is an events collector for Embedded Linux and is part of the [Fluentd](http://fluentd.org) project ecosystem. It allows to collects information from different sources, package and dispatch them to different outputs such as [Fluentd](http://fluentd.org).

The current project builds an executable called _fluent-bit_, a shared library _libfluent-bit.so_ and a static library _libfluent-bit.a_. Please refer to the build options described below for more details.

Expand Down Expand Up @@ -50,6 +50,7 @@ Once the tool have been compiled, a binary file called _Fluent-Bit_ will be foun
| name | option | description |
|--------------------|-------------------------|---------------------------------------------------------------------------------|
| Fluentd | fluentd://host:port | flush content to a [Fluentd](http://fluentd.org) service. On the [Fluentd](http://fluentd.org) side, it requires an __in_forward__.|
| TreasureData | td | flush data collected to [TreasureData](http://treasuredata.com) service (cloud analytics platform) |

### Usage

Expand Down
24 changes: 24 additions & 0 deletions conf/td_output.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# TreasureData Output
# ===================
# This configuration file specify the information to be used
# when flushing information to TreasureData.com service. All
# key fields in the 'TD' section are mandatory.
[TD]
# API
# ===
# The TreasureData API key. To obtain this please log into your
# TreasureData Console:
#
# 1. Go to https://console.treasuredata.com/
# 2. API Keys box: copy the API key hash
API SOME_API_KEY

# Database
# ========
# Specify the name of your database, it must exists.
Database db_example

# Table
# =====
# Specify the database table name where the records will be stored
Table table_example
5 changes: 4 additions & 1 deletion include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#define FLB_CONFIG_H

#include <time.h>
#include <mk_config/mk_config.h>
#include <mk_config/mk_list.h>

#define FLB_CONFIG_FLUSH_SECS 5
Expand All @@ -32,6 +33,7 @@ struct flb_config {
int flush_fd; /* Timer FD associated to flush */
int verbose; /* Verbose mode (default OFF) */
time_t init_time; /* Time when Fluent Bit started */
struct mk_config *file;

/* Collectors */
struct mk_list collectors;
Expand All @@ -40,7 +42,8 @@ struct flb_config {
struct mk_list inputs;

/* Outputs */
struct mk_list outputs;
struct mk_list outputs; /* list of output plugins */
struct flb_output_plugin *output; /* output plugin in use */

char *tag; /* Message Tag, used by Fluentd */
};
Expand Down
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_error.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#ifndef FLB_ERROR_H
#define FLB_ERROR_H

#define FLB_ERR_CFG_FILE 010
#define FLB_ERR_CFG_FILE_FORMAT 011
#define FLB_ERR_CFG_FLUSH 020
#define FLB_ERR_CFG_FLUSH_CREATE 021
#define FLB_ERR_CFG_FLUSH_REGISTER 022
Expand Down
19 changes: 10 additions & 9 deletions include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
#include <fluent-bit/flb_config.h>

/* Output plugin masks */
#define FLB_OUTPUT_TCP 1
#define FLB_OUTPUT_SSL 2
#define FLB_OUTPUT_TCP 1 /* it uses TCP */
#define FLB_OUTPUT_SSL 2 /* use SSL layer */
#define FLB_OUTPUT_NOPROT 4 /* do not validate protocol info */

/* Internal macros for setup */
#define FLB_OUTPUT_FLUENT 0
Expand All @@ -33,13 +34,9 @@
#define FLB_OUTPUT_TD_HTTP 3
#define FLB_OUTPUT_TD_HTTPS 4

#define FLB_OUTPUT_FLUENT_Z (sizeof("fluentd") - 1) + 3
#define FLB_OUTPUT_HTTP_Z (sizeof("http") - 1) + 3
#define FLB_OUTPUT_HTTPS_Z (sizeof("https") - 1) + 3
#define FLB_OUTPUT_TD_HTTP_Z (sizeof("td+http") - 1) + 3
#define FLB_OUTPUT_TD_HTTPS_Z (sizeof("td+https") - 1) + 3

struct flb_output_plugin {
int active;

int flags;

/* The plugin name */
Expand All @@ -64,6 +61,9 @@ struct flb_output_plugin {
/* Pre run */
int (*cb_pre_run) (void *, struct flb_config *);

/* Flush callback */
int (*cb_flush) (void *, size_t, void *);

/* Output handler configuration */
void *out_context;

Expand All @@ -74,8 +74,9 @@ struct flb_output_plugin {
/* Default TCP port for Fluentd */
#define FLB_OUTPUT_FLUENT_PORT "12224"

int flb_output_check(struct flb_config *config, char *output);
int flb_output_set(struct flb_config *config, char *output);
void flb_output_pre_run(struct flb_config *config);
int flb_output_set_context(char *name, void *out_context, struct flb_config *config);
int flb_output_init(struct flb_config *config);

#endif
1 change: 1 addition & 0 deletions lib/mk_config/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
cmake_minimum_required(VERSION 2.8)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC")

set(src
mk_string.c
Expand Down
22 changes: 11 additions & 11 deletions lib/mk_config/mk_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ struct mk_config *mk_config_create(const char *path)
/* Create new section */
section = mk_string_copy_substr(buf, 1, end);
current = mk_config_section_add(conf, section);
mk_mem_free(section);
free(section);
n_keys = 0;
continue;
}
Expand Down Expand Up @@ -221,8 +221,8 @@ struct mk_config *mk_config_create(const char *path)
mk_config_entry_add(conf, key, val);

/* Free temporal key and val */
mk_mem_free(key);
mk_mem_free(val);
free(key);
free(val);

n_keys++;
}
Expand All @@ -248,7 +248,7 @@ struct mk_config *mk_config_create(const char *path)
fflush(stdout);
*/
fclose(f);
if (indent) mk_mem_free(indent);
if (indent) free(indent);
return conf;
}

Expand All @@ -266,11 +266,11 @@ void mk_config_free(struct mk_config *conf)
mk_config_free_entries(section);

/* Free section node */
mk_mem_free(section->name);
mk_mem_free(section);
free(section->name);
free(section);
}
if (conf->file) mk_mem_free(conf->file);
if (conf) mk_mem_free(conf);
if (conf->file) free(conf->file);
if (conf) free(conf);
}

void mk_config_free_entries(struct mk_config_section *section)
Expand All @@ -283,9 +283,9 @@ void mk_config_free_entries(struct mk_config_section *section)
mk_list_del(&entry->_head);

/* Free memory assigned */
mk_mem_free(entry->key);
mk_mem_free(entry->val);
mk_mem_free(entry);
free(entry->key);
free(entry->val);
free(entry);
}
}

Expand Down
6 changes: 3 additions & 3 deletions lib/mk_config/mk_string.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,11 @@ void mk_string_split_free(struct mk_list *list)
mk_list_foreach_safe(head, tmp, list) {
entry = mk_list_entry(head, struct mk_string_line, _head);
mk_list_del(&entry->_head);
mk_mem_free(entry->val);
mk_mem_free(entry);
free(entry->val);
free(entry);
}

mk_mem_free(list);
free(list);
}

char *mk_string_build(char **buffer, unsigned long *len,
Expand Down
2 changes: 1 addition & 1 deletion plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ REGISTER_IN_PLUGIN("in_xbee")
REGISTER_IN_PLUGIN("in_cpu")
REGISTER_IN_PLUGIN("in_kmsg")
REGISTER_OUT_PLUGIN("out_fluentd")
#REGISTER_OUT_PLUGIN("td")
REGISTER_OUT_PLUGIN("out_td")

# Generate the header from the template
configure_file(
Expand Down
20 changes: 19 additions & 1 deletion plugins/out_fluentd/out_fluentd.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>

#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_utils.h>
Expand Down Expand Up @@ -62,11 +63,28 @@ int cb_fluentd_pre_run(void *out_context, struct flb_config *config)
return 0;
}

int cb_fluentd_flush(void *data, size_t bytes, void *out_context)
{
int fd;
struct flb_out_fluentd_config *ctx = out_context;
(void) ctx;

fd = flb_net_tcp_connect(out_fluentd_plugin.host,
out_fluentd_plugin.port);
if (fd <= 0) {
return -1;
}

/* FIXME: plain TCP write */
return write(fd, data, bytes);
}

/* Plugin reference */
struct flb_output_plugin out_fluentd_plugin = {
.name = "fluentd",
.description = "Fluentd log collector",
.cb_init = cb_fluentd_init,
.cb_pre_run = cb_fluentd_pre_run,
.flags = FLB_OUTPUT_TCP | FLB_OUTPUT_SSL,
.cb_flush = cb_fluentd_flush,
.flags = FLB_OUTPUT_TCP,
};
1 change: 1 addition & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@ struct flb_config *flb_config_init()

/* Register plugins */
flb_register_plugins(config);

return config;
}
16 changes: 12 additions & 4 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,14 @@ int flb_engine_flush(struct flb_config *config,
struct iovec *iov;
struct mk_list *head;

if (!tmp) {
out = mk_list_entry_first(&config->outputs,
struct flb_output_plugin, _head);
}
else {
out = tmp;
}

out = mk_list_entry_first(&config->outputs, struct flb_output_plugin, _head);
/*
* Lazy flush: it does a connect in blocking mode, this needs
* to be changed later and be integrated with the main loop.
Expand All @@ -136,9 +142,10 @@ int flb_engine_flush(struct flb_config *config,
goto flush_done;
}

bytes = write(fd, buf, size);
bytes = config->output->cb_flush(buf, size,
config->output->out_context);
if (bytes <= 0) {
perror("write");
flb_error("Error flushing data");
}
else {
flb_info("Flush buf %i bytes", bytes);
Expand Down Expand Up @@ -195,7 +202,7 @@ static int flb_engine_handle_event(int fd, int mask, struct flb_config *config)
/* Check if we need to flush */
if (config->flush_fd == fd) {
consume_byte(fd);
flb_engine_flush(config, NULL, NULL);
flb_engine_flush(config, NULL, config->output);
return 0;
}

Expand Down Expand Up @@ -236,6 +243,7 @@ int flb_engine_start(struct flb_config *config)
flb_input_pre_run_all(config);

/* Outputs pre-run */
flb_output_init(config);
flb_output_pre_run(config);

/* main loop */
Expand Down
31 changes: 18 additions & 13 deletions src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_macros.h>

#define protcmp(a, b) strncasecmp(a, b, strlen(a))

Expand Down Expand Up @@ -98,12 +99,6 @@ static int check_protocol(char *prot, char *output)
return 0;
}

if (output[len] != ':' ||
output[len + 1] != '/' ||
output[len + 1] != '/') {
return 0;
}

return 1;
}

Expand All @@ -115,7 +110,7 @@ void flb_output_pre_run(struct flb_config *config)

mk_list_foreach(head, &config->outputs) {
out = mk_list_entry(head, struct flb_output_plugin, _head);
if (out->cb_pre_run) {
if (out->cb_pre_run && out->active == FLB_TRUE) {
out->cb_pre_run(out->out_context, config);
}
}
Expand All @@ -125,7 +120,7 @@ void flb_output_pre_run(struct flb_config *config)
* It validate an output type given the string, it return the
* proper type and if valid, populate the global config.
*/
int flb_output_check(struct flb_config *config, char *output)
int flb_output_set(struct flb_config *config, char *output)
{
int ret = -1;
struct flb_output_plugin *plugin;
Expand All @@ -137,7 +132,14 @@ int flb_output_check(struct flb_config *config, char *output)

mk_list_foreach(head, &config->outputs) {
plugin = mk_list_entry(head, struct flb_output_plugin, _head);

if (check_protocol(plugin->name, output)) {
plugin->active = FLB_TRUE;
config->output = plugin;
if (plugin->flags & FLB_OUTPUT_NOPROT) {
return 0;
}

ret = split_address(plugin, output);
return ret;
}
Expand All @@ -149,18 +151,21 @@ int flb_output_check(struct flb_config *config, char *output)
/* Trigger the output plugins setup callbacks to prepare them. */
int flb_output_init(struct flb_config *config)
{
struct mk_list *head;
struct flb_output_plugin *out;

/* We need at least one output */
if (mk_list_is_empty(&config->outputs) != 0) {
if (mk_list_is_empty(&config->outputs) == 0) {
return -1;
}

/* Retrieve the plugin reference */
out = mk_list_entry_first(&config->outputs,
struct flb_output_plugin,
_head);

mk_list_foreach(head, &config->outputs) {
out = mk_list_entry(head, struct flb_output_plugin, _head);
if (out->active == FLB_TRUE) {
out->cb_init(config);
}
}
return 0;
}

Expand Down
Loading

0 comments on commit 81aa331

Please sign in to comment.