Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: fluent/fluent-bit
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 438e500e689f67544c6c55ce3730a51d446b1f63
Choose a base ref
..
head repository: fluent/fluent-bit
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: b0d6e12745f66c60a693bb86d07d9e718a80bc7a
Choose a head ref
Showing with 18 additions and 15 deletions.
  1. +17 −14 plugins/filter_ratelimit/ratelimit.c
  2. +1 −1 tests/runtime/filter_ratelimit.c
31 changes: 17 additions & 14 deletions plugins/filter_ratelimit/ratelimit.c
Original file line number Diff line number Diff line change
@@ -28,9 +28,8 @@

#include "ratelimit.h"

#define RATELIMIT_BUCKET_FIELD "kubernetes.pod_name"
#define RATELIMIT_EVENTS_PER_SECOND 5
#define RATELIMIT_EVENTS_BURST 10
#define RATELIMIT_EVENTS_BURST 20
#define RATELIMIT_MAX_BUCKETS 256
#define RATELIMIT_KEEP 0
#define RATELIMIT_EXCLUDE 1
@@ -48,7 +47,7 @@ struct bucket {

struct ratelimit_ctx {
/* Config values */
char *bucket_field;
char *bucket_key_field;
int events_per_second;
int events_burst;
int initial_burst;
@@ -67,8 +66,8 @@ static void ratelimit_ctx_destroy(struct ratelimit_ctx *ctx) {
if (ctx == NULL) {
return;
}
if (ctx->bucket_field != NULL) {
flb_free(ctx->bucket_field);
if (ctx->bucket_key_field != NULL) {
flb_free(ctx->bucket_key_field);
}
if (ctx->buckets != NULL) {
flb_hash_destroy(ctx->buckets);
@@ -129,10 +128,10 @@ static int ratelimit_limit_by_bucket(struct ratelimit_ctx *ctx, char *bucket_key
bucket->dropped++;
/* Log when messages are dropped, but throttle past RATELIMIT_THROTTLE_LIMIT messages. */
if (bucket->dropped <= RATELIMIT_THROTTLE_LIMIT || bucket->dropped % RATELIMIT_THROTTLE_PERIOD == 0) {
flb_info("[filter_ratelimit] %d dropped message(s) in a row: %s=%s%s", bucket->dropped, ctx->bucket_field,
flb_info("[filter_ratelimit] %d dropped message(s) in a row: %s=%s%s", bucket->dropped, ctx->bucket_key_field,
bucket_key, bucket->dropped > RATELIMIT_THROTTLE_LIMIT ? " (throttled)" : "");
if (bucket->dropped == RATELIMIT_THROTTLE_LIMIT) {
flb_info("[filter_ratelimit] throttling further log messages: %s=%s", ctx->bucket_field, bucket_key);
flb_info("[filter_ratelimit] throttling further log messages: %s=%s", ctx->bucket_key_field, bucket_key);
}
}
return RATELIMIT_EXCLUDE;
@@ -168,7 +167,7 @@ static int ratelimit_limit(struct ratelimit_ctx *ctx, msgpack_object map) {
klen = k->via.bin.size;
}

if (strncmp(key, ctx->bucket_field, klen) == 0) {
if (strncmp(key, ctx->bucket_key_field, klen) == 0) {
break;
}

@@ -177,7 +176,7 @@ static int ratelimit_limit(struct ratelimit_ctx *ctx, msgpack_object map) {

if (!k) {
/* Bucket field doesn't exist - keep message */
flb_debug("[filter_ratelimit] bucket field %s isn't present on event", ctx->bucket_field);
flb_debug("[filter_ratelimit] bucket_key %s isn't present on event", ctx->bucket_key_field);
return RATELIMIT_KEEP;
}

@@ -191,7 +190,7 @@ static int ratelimit_limit(struct ratelimit_ctx *ctx, msgpack_object map) {
vlen = v->via.bin.size;
}
else {
flb_debug("[filter_ratelimit] bucket field %s value isn't a string", ctx->bucket_field);
flb_debug("[filter_ratelimit] bucket_key %s value isn't a string", ctx->bucket_key_field);
return RATELIMIT_KEEP;
}

@@ -217,16 +216,20 @@ static int cb_ratelimit_init(struct flb_filter_instance *f_ins,
flb_errno();
return -1;
}
ctx->bucket_field = RATELIMIT_BUCKET_FIELD;
ctx->bucket_key_field = NULL;
ctx->events_per_second = RATELIMIT_EVENTS_PER_SECOND;
ctx->events_burst = RATELIMIT_EVENTS_BURST;
ctx->max_buckets = RATELIMIT_MAX_BUCKETS;
ctx->buckets = NULL;

/* Set config values */
tmp = flb_filter_get_property("bucket_field", f_ins);
tmp = flb_filter_get_property("bucket_key", f_ins);
if (tmp) {
ctx->bucket_field = flb_strdup(tmp);
ctx->bucket_key_field = flb_strdup(tmp);
} else {
flb_error("[filter_ratelimit] bucket_key must be set");
ratelimit_ctx_destroy(ctx);
return -1;
}

tmp = flb_filter_get_property("events_per_second", f_ins);
@@ -275,7 +278,7 @@ static int cb_ratelimit_init(struct flb_filter_instance *f_ins,
ctx->max_buckets = (int)r;
}

flb_debug("[filter_ratelimit] bucket_field=%s", ctx->bucket_field);
flb_debug("[filter_ratelimit] bucket_key_field=%s", ctx->bucket_key_field);
flb_debug("[filter_ratelimit] events_per_second=%u", ctx->events_per_second);
flb_debug("[filter_ratelimit] events_burst=%u", ctx->events_burst);
flb_debug("[filter_ratelimit] initial_burst=%u", ctx->initial_burst);
2 changes: 1 addition & 1 deletion tests/runtime/filter_ratelimit.c
Original file line number Diff line number Diff line change
@@ -104,7 +104,7 @@ void flb_test_filter_ratelimit(void)
TEST_CHECK(filter_ffd >= 0);
ret = flb_filter_set(ctx, filter_ffd,
"Match", "*",
"Bucket_Field", "filename",
"Bucket_Key", "filename",
"Events_Per_Second", "1",
"Events_Burst", "2",
"Initial_Burst", "1",