diff --git a/CMakeLists.txt b/CMakeLists.txt index 7eb4dfaf543..a7124848db8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -45,12 +45,12 @@ option(FLB_BUFFERING "Enable buffering support" No) option(FLB_POSIX_TLS "Force POSIX thread storage" No) option(FLB_WITHOUT_INOTIFY "Disable inotify support" No) option(FLB_SQLDB "Enable SQL embedded DB" No) -option(FLB_HTTP_SERVER "Enable HTTP Server" No) +option(FLB_HTTP_SERVER "Enable HTTP Server" Yes) # Metrics: Experimental Feature, disabled by default on 0.12 series # but enabled in the upcoming 0.13 release. Note that development # mode enable this feature. -option(FLB_METRICS "Enable metrics support" No) +option(FLB_METRICS "Enable metrics support" Yes) # Proxy Plugins option(FLB_PROXY_GO "Enable Go plugins support" Yes) @@ -112,6 +112,7 @@ option(FLB_FILTER_GREP "Enable grep filter" Yes) option(FLB_FILTER_STDOUT "Enable stdout filter" Yes) option(FLB_FILTER_PARSER "Enable parser filter" Yes) option(FLB_FILTER_KUBERNETES "Enable kubernetes filter" Yes) +option(FLB_FILTER_THROTTLE "Enable throttle filter" Yes) option(FLB_FILTER_RECORD_MODIFIER "Enable record_modifier filter" Yes) # Enable all features diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 2409b65ad1a..e4f09193e98 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -142,6 +142,7 @@ if (NOT CMAKE_SYSTEM_NAME MATCHES "Windows") REGISTER_FILTER_PLUGIN("filter_grep") endif() REGISTER_FILTER_PLUGIN("filter_stdout") +REGISTER_FILTER_PLUGIN("filter_throttle") if(FLB_REGEX) REGISTER_FILTER_PLUGIN("filter_kubernetes") diff --git a/plugins/filter_throttle/CMakeLists.txt b/plugins/filter_throttle/CMakeLists.txt new file mode 100644 index 00000000000..adc7b8f4c37 --- /dev/null +++ b/plugins/filter_throttle/CMakeLists.txt @@ -0,0 +1,6 @@ +set(src + window.c + throttle.c + ) + +FLB_PLUGIN(filter_throttle "${src}" "") diff --git a/plugins/filter_throttle/throttle.c b/plugins/filter_throttle/throttle.c new file mode 100644 index 00000000000..4a0ac044338 --- /dev/null +++ b/plugins/filter_throttle/throttle.c @@ -0,0 +1,208 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2017 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "throttle.h" +#include "window.h" + + +struct ticker { + struct flb_filter_throttle_ctx *ctx; + bool done; +}; + +void *time_ticker(void *args) +{ + struct ticker *t = args; + struct flb_time ftm; + long timestamp; + + while (!t->done) { + flb_time_get(&ftm); + timestamp = flb_time_to_double(&ftm); + window_add(t->ctx->hash, timestamp, 0); + + t->ctx->hash->current_timestamp = timestamp; + + flb_info("[filter_throttle] %i: limist is %f per sec in window %i sec, current rate is: %i per sec", timestamp, t->ctx->max_rate, t->ctx->window_size, t->ctx->hash->total / t->ctx->hash->size); + sleep(1); + } +} + +/* Given a msgpack record, do some filter action based on the defined rules */ +static inline int throttle_data(struct flb_filter_throttle_ctx *ctx) +{ + if ( ctx->hash->total / ctx->hash->size > ctx->max_rate) { + return THROTTLE_RET_DROP; + } + + window_add(ctx->hash, ctx->hash->current_timestamp, 1); + + flb_debug("[filter_throttle] limist is %f per sec in window %i sec, current rate is: %i per sec", ctx->max_rate, ctx->window_size, ctx->hash->total / ctx->hash->size); + + return THROTTLE_RET_KEEP; +} + +static int configure(struct flb_filter_throttle_ctx *ctx, struct flb_filter_instance *f_ins) +{ + char *str = NULL; + double val = 0; + char *endp; + + /* rate per second */ + str = flb_filter_get_property("rate", f_ins); + + if (str != NULL && (val = strtod(str, &endp)) > 1) { + ctx->max_rate = val; + } else { + ctx->max_rate = THROTTLE_DEFAULT_RATE; + } + + /* windows size */ + str = flb_filter_get_property("window", f_ins); + if (str != NULL && (val = strtoul(str, &endp, 10)) > 1) { + ctx->window_size = val; + } else { + ctx->window_size = THROTTLE_DEFAULT_WINDOW; + } + + return 0; +} + +static int cb_throttle_init(struct flb_filter_instance *f_ins, + struct flb_config *config, + void *data) +{ + int ret; + struct flb_filter_throttle_ctx *ctx; + pthread_t tid; + struct ticker *ticker_ctx; + + /* Create context */ + ctx = flb_malloc(sizeof(struct flb_filter_throttle_ctx)); + if (!ctx) { + flb_errno(); + return -1; + } + + /* parse plugin configuration */ + ret = configure(ctx, f_ins); + if (ret == -1) { + flb_free(ctx); + return -1; + } + + ctx->hash = window_create(ctx->window_size); + + /* Set our context */ + flb_filter_set_context(f_ins, ctx); + + ticker_ctx = flb_malloc(sizeof(struct ticker)); + ticker_ctx->ctx = ctx; + ticker_ctx->done = false; + pthread_create(&tid, NULL, &time_ticker, ticker_ctx); + return 0; +} + +static int cb_throttle_filter(void *data, size_t bytes, + char *tag, int tag_len, + void **out_buf, size_t *out_size, + struct flb_filter_instance *f_ins, + void *context, + struct flb_config *config) +{ + int ret; + int old_size = 0; + int new_size = 0; + msgpack_unpacked result; + msgpack_object root; + size_t off = 0; + (void) f_ins; + (void) config; + msgpack_sbuffer tmp_sbuf; + msgpack_packer tmp_pck; + + /* Create temporal msgpack buffer */ + msgpack_sbuffer_init(&tmp_sbuf); + msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); + + + /* Iterate each item array and apply rules */ + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, data, bytes, &off)) { + root = result.data; + if (root.type != MSGPACK_OBJECT_ARRAY) { + continue; + } + + old_size++; + + ret = throttle_data(context); + if (ret == THROTTLE_RET_KEEP) { + msgpack_pack_object(&tmp_pck, root); + new_size++; + } + else if (ret == THROTTLE_RET_DROP) { + /* Do nothing */ + } + } + msgpack_unpacked_destroy(&result); + + /* we keep everything ? */ + if (old_size == new_size) { + /* Destroy the buffer to avoid more overhead */ + msgpack_sbuffer_destroy(&tmp_sbuf); + return FLB_FILTER_NOTOUCH; + } + + /* link new buffers */ + *out_buf = tmp_sbuf.data; + *out_size = tmp_sbuf.size; + + return FLB_FILTER_MODIFIED; +} + +static int cb_throttle_exit(void *data, struct flb_config *config) +{ + struct flb_filter_throttle_ctx *ctx = data; + + flb_free(ctx); + return 0; +} + +struct flb_filter_plugin filter_throttle_plugin = { + .name = "throttle", + .description = "Throttle messages using sliding window algorithm", + .cb_init = cb_throttle_init, + .cb_filter = cb_throttle_filter, + .cb_exit = cb_throttle_exit, + .flags = 0 +}; diff --git a/plugins/filter_throttle/throttle.h b/plugins/filter_throttle/throttle.h new file mode 100644 index 00000000000..52a5861eb06 --- /dev/null +++ b/plugins/filter_throttle/throttle.h @@ -0,0 +1,39 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit Throttling + * ========== + * Copyright (C) 2017 AnchorFree + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_FILTER_THROTTLE_H +#define FLB_FILTER_THROTTLE_H + +/* actions */ +#define THROTTLE_RET_KEEP 0 +#define THROTTLE_RET_DROP 1 + +/* defaults */ +#define THROTTLE_DEFAULT_RATE 1 +#define THROTTLE_DEFAULT_WINDOW 5 + +struct flb_filter_throttle_ctx { + double max_rate; + unsigned int window_size; + + /* internal */ + struct throttle_window *hash; +}; + +#endif diff --git a/plugins/filter_throttle/window.c b/plugins/filter_throttle/window.c new file mode 100644 index 00000000000..70564113422 --- /dev/null +++ b/plugins/filter_throttle/window.c @@ -0,0 +1,73 @@ +#include +#include +#include +#include +#include + +#include "window.h" +#include "throttle.h" + +struct throttle_window *window_create(size_t size) { + struct throttle_window *tw; + + if (size <= 0) { + return NULL; + } + + tw = flb_malloc(sizeof(struct throttle_window)); + + if (!tw) { + return NULL; + } + + tw->size = size; + tw->total = 0; + tw->current_timestamp = 0; + tw->max_index = -1; + tw->table = flb_calloc(size, sizeof(struct throttle_pane)); + if (!tw->table) { + flb_error("Could not allocate initial window memory"); + return NULL; + } + + return tw; +} + + +int window_get(struct throttle_window *tw, long timestamp) { + int i; + for (i=0; i< tw->size; i++ ) { + if (tw->table[i].timestamp == timestamp) { + return i; + } + } + return NOT_FOUND; +} +int window_add(struct throttle_window *tw, long timestamp, int val) { + int i, index, size; + int sum = 0; + tw->current_timestamp = timestamp; + + size = tw->size; + index = window_get(tw, timestamp); + + if (index == NOT_FOUND) { + if (size - 1 == tw->max_index) { + /* window must be shifted */ + tw->max_index = -1; + } + tw->max_index += 1; + tw->table[tw->max_index].timestamp= timestamp; + tw->table[tw->max_index].counter = val; + } else { + tw->table[index].counter += val; + } + + for (i=0; i < tw->size; i++ ) { + sum += tw->table[i].counter; + flb_debug("timestamp: %i, value: %i", tw->table[i].timestamp, tw->table[i].counter); + } + tw->total = sum; + flb_debug("total: %i", tw->total); + return 0; +} diff --git a/plugins/filter_throttle/window.h b/plugins/filter_throttle/window.h new file mode 100644 index 00000000000..504d343d238 --- /dev/null +++ b/plugins/filter_throttle/window.h @@ -0,0 +1,38 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit Throttling + * ========== + * Copyright (C) 2017 AnchorFree + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#define NOT_FOUND -1 + +struct throttle_pane { + long timestamp; + long counter; +}; + +struct throttle_window { + long current_timestamp; + unsigned size; + unsigned total; + pthread_mutex_t result_mutex; + int max_index; + struct throttle_pane *table; +}; + +struct throttle_window *window_create(size_t size); +int window_add(struct throttle_window *tw, long timestamp, int val); diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 24887598ba0..4eec1bcec91 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -131,6 +131,7 @@ static char *elasticsearch_format(void *data, size_t bytes, char *es_index; char logstash_index[256]; char time_formatted[256]; + char index_formatted[256]; char es_uuid[37]; msgpack_unpacked result; msgpack_object root; @@ -189,10 +190,15 @@ static char *elasticsearch_format(void *data, size_t bytes, /* If logstash format and id generation is disabled, pre-generate index line for all records. */ if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) { + flb_time_get(&tms); + gmtime_r(&tms.tm.tv_sec, &tm); + s = strftime(index_formatted, sizeof(index_formatted) - 1, + ctx->index, &tm); + es_index = index_formatted; index_len = snprintf(j_index, ES_BULK_HEADER, ES_BULK_INDEX_FMT, - ctx->index, ctx->type); + es_index, ctx->type); } while (msgpack_unpack_next(&result, data, bytes, &off)) { @@ -206,12 +212,20 @@ static char *elasticsearch_format(void *data, size_t bytes, continue; } + /* some broken clients may have time drift up to year 1970 + * this will generate corresponding index in Elasticsearch + * in order to prevent generating millions of indexes + * we can set to always use current time for index generation */ + if (ctx->current_time_index == FLB_TRUE) { + flb_time_get(&tms); + } else { + flb_time_pop_from_msgpack(&tms, &result, &obj); + } /* * Timestamp: Elasticsearch only support fractional seconds in * milliseconds unit, not nanoseconds, so we take our nsec value and * change it representation. */ - flb_time_pop_from_msgpack(&tms, &result, &obj); tms.tm.tv_nsec = (tms.tm.tv_nsec / 1000000); map = root.via.array.ptr[1]; @@ -243,7 +257,10 @@ static char *elasticsearch_format(void *data, size_t bytes, msgpack_pack_str(&tmp_pck, s); msgpack_pack_str_body(&tmp_pck, time_formatted, s); - es_index = ctx->index; + // make sure we handle index time format for index + s = strftime(index_formatted, sizeof(index_formatted) - 1, + ctx->index, &tm); + es_index = index_formatted; if (ctx->logstash_format == FLB_TRUE) { /* Compose Index header */ p = logstash_index + ctx->logstash_prefix_len; diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index 45c873dbbd2..4c2a727f58d 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -50,6 +50,7 @@ struct flb_elasticsearch { /* enabled/disabled */ int logstash_format; int generate_id; + int current_time_index; /* prefix */ int logstash_prefix_len; diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index 64995fc1463..69db377aaa9 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -262,6 +262,14 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, ctx->generate_id = FLB_FALSE; } + /* Use current time for index generation instead of message record */ + tmp = flb_output_get_property("current_time_index", ins); + if (tmp) { + ctx->current_time_index = bool_value(tmp); + } else { + ctx->current_time_index = FLB_FALSE; + } + return ctx; }