Skip to content

Commit

Permalink
in_statsd: implement the support for "statsd" protocol
Browse files Browse the repository at this point in the history
This is the first cut at adding statsd support to Fluent Bit.
You can use the "in_statsd" plugins as follows:

    $ fluent-bit -i statsd -o stdout

... now you can input metrics like:

    $ echo "click:10|c|@0.1" > /dev/udp/127.0.0.1/8125
    $ echo "active:+10|g" > /dev/udp/127.0.0.1/8125

This plugin will parse the incoming messages and produce well-
formatted records like below:

    {"type"=>"counter", "bucket"=>"click", "value"=>10.000000, "sample_rate"=>0.100000}
    {"type"=>"gauge", "bucket"=>"active", "value"=>10.000000, "incremental"=>1}

With this, we can easily collect performance logs from services
with statsd ouput support (like cadvisor).

Signed-off-by: Fujimoto Seiji <[email protected]>
  • Loading branch information
Fujimoto Seiji authored and edsiper committed Jan 16, 2020
1 parent 87380a1 commit 901e43f
Show file tree
Hide file tree
Showing 5 changed files with 342 additions and 0 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ option(FLB_IN_DUMMY "Enable Dummy input plugin" Yes)
option(FLB_IN_NETIF "Enable NetworkIF input plugin" Yes)
option(FLB_IN_WINLOG "Enable Windows Log input plugin" No)
option(FLB_IN_COLLECTD "Enable Collectd input plugin" Yes)
option(FLB_IN_STATSD "Enable StatsD input plugin" Yes)
option(FLB_IN_STORAGE_BACKLOG "Enable storage backlog input plugin" Yes)
option(FLB_OUT_AZURE "Enable Azure output plugin" Yes)
option(FLB_OUT_BIGQUERY "Enable BigQuery output plugin" Yes)
Expand Down
1 change: 1 addition & 0 deletions cmake/windows-setup.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ set(FLB_IN_DUMMY Yes)
set(FLB_IN_NETIF No)
set(FLB_IN_WINLOG Yes)
set(FLB_IN_COLLECTD No)
set(FLB_IN_STATSD No)
set(FLB_IN_STORAGE_BACKLOG No)

# OUTPUT plugins
Expand Down
1 change: 1 addition & 0 deletions plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ REGISTER_IN_PLUGIN("in_head")
REGISTER_IN_PLUGIN("in_health")
REGISTER_IN_PLUGIN("in_http")
REGISTER_IN_PLUGIN("in_collectd")
REGISTER_IN_PLUGIN("in_statsd")
REGISTER_IN_PLUGIN("in_storage_backlog")

if (FLB_STREAM_PROCESSOR)
Expand Down
4 changes: 4 additions & 0 deletions plugins/in_statsd/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
set(src
statsd.c)

FLB_PLUGIN(in_statsd "${src}" "")
335 changes: 335 additions & 0 deletions plugins/in_statsd/statsd.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2019 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_socket.h>
#include <fluent-bit/flb_pack.h>

#define MAX_PACKET_SIZE 65536
#define DEFAULT_LISTEN "0.0.0.0"
#define DEFAULT_PORT 8125

#define STATSD_TYPE_COUNTER 1
#define STATSD_TYPE_GAUGE 2
#define STATSD_TYPE_TIMER 3
#define STATSD_TYPE_SET 4

struct flb_statsd {
char *buf; /* buffer */
char listen[256]; /* listening address (RFC-2181) */
char port[6]; /* listening port (RFC-793) */
flb_sockfd_t server_fd; /* server socket */
flb_pipefd_t coll_fd; /* server handler */
struct flb_input_instance *i_ins; /* input instance */
};

/*
* The "statsd_message" represents a single line in UDP packet.
* It's just a bunch of pointers to ephemeral buffer.
*/
struct statsd_message {
char *bucket;
int bucket_len;
char *value;
int value_len;
int type;
double sample_rate;
};

