From 901e43f7b3a09266d7be0a5847169da148c7c684 Mon Sep 17 00:00:00 2001 From: Fujimoto Seiji Date: Wed, 6 Nov 2019 16:02:57 +0900 Subject: [PATCH] in_statsd: implement the support for "statsd" protocol This is the first cut at adding statsd support to Fluent Bit. You can use the "in_statsd" plugin as follows: $ fluent-bit -i statsd -o stdout ... now you can input metrics like: $ echo "click:10|c|@0.1" > /dev/udp/127.0.0.1/8125 $ echo "active:+10|g" > /dev/udp/127.0.0.1/8125 This plugin will parse the incoming messages and produce well-formatted records like below: {"type"=>"counter", "bucket"=>"click", "value"=>10.000000, "sample_rate"=>0.100000} {"type"=>"gauge", "bucket"=>"active", "value"=>10.000000, "incremental"=>1} With this, we can easily collect performance logs from services with statsd output support (like cadvisor). Signed-off-by: Fujimoto Seiji --- CMakeLists.txt | 1 + cmake/windows-setup.cmake | 1 + plugins/CMakeLists.txt | 1 + plugins/in_statsd/CMakeLists.txt | 4 + plugins/in_statsd/statsd.c | 335 +++++++++++++++++++++++++++++++ 5 files changed, 342 insertions(+) create mode 100644 plugins/in_statsd/CMakeLists.txt create mode 100644 plugins/in_statsd/statsd.c diff --git a/CMakeLists.txt b/CMakeLists.txt index f871e7bf774..3887eaa5b43 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -122,6 +122,7 @@ option(FLB_IN_DUMMY "Enable Dummy input plugin" Yes) option(FLB_IN_NETIF "Enable NetworkIF input plugin" Yes) option(FLB_IN_WINLOG "Enable Windows Log input plugin" No) option(FLB_IN_COLLECTD "Enable Collectd input plugin" Yes) +option(FLB_IN_STATSD "Enable StatsD input plugin" Yes) option(FLB_IN_STORAGE_BACKLOG "Enable storage backlog input plugin" Yes) option(FLB_OUT_AZURE "Enable Azure output plugin" Yes) option(FLB_OUT_BIGQUERY "Enable BigQuery output plugin" Yes) diff --git a/cmake/windows-setup.cmake b/cmake/windows-setup.cmake index a021950ff9a..8a753612d17 100644 --- a/cmake/windows-setup.cmake +++ b/cmake/windows-setup.cmake @@ -38,6 +38,7 @@ 
set(FLB_IN_DUMMY Yes) set(FLB_IN_NETIF No) set(FLB_IN_WINLOG Yes) set(FLB_IN_COLLECTD No) +set(FLB_IN_STATSD No) set(FLB_IN_STORAGE_BACKLOG No) # OUTPUT plugins diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index e5be1189a2d..a491de56eb8 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -137,6 +137,7 @@ REGISTER_IN_PLUGIN("in_head") REGISTER_IN_PLUGIN("in_health") REGISTER_IN_PLUGIN("in_http") REGISTER_IN_PLUGIN("in_collectd") +REGISTER_IN_PLUGIN("in_statsd") REGISTER_IN_PLUGIN("in_storage_backlog") if (FLB_STREAM_PROCESSOR) diff --git a/plugins/in_statsd/CMakeLists.txt b/plugins/in_statsd/CMakeLists.txt new file mode 100644 index 00000000000..5b9dde2303e --- /dev/null +++ b/plugins/in_statsd/CMakeLists.txt @@ -0,0 +1,4 @@ +set(src + statsd.c) + +FLB_PLUGIN(in_statsd "${src}" "") diff --git a/plugins/in_statsd/statsd.c b/plugins/in_statsd/statsd.c new file mode 100644 index 00000000000..c586015e25f --- /dev/null +++ b/plugins/in_statsd/statsd.c @@ -0,0 +1,335 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include + +#define MAX_PACKET_SIZE 65536 +#define DEFAULT_LISTEN "0.0.0.0" +#define DEFAULT_PORT 8125 + +#define STATSD_TYPE_COUNTER 1 +#define STATSD_TYPE_GAUGE 2 +#define STATSD_TYPE_TIMER 3 +#define STATSD_TYPE_SET 4 + +struct flb_statsd { + char *buf; /* buffer */ + char listen[256]; /* listening address (RFC-2181) */ + char port[6]; /* listening port (RFC-793) */ + flb_sockfd_t server_fd; /* server socket */ + flb_pipefd_t coll_fd; /* server handler */ + struct flb_input_instance *i_ins; /* input instance */ +}; + +/* + * The "statsd_message" represents a single line in UDP packet. + * It's just a bunch of pointers to ephemeral buffer. + */ +struct statsd_message { + char *bucket; + int bucket_len; + char *value; + int value_len; + int type; + double sample_rate; +}; + +static void pack_string(msgpack_packer *mp_pck, char *str, size_t len) +{ + if (len < 0) { + len = strlen(str); + } + msgpack_pack_str(mp_pck, len); + msgpack_pack_str_body(mp_pck, str, len); +} + +static int get_statsd_type(char *str) +{ + switch (*str) { + case 'g': + return STATSD_TYPE_GAUGE; + case 's': + return STATSD_TYPE_SET; + case 'c': + return STATSD_TYPE_COUNTER; + case 'm': + if (*(str + 1) == 's') { + return STATSD_TYPE_TIMER; + } + } + return STATSD_TYPE_COUNTER; +} + +static int is_incremental(char *str) +{ + return (*str == '+' || *str == '-'); +} + +static int statsd_process_message(msgpack_packer *mp_pck, + struct statsd_message *m) +{ + msgpack_pack_array(mp_pck, 2); + flb_pack_time_now(mp_pck); + + switch (m->type) { + case STATSD_TYPE_COUNTER: + msgpack_pack_map(mp_pck, 4); + pack_string(mp_pck, "type", 4); + pack_string(mp_pck, "counter", 7); + pack_string(mp_pck, "bucket", 6); + pack_string(mp_pck, m->bucket, m->bucket_len); + pack_string(mp_pck, "value", 5); + msgpack_pack_double(mp_pck, atof(m->value)); + pack_string(mp_pck, "sample_rate", 11); + msgpack_pack_double(mp_pck, m->sample_rate); + break; + case 
STATSD_TYPE_GAUGE: + msgpack_pack_map(mp_pck, 4); + pack_string(mp_pck, "type", 4); + pack_string(mp_pck, "gauge", 5); + pack_string(mp_pck, "bucket", 6); + pack_string(mp_pck, m->bucket, m->bucket_len); + pack_string(mp_pck, "value", 5); + msgpack_pack_double(mp_pck, atof(m->value)); + pack_string(mp_pck, "incremental", 11); + msgpack_pack_int(mp_pck, is_incremental(m->value)); + break; + case STATSD_TYPE_TIMER: + msgpack_pack_map(mp_pck, 4); + pack_string(mp_pck, "type", 4); + pack_string(mp_pck, "timer", 5); + pack_string(mp_pck, "bucket", 6); + pack_string(mp_pck, m->bucket, m->bucket_len); + pack_string(mp_pck, "value", 5); + msgpack_pack_double(mp_pck, atof(m->value)); + pack_string(mp_pck, "sample_rate", 11); + msgpack_pack_double(mp_pck, m->sample_rate); + break; + case STATSD_TYPE_SET: + msgpack_pack_map(mp_pck, 3); + pack_string(mp_pck, "type", 4); + pack_string(mp_pck, "set", 3); + pack_string(mp_pck, "bucket", 6); + pack_string(mp_pck, m->bucket, m->bucket_len); + pack_string(mp_pck, "value", 5); + pack_string(mp_pck, m->value, m->value_len); + break; + } + return 0; +} + +static int statsd_process_line(msgpack_packer *mp_pck, char *line) +{ + char *colon, *bar, *atmark; + struct statsd_message m; + + /* + * bucket:value|type|@sample_rate + * ------ + */ + colon = strchr(line, ':'); + if (colon == NULL) { + flb_error("[in_statsd] no bucket name found"); + return -1; + } + m.bucket = line; + m.bucket_len = (colon - line); + + /* + * bucket:value|type|@sample_rate + * ---- + */ + bar = strchr(colon + 1, '|'); + if (bar == NULL) { + flb_error("[in_statsd] no metric type found"); + return -1; + } + m.type = get_statsd_type(bar + 1); + + /* + * bucket:value|type|@sample_rate + * ----- + */ + m.value = colon + 1; + m.value_len = (bar - colon - 1); + + /* + * bucket:value|type|@sample_rate + * ------------ + */ + atmark = strstr(bar + 1, "|@"); + if (atmark == NULL || atof(atmark + 2) == 0) { + m.sample_rate = 1.0; + } + else { + m.sample_rate = atof(atmark + 
2); + } + + return statsd_process_message(mp_pck, &m); +} + + +static int cb_statsd_receive(struct flb_input_instance *i_ins, + struct flb_config *config, void *data) +{ + struct flb_statsd *ctx = data; + char *line; + int len; + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + + /* Receive a UDP datagram */ + len = recv(ctx->server_fd, ctx->buf, MAX_PACKET_SIZE - 1, 0); + if (len < 0) { + flb_errno(); + return -1; + } + ctx->buf[len] = '\0'; + + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + /* Process all messages in buffer */ + line = strtok(ctx->buf, "\n"); + while (line) { + flb_trace("[in_statsd] received a line: '%s'", line); + if (statsd_process_line(&mp_pck, line) < 0) { + flb_error("[in_statsd] failed to process line: '%s'", line); + } + line = strtok(NULL, "\n"); + } + + /* Send to output */ + if (mp_sbuf.size > 0) { + flb_input_chunk_append_raw(i_ins, NULL, 0, mp_sbuf.data, mp_sbuf.size); + } + msgpack_sbuffer_destroy(&mp_sbuf); + + return 0; +} + +static int cb_statsd_init(struct flb_input_instance *i_ins, + struct flb_config *config, void *data) +{ + struct flb_statsd *ctx; + char *listen; + int port; + + ctx = flb_calloc(1, sizeof(struct flb_statsd)); + if (!ctx) { + flb_errno(); + return -1; + } + + ctx->buf = flb_malloc(MAX_PACKET_SIZE); + if (!ctx->buf) { + flb_errno(); + flb_free(ctx); + return -1; + } + ctx->i_ins = i_ins; + + /* Listening address */ + if (i_ins->host.listen) { + listen = i_ins->host.listen; + } + else { + listen = DEFAULT_LISTEN; + } + strncpy(ctx->listen, listen, sizeof(ctx->listen) - 1); + + /* Listening port */ + if (i_ins->host.port) { + port = i_ins->host.port; + } + else { + port = DEFAULT_PORT; + } + snprintf(ctx->port, sizeof(ctx->port), "%hu", port); + + /* Export plugin context */ + flb_input_set_context(i_ins, ctx); + + /* Accepts metrics from UDP connections. 
*/ + ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen); + if (ctx->server_fd == -1) { + flb_error("[in_statsd] can't bind to %s:%s", ctx->listen, ctx->port); + flb_free(ctx->buf); + flb_free(ctx); + return -1; + } + + /* Set up the UDP connection callback */ + ctx->coll_fd = flb_input_set_collector_socket(i_ins, cb_statsd_receive, + ctx->server_fd, config); + if (ctx->coll_fd == -1) { + flb_error("[in_statsd] cannot set up connection callback "); + flb_socket_close(ctx->server_fd); + flb_free(ctx->buf); + flb_free(ctx); + return -1; + } + + flb_info("[in_statsd] start UDP server on %s:%s", ctx->listen, ctx->port); + + return 0; +} + +static void cb_statsd_pause(void *data, struct flb_config *config) +{ + struct flb_statsd *ctx = data; + flb_input_collector_pause(ctx->coll_fd, ctx->i_ins); +} + +static void cb_statsd_resume(void *data, struct flb_config *config) +{ + struct flb_statsd *ctx = data; + flb_input_collector_resume(ctx->coll_fd, ctx->i_ins); +} + +static int cb_statsd_exit(void *data, struct flb_config *config) +{ + struct flb_statsd *ctx = data; + + flb_input_collector_pause(ctx->coll_fd, ctx->i_ins); + flb_socket_close(ctx->server_fd); + flb_free(ctx->buf); + flb_free(ctx); + return 0; +} + +/* Plugin reference */ +struct flb_input_plugin in_statsd_plugin = { + .name = "statsd", + .description = "StatsD input plugin", + .cb_init = cb_statsd_init, + .cb_pre_run = NULL, + .cb_collect = NULL, + .cb_ingest = NULL, + .cb_flush_buf = NULL, + .cb_pause = cb_statsd_pause, + .cb_resume = cb_statsd_resume, + .cb_exit = cb_statsd_exit, + .flags = 0 +};