static void pack_string(msgpack_packer *mp_pck, char *str, size_t len)
{
if (len < 0) {
len = strlen(str);
}
msgpack_pack_str(mp_pck, len);
msgpack_pack_str_body(mp_pck, str, len);
}

static int get_statsd_type(char *str)
{
switch (*str) {
case 'g':
return STATSD_TYPE_GAUGE;
case 's':
return STATSD_TYPE_SET;
case 'c':
return STATSD_TYPE_COUNTER;
case 'm':
if (*(str + 1) == 's') {
return STATSD_TYPE_TIMER;
}
}
return STATSD_TYPE_COUNTER;
}

static int is_incremental(char *str)
{
return (*str == '+' || *str == '-');
}

static int statsd_process_message(msgpack_packer *mp_pck,
struct statsd_message *m)
{
msgpack_pack_array(mp_pck, 2);
flb_pack_time_now(mp_pck);

switch (m->type) {
case STATSD_TYPE_COUNTER:
msgpack_pack_map(mp_pck, 4);
pack_string(mp_pck, "type", 4);
pack_string(mp_pck, "counter", 7);
pack_string(mp_pck, "bucket", 6);
pack_string(mp_pck, m->bucket, m->bucket_len);
pack_string(mp_pck, "value", 5);
msgpack_pack_double(mp_pck, atof(m->value));
pack_string(mp_pck, "sample_rate", 11);
msgpack_pack_double(mp_pck, m->sample_rate);
break;
case STATSD_TYPE_GAUGE:
msgpack_pack_map(mp_pck, 4);
pack_string(mp_pck, "type", 4);
pack_string(mp_pck, "gauge", 5);
pack_string(mp_pck, "bucket", 6);
pack_string(mp_pck, m->bucket, m->bucket_len);
pack_string(mp_pck, "value", 5);
msgpack_pack_double(mp_pck, atof(m->value));
pack_string(mp_pck, "incremental", 11);
msgpack_pack_int(mp_pck, is_incremental(m->value));
break;
case STATSD_TYPE_TIMER:
msgpack_pack_map(mp_pck, 4);
pack_string(mp_pck, "type", 4);
pack_string(mp_pck, "timer", 5);
pack_string(mp_pck, "bucket", 6);
pack_string(mp_pck, m->bucket, m->bucket_len);
pack_string(mp_pck, "value", 5);
msgpack_pack_double(mp_pck, atof(m->value));
pack_string(mp_pck, "sample_rate", 11);
msgpack_pack_double(mp_pck, m->sample_rate);
break;
case STATSD_TYPE_SET:
msgpack_pack_map(mp_pck, 3);
pack_string(mp_pck, "type", 4);
pack_string(mp_pck, "set", 3);
pack_string(mp_pck, "bucket", 6);
pack_string(mp_pck, m->bucket, m->bucket_len);
pack_string(mp_pck, "value", 5);
pack_string(mp_pck, m->value, m->value_len);
break;
}
return 0;
}

static int statsd_process_line(msgpack_packer *mp_pck, char *line)
{
char *colon, *bar, *atmark;
struct statsd_message m;

/*
* bucket:value|type|@sample_rate
* ------
*/
colon = strchr(line, ':');
if (colon == NULL) {
flb_error("[in_statsd] no bucket name found");
return -1;
}
m.bucket = line;
m.bucket_len = (colon - line);

/*
* bucket:value|type|@sample_rate
* ----
*/
bar = strchr(colon + 1, '|');
if (bar == NULL) {
flb_error("[in_statsd] no metric type found");
return -1;
}
m.type = get_statsd_type(bar + 1);

/*
* bucket:value|type|@sample_rate
* -----
*/
m.value = colon + 1;
m.value_len = (bar - colon - 1);

/*
* bucket:value|type|@sample_rate
* ------------
*/
atmark = strstr(bar + 1, "|@");
if (atmark == NULL || atof(atmark + 2) == 0) {
m.sample_rate = 1.0;
}
else {
m.sample_rate = atof(atmark + 2);
}

return statsd_process_message(mp_pck, &m);
}


static int cb_statsd_receive(struct flb_input_instance *i_ins,
struct flb_config *config, void *data)
{
struct flb_statsd *ctx = data;
char *line;
int len;
msgpack_packer mp_pck;
msgpack_sbuffer mp_sbuf;

/* Receive a UDP datagram */
len = recv(ctx->server_fd, ctx->buf, MAX_PACKET_SIZE - 1, 0);
if (len < 0) {
flb_errno();
return -1;
}
ctx->buf[len] = '\0';

msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);

/* Process all messages in buffer */
line = strtok(ctx->buf, "\n");
while (line) {
flb_trace("[in_statsd] received a line: '%s'", line);
if (statsd_process_line(&mp_pck, line) < 0) {
flb_error("[in_statsd] failed to process line: '%s'", line);
}
line = strtok(NULL, "\n");
}

/* Send to output */
if (mp_sbuf.size > 0) {
flb_input_chunk_append_raw(i_ins, NULL, 0, mp_sbuf.data, mp_sbuf.size);
}
msgpack_sbuffer_destroy(&mp_sbuf);

return 0;
}

static int cb_statsd_init(struct flb_input_instance *i_ins,
struct flb_config *config, void *data)
{
struct flb_statsd *ctx;
char *listen;
int port;

ctx = flb_calloc(1, sizeof(struct flb_statsd));
if (!ctx) {
flb_errno();
return -1;
}

ctx->buf = flb_malloc(MAX_PACKET_SIZE);
if (!ctx->buf) {
flb_errno();
flb_free(ctx);
return -1;
}
ctx->i_ins = i_ins;

/* Listening address */
if (i_ins->host.listen) {
listen = i_ins->host.listen;
}
else {
listen = DEFAULT_LISTEN;
}
strncpy(ctx->listen, listen, sizeof(ctx->listen) - 1);

/* Listening port */
if (i_ins->host.port) {
port = i_ins->host.port;
}
else {
port = DEFAULT_PORT;
}
snprintf(ctx->port, sizeof(ctx->port), "%hu", port);

/* Export plugin context */
flb_input_set_context(i_ins, ctx);

/* Accepts metrics from UDP connections. */
ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen);
if (ctx->server_fd == -1) {
flb_error("[in_statsd] can't bind to %s:%s", ctx->listen, ctx->port);
flb_free(ctx->buf);
flb_free(ctx);
return -1;
}

/* Set up the UDP connection callback */
ctx->coll_fd = flb_input_set_collector_socket(i_ins, cb_statsd_receive,
ctx->server_fd, config);
if (ctx->coll_fd == -1) {
flb_error("[in_statsd] cannot set up connection callback ");
flb_socket_close(ctx->server_fd);
flb_free(ctx->buf);
flb_free(ctx);
return -1;
}

flb_info("[in_statsd] start UDP server on %s:%s", ctx->listen, ctx->port);

return 0;
}

static void cb_statsd_pause(void *data, struct flb_config *config)
{
struct flb_statsd *ctx = data;
flb_input_collector_pause(ctx->coll_fd, ctx->i_ins);
}

static void cb_statsd_resume(void *data, struct flb_config *config)
{
struct flb_statsd *ctx = data;
flb_input_collector_resume(ctx->coll_fd, ctx->i_ins);
}

static int cb_statsd_exit(void *data, struct flb_config *config)
{
struct flb_statsd *ctx = data;

flb_input_collector_pause(ctx->coll_fd, ctx->i_ins);
flb_socket_close(ctx->server_fd);
flb_free(ctx->buf);
flb_free(ctx);
return 0;
}

/* Plugin reference */
struct flb_input_plugin in_statsd_plugin = {
.name = "statsd",
.description = "StatsD input plugin",
.cb_init = cb_statsd_init,
.cb_pre_run = NULL,
.cb_collect = NULL,
.cb_ingest = NULL,
.cb_flush_buf = NULL,
.cb_pause = cb_statsd_pause,
.cb_resume = cb_statsd_resume,
.cb_exit = cb_statsd_exit,
.flags = 0
};

0 comments on commit 901e43f

Please sign in to comment.