From f60176de9f26c0a827754abe5d0a96c50acae39a Mon Sep 17 00:00:00 2001 From: Fujimoto Seiji Date: Wed, 21 Aug 2019 07:45:19 +0900 Subject: [PATCH] in_collectd: Implement a new input plugin for collectd (#1506) * in_collectd: Implement a new input plugin for collectd This allows fluent-bit to receive UDP packets from collectd's Network plugin. You can use it as: $ fluent-bit -i collectd -o stdout ... then it listens to `0.0.0.0:25826` and consumes incoming packets. This implements the basic support for collectd's binary protocol and also contains a compatible parser for types.db (5), so it should cover the most real-life use cases. The remaining features to be implemented are advanced features like AES encryption and HMAC signatures (introduced in v4.7), which we will address in the future patches. Signed-off-by: Fujimoto Seiji * in_collectd: Use 'Listen' instead of 'Address' as an option name This makes the plugin more conforming to the established interface standard, thus makes it easier to maintain. Signed-off-by: Fujimoto Seiji --- CMakeLists.txt | 1 + cmake/windows-setup.cmake | 1 + plugins/CMakeLists.txt | 1 + plugins/in_collectd/CMakeLists.txt | 7 + plugins/in_collectd/in_collectd.c | 213 ++++++++++++++++++++ plugins/in_collectd/netprot.c | 280 +++++++++++++++++++++++++++ plugins/in_collectd/netprot.h | 23 +++ plugins/in_collectd/typesdb.c | 220 +++++++++++++++++++++ plugins/in_collectd/typesdb.h | 43 ++++ plugins/in_collectd/typesdb_parser.c | 215 ++++++++++++++++++++ plugins/in_collectd/typesdb_parser.h | 21 ++ 11 files changed, 1025 insertions(+) create mode 100644 plugins/in_collectd/CMakeLists.txt create mode 100644 plugins/in_collectd/in_collectd.c create mode 100644 plugins/in_collectd/netprot.c create mode 100644 plugins/in_collectd/netprot.h create mode 100644 plugins/in_collectd/typesdb.c create mode 100644 plugins/in_collectd/typesdb.h create mode 100644 plugins/in_collectd/typesdb_parser.c create mode 100644 plugins/in_collectd/typesdb_parser.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 753161fc4d6..d1d8a6e237b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -115,6 +115,7 @@ option(FLB_IN_SYSTEMD "Enable Systemd input plugin" Yes) option(FLB_IN_DUMMY "Enable Dummy input plugin" Yes) option(FLB_IN_NETIF "Enable NetworkIF input plugin" Yes) option(FLB_IN_WINLOG "Enable Windows Log input plugin" No) +option(FLB_IN_COLLECTD "Enable Collectd input plugin" Yes) option(FLB_IN_STORAGE_BACKLOG "Enable stoage backlog input plugin" Yes) option(FLB_OUT_AZURE "Enable Azure output plugin" Yes) option(FLB_OUT_BIGQUERY "Enable BigQuery output plugin" Yes) diff --git a/cmake/windows-setup.cmake b/cmake/windows-setup.cmake index d066447a0ad..68f1d7aa7c5 100644 --- a/cmake/windows-setup.cmake +++ b/cmake/windows-setup.cmake @@ -37,6 +37,7 @@ set(FLB_IN_SYSTEMD No) set(FLB_IN_DUMMY Yes) set(FLB_IN_NETIF No) set(FLB_IN_WINLOG Yes) +set(FLB_IN_COLLECTD No) set(FLB_IN_STORAGE_BACKLOG No) # OUTPUT plugins diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index ec36d7b2f01..4a765b54996 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -103,6 +103,7 @@ REGISTER_IN_PLUGIN("in_dummy") REGISTER_IN_PLUGIN("in_head") REGISTER_IN_PLUGIN("in_health") REGISTER_IN_PLUGIN("in_http") +REGISTER_IN_PLUGIN("in_collectd") REGISTER_IN_PLUGIN("in_storage_backlog") if (FLB_STREAM_PROCESSOR) diff --git a/plugins/in_collectd/CMakeLists.txt b/plugins/in_collectd/CMakeLists.txt new file mode 100644 index 00000000000..1f8d6407404 --- /dev/null +++ b/plugins/in_collectd/CMakeLists.txt @@ -0,0 +1,7 @@ +set(src + typesdb.c + typesdb_parser.c + netprot.c + in_collectd.c) + +FLB_PLUGIN(in_collectd "${src}" "") diff --git a/plugins/in_collectd/in_collectd.c b/plugins/in_collectd/in_collectd.c new file mode 100644 index 00000000000..04e6e79b9d4 --- /dev/null +++ b/plugins/in_collectd/in_collectd.c @@ -0,0 +1,213 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include "netprot.h" +#include "typesdb.h" + +/* + * Max payload size. By default, Collectd sends up to 1452 bytes + * per a UDP packet, but the limit can be increased up to 65535 + * bytes through a configuration parameter. + * + * See network_config_set_buffer_size() in collectd/src/network.c. + */ +#define BUFFER_SIZE 65535 + +#define DEFAULT_LISTEN "0.0.0.0" +#define DEFAULT_PORT 25826 + +/* This is where most Linux systems places a default TypesDB */ +#define DEFAULT_TYPESDB "/usr/share/collectd/types.db"; + +struct flb_in_collectd_config { + char *buf; + int bufsize; + + /* Server */ + char listen[256]; /* RFC-2181 */ + char port[6]; /* RFC-793 */ + + /* Sockets */ + flb_sockfd_t server_fd; + flb_pipefd_t coll_fd; + + struct mk_list *tdb; + + /* Plugin input instance */ + struct flb_input_instance *i_ins; +}; + +static int in_collectd_callback(struct flb_input_instance *i_ins, + struct flb_config *config, void *in_context); + +static int in_collectd_init(struct flb_input_instance *in, + struct flb_config *config, void *data) +{ + int ret; + const char *tmp; + struct flb_in_collectd_config *ctx; + struct mk_list *tdb; + char *listen = DEFAULT_LISTEN; + int port = DEFAULT_PORT; + + /* Initialize context */ + ctx = flb_calloc(1, sizeof(struct flb_in_collectd_config)); + if (!ctx) { + flb_errno(); + return -1; + } + ctx->i_ins = in; + + ctx->bufsize = BUFFER_SIZE; + ctx->buf = flb_malloc(ctx->bufsize); + if (!ctx->buf) { + flb_errno(); + flb_free(ctx); + return -1; + } + + /* Listening address */ + if (in->host.listen) { + listen = in->host.listen; + } + + if (strlen(listen) > sizeof(ctx->listen) - 1) { + flb_error("[in_collectd] too long address '%s'", listen); + flb_free(ctx); + return -1; + } + strcpy(ctx->listen, listen); + + /* Listening port */ + if (in->host.port) { + port = in->host.port; + } + snprintf(ctx->port, sizeof(ctx->port), "%hu", port); + + /* TypesDB */ + tmp = flb_input_get_property("typesdb", in); + if (!tmp) { + tmp = DEFAULT_TYPESDB; + } + + flb_debug("[in_collectd] Loading TypesDB from %s", tmp); + + tdb = typesdb_load_all(tmp); + if (!tdb) { + flb_error("[in_collectd] failed to load '%s'", tmp); + flb_free(ctx->buf); + flb_free(ctx); + return -1; + } + ctx->tdb = tdb; + + /* Set the context */ + flb_input_set_context(in, ctx); + + ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen); + if (ctx->server_fd < 0) { + flb_error("[in_collectd] failed to bind to %s:%s", ctx->listen, + ctx->port); + typesdb_destroy(ctx->tdb); + flb_free(ctx->buf); + flb_free(ctx); + return -1; + } + + /* Set the collector */ + ret = flb_input_set_collector_socket(in, + in_collectd_callback, + ctx->server_fd, + config); + if (ret == -1) { + flb_error("[in_collectd] failed set up a collector"); + flb_socket_close(ctx->server_fd); + typesdb_destroy(ctx->tdb); + flb_free(ctx->buf); + flb_free(ctx); + return -1; + } + ctx->coll_fd = ret; + + flb_info("[in_collectd] start listening to %s:%s", ctx->listen, + ctx->port); + return 0; +} + +static int in_collectd_callback(struct flb_input_instance *i_ins, + struct flb_config *config, void *in_context) +{ + struct flb_in_collectd_config *ctx = in_context; + int len; + msgpack_packer pck; + msgpack_sbuffer sbuf; + + len = recv(ctx->server_fd, ctx->buf, ctx->bufsize, 0); + if (len < 0) { + flb_errno(); + return -1; + } + if (len == 0) { + return 0; + } + + msgpack_sbuffer_init(&sbuf); + msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write); + + if (netprot_to_msgpack(ctx->buf, len, ctx->tdb, &pck)) { + flb_error("[in_collectd] netprot_to_msgpack fails"); + msgpack_sbuffer_destroy(&sbuf); + return -1; + } + + flb_input_chunk_append_raw(i_ins, NULL, 0, sbuf.data, sbuf.size); + + msgpack_sbuffer_destroy(&sbuf); + return 0; +} + +static int in_collectd_exit(void *data, struct flb_config *config) +{ + struct flb_in_collectd_config *ctx = data; + flb_socket_close(ctx->server_fd); + flb_pipe_close(ctx->coll_fd); + typesdb_destroy(ctx->tdb); + flb_free(ctx->buf); + flb_free(ctx); + return 0; +} + +struct flb_input_plugin in_collectd_plugin = { + .name = "collectd", + .description = "collectd input plugin", + .cb_init = in_collectd_init, + .cb_pre_run = NULL, + .cb_collect = NULL, + .cb_flush_buf = NULL, + .cb_pause = NULL, + .cb_resume = NULL, + .cb_exit = in_collectd_exit +}; diff --git a/plugins/in_collectd/netprot.c b/plugins/in_collectd/netprot.c new file mode 100644 index 00000000000..7cef4b43b51 --- /dev/null +++ b/plugins/in_collectd/netprot.c @@ -0,0 +1,280 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This module implements the binary network protocol of collectd. + * (https://collectd.org/wiki/index.php/Binary_protocol) + * + * The only interface you need to care is netprot_to_msgpack() that + * parses a UDP packet and converts it into MessagePack format. + */ + +#include +#include +#include +#include +#include "netprot.h" +#include "typesdb.h" + +#define be16read(x) (be16toh(*(uint16_t *) (x))) +#define be32read(x) (be32toh(*(uint32_t *) (x))) +#define be64read(x) (be64toh(*(uint64_t *) (x))) + +#define le16read(x) (le16toh(*(uint16_t *) (x))) +#define le32read(x) (le32toh(*(uint32_t *) (x))) +#define le64read(x) (le64toh(*(uint64_t *) (x))) + +/* Convert a high-resolution time into a normal UNIX time. */ +#define hr2time(x) ((double) (x) / 1073741824) + +/* Basic data field definitions for collectd */ +#define PART_HOST 0x0000 +#define PART_TIME 0x0001 +#define PART_PLUGIN 0x0002 +#define PART_PLUGIN_INSTANCE 0x0003 +#define PART_TYPE 0x0004 +#define PART_TYPE_INSTANCE 0x0005 +#define PART_VALUE 0x0006 +#define PART_INTERVAL 0x0007 + +#define PART_TIME_HR 0x0008 +#define PART_INTERVAL_HR 0x0009 + +/* + * The "DS_TYPE_*" are value types for PART_VALUE fields. + * + * Read https://collectd.org/wiki/index.php/Data_source for what + * these types mean. + */ +#define DS_TYPE_COUNTER 0 +#define DS_TYPE_GAUGE 1 +#define DS_TYPE_DERIVE 2 +#define DS_TYPE_ABSOLUTE 3 + +struct netprot_header +{ + double time; + double interval; + char *host; + char *plugin; + char *plugin_instance; + char *type; + char *type_instance; +}; + +static int netprot_pack_cstr(msgpack_packer *ppck, char *s) { + int len = strlen(s); + msgpack_pack_str(ppck, len); + msgpack_pack_str_body(ppck, s, len); + return 0; +} + +/* Return the number of non-empty fields in the header */ +static int netprot_header_count(struct netprot_header *hdr) +{ + return ((hdr->time > 0) + + (hdr->interval > 0) + + !!hdr->host + + !!hdr->type + + !!hdr->type_instance + + !!hdr->plugin + + !!hdr->plugin_instance); +} + +static int netprot_pack_value(char *ptr, int size, struct netprot_header *hdr, + struct mk_list *tdb, msgpack_packer *ppck) +{ + int i; + char type; + char *pval; + uint16_t count; + struct typesdb_node *node; + + if (hdr->type == NULL) { + flb_error("[in_collectd] invalid data (type is NULL)"); + return -1; + } + + /* + * Since each value uses (1 + 8) bytes, the total buffer size must + * be 2-byte header plus bytes. + */ + count = be16read(ptr); + if (size != 2 + count * 9) { + flb_error("[in_collectd] data corrupted (size=%i, count=%i)", + size, count); + return -1; + } + + /* + * We need to query against TypesDB in order to get field names + * for the data set values. + */ + node = typesdb_find_node(tdb, hdr->type); + if (!node) { + flb_error("[in_collectd] no such type found '%s'", hdr->type); + return -1; + } + if (node->count != count) { + flb_error("[in_collectd] invalid value for '%s' (%i != %i)", + hdr->type, node->count, count); + return -1; + } + + msgpack_pack_array(ppck, 2); + flb_pack_time_now(ppck); + + msgpack_pack_map(ppck, netprot_header_count(hdr) + count); + + netprot_pack_cstr(ppck, "type"); + netprot_pack_cstr(ppck, hdr->type); + + if (hdr->type_instance) { + netprot_pack_cstr(ppck, "type_instance"); + netprot_pack_cstr(ppck, hdr->type_instance); + } + + if (hdr->time > 0) { + netprot_pack_cstr(ppck, "time"); + msgpack_pack_double(ppck, hdr->time); + } + + if (hdr->interval > 0) { + netprot_pack_cstr(ppck, "interval"); + msgpack_pack_double(ppck, hdr->interval); + } + + if (hdr->plugin) { + netprot_pack_cstr(ppck, "plugin"); + netprot_pack_cstr(ppck, hdr->plugin); + } + + if (hdr->plugin_instance) { + netprot_pack_cstr(ppck, "plugin_instance"); + netprot_pack_cstr(ppck, hdr->plugin_instance); + } + + if (hdr->host) { + netprot_pack_cstr(ppck, "host"); + netprot_pack_cstr(ppck, hdr->host); + } + + for (i = 0; i < count; i++) { + pval = ptr + 2 + count + 8 * i; + type = ptr[2 + i]; + + netprot_pack_cstr(ppck, node->fields[i]); + + switch (type) { + case DS_TYPE_COUNTER: + msgpack_pack_uint64(ppck, be64read(pval)); + break; + case DS_TYPE_GAUGE: + msgpack_pack_double(ppck, *((double *) pval)); + break; + case DS_TYPE_DERIVE: + msgpack_pack_int64(ppck, (int64_t) be64read(pval)); + break; + case DS_TYPE_ABSOLUTE: + msgpack_pack_uint64(ppck, be64read(pval)); + break; + default: + flb_error("[in_collectd] unknown data type %i", type); + return -1; + } + } + return 0; +} + +/* + * Entry point function + */ +int netprot_to_msgpack(char *buf, int len, struct mk_list *tdb, + msgpack_packer *ppck) +{ + uint16_t part_type; + uint16_t part_len; + int size; + char *ptr; + struct netprot_header hdr = {0}; + + while (len >= 4) { + part_type = be16read(buf); + part_len = be16read(buf + 2); + + if (len < part_len) { + flb_error("[in_collectd] data truncated (%i < %i)", len, part_len); + return -1; + } + ptr = buf + 4; + size = part_len - 4; + + switch (part_type) { + case PART_HOST: + if (ptr[size] == '\0') { + hdr.host = ptr; + } + break; + case PART_TIME: + hdr.time = (double) be64read(ptr); + break; + case PART_TIME_HR: + hdr.time = hr2time(be64read(ptr)); + break; + case PART_PLUGIN: + if (ptr[size] == '\0') { + hdr.plugin = ptr; + } + break; + case PART_PLUGIN_INSTANCE: + if (ptr[size] == '\0') { + hdr.plugin_instance = ptr; + } + break; + case PART_TYPE: + if (ptr[size] == '\0') { + hdr.type = ptr; + } + break; + case PART_TYPE_INSTANCE: + if (ptr[size] == '\0') { + hdr.type_instance = ptr; + } + break; + case PART_VALUE: + if (netprot_pack_value(ptr, size, &hdr, tdb, ppck)) { + return -1; + } + break; + case PART_INTERVAL: + hdr.interval = (double) be64read(ptr); + break; + case PART_INTERVAL_HR: + hdr.interval = hr2time(be64read(ptr)); + break; + default: + flb_debug("[in_collectd] skip unknown type %x", part_type); + break; + } + len -= part_len; + buf += part_len; + } + return 0; +} diff --git a/plugins/in_collectd/netprot.h b/plugins/in_collectd/netprot.h new file mode 100644 index 00000000000..445308d486e --- /dev/null +++ b/plugins/in_collectd/netprot.h @@ -0,0 +1,23 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* Convert a binary buffer into MessagePack format */ +int netprot_to_msgpack(char *buf, int len, struct mk_list *tdb, + msgpack_packer *ppck); diff --git a/plugins/in_collectd/typesdb.c b/plugins/in_collectd/typesdb.c new file mode 100644 index 00000000000..efbffc0352c --- /dev/null +++ b/plugins/in_collectd/typesdb.c @@ -0,0 +1,220 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include "typesdb.h" +#include "typesdb_parser.h" + +#include +#include +#include + + +/* Internal function to load from a single TypesDB */ +static int typesdb_load(struct mk_list *tdb, const char *path) +{ + int fd = open(path, O_RDONLY); + if (fd < 0) { + flb_errno(); + flb_error("[in_collectd] failed to open '%s'", path); + return -1; + } + + if (typesdb_parse(tdb, fd)) { + flb_error("[in_collectd] failed to parse '%s'", path); + close(fd); + return -1; + } + close(fd); + return 0; +} + +/* + * Load multiple TypesDB files at once. The return value is + * a linked list of typesdb_node objects. + * + * "paths" is a comma-separated list of file names. + */ +struct mk_list *typesdb_load_all(const char *paths) +{ + char *buf; + char *state; + char *path; + struct mk_list *tdb; + + buf = flb_strdup(paths); + if (!buf) { + flb_errno(); + return NULL; + } + + tdb = flb_malloc(sizeof(struct mk_list)); + if (!tdb) { + flb_errno(); + flb_free(buf); + return NULL; + } + mk_list_init(tdb); + + path = strtok_r(buf, ",", &state); + while (path) { + if (typesdb_load(tdb, path)) { + typesdb_destroy(tdb); + flb_free(buf); + return NULL; + } + path = strtok_r(NULL, ",", &state); + } + flb_free(buf); + return tdb; +} + +void typesdb_destroy(struct mk_list *tdb) +{ + struct typesdb_node *node; + struct mk_list *head; + struct mk_list *tmp; + + mk_list_foreach_safe(head, tmp, tdb) { + node = mk_list_entry(head, struct typesdb_node, _head); + typesdb_destroy_node(node); + } + flb_free(tdb); +} + +struct typesdb_node *typesdb_find_node(struct mk_list *tdb, const char *type) +{ + struct typesdb_node *node; + struct mk_list *head; + + if (type == NULL) { + return NULL; + } + + /* + * Search the linked list from the tail so that later entries + * take precedence over earlier ones. + */ + mk_list_foreach_r(head, tdb) { + node = mk_list_entry(head, struct typesdb_node, _head); + if (strcmp(node->type, type) == 0) { + return node; + } + } + return NULL; +} + +struct typesdb_node *typesdb_last_node(struct mk_list *tdb) +{ + return mk_list_entry_last(tdb, struct typesdb_node, _head); +} + +/* + * The folloings are API functions to modify a TypesDB instance. + */ +int typesdb_add_node(struct mk_list *tdb, const char *type) +{ + struct typesdb_node *node; + + node = flb_calloc(1, sizeof(struct typesdb_node)); + if (!node) { + flb_errno(); + return -1; + } + + node->type = flb_strdup(type); + if (!node->type) { + flb_errno(); + flb_free(node); + return -1; + } + + mk_list_add(&node->_head, tdb); + return 0; +} + +void typesdb_destroy_node(struct typesdb_node *node) +{ + int i; + + flb_free(node->type); + + if (node->fields) { + for (i = 0; i < node->count; i++) { + flb_free(node->fields[i]); + } + flb_free(node->fields); + } + mk_list_del(&node->_head); + flb_free(node); +} + +int typesdb_add_field(struct typesdb_node *node, const char *field) +{ + char *end; + int alloc; + char **fields; + + end = strchr(field, ':'); + if (!end) { + return -1; + } + + if (node->count >= node->alloc) { + alloc = node->alloc > 0 ? node->alloc * 2 : 1; + fields = flb_realloc(node->fields, alloc * sizeof(char *)); + if (!fields) { + flb_errno(); + return -1; + } + node->alloc = alloc; + node->fields = fields; + } + + node->fields[node->count] = flb_strndup(field, end - field); + if (!node->fields[node->count]) { + flb_errno(); + return -1; + } + node->count++; + return 0; +} + +/* A debug function to see the content of TypesDB */ +void typesdb_dump(struct mk_list *tdb) +{ + struct mk_list *head; + struct typesdb_node *node; + int i; + + mk_list_foreach(head, tdb) { + node = mk_list_entry(head, struct typesdb_node, _head); + + printf("%s", node->type); + for (i = 0; i < node->count; i++) { + printf("\t%s", node->fields[i]); + } + putchar('\n'); + } +} diff --git a/plugins/in_collectd/typesdb.h b/plugins/in_collectd/typesdb.h new file mode 100644 index 00000000000..45980a72c86 --- /dev/null +++ b/plugins/in_collectd/typesdb.h @@ -0,0 +1,43 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +struct typesdb_node { + char *type; + int alloc; + int count; + char **fields; + struct mk_list _head; +}; + +/* Load and destroy TypesDB */ +struct mk_list *typesdb_load_all(const char *paths); +void typesdb_destroy(struct mk_list *tdb); + +/* Find a node in TypesDB */ +struct typesdb_node *typesdb_find_node(struct mk_list *tdb, const char *type); +struct typesdb_node *typesdb_last_node(struct mk_list *tdb); + +/* Modify a TypesDB instance (used in typesdb_parser.c) */ +int typesdb_add_node(struct mk_list *tdb, const char *type); +void typesdb_destroy_node(struct typesdb_node *node); +int typesdb_add_field(struct typesdb_node *node, const char *field); + +/* For debugging */ +void typesdb_dump(struct mk_list *tdb); diff --git a/plugins/in_collectd/typesdb_parser.c b/plugins/in_collectd/typesdb_parser.c new file mode 100644 index 00000000000..187a3666156 --- /dev/null +++ b/plugins/in_collectd/typesdb_parser.c @@ -0,0 +1,215 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This file implements a collectd 5.x compatible parser for types.db(5). + * + * Note: it internally implements a finite state machine that consumes a + * single char at once, and pushes parsed tokens via typesdb_* methods. + */ + +#include +#include +#include +#include + +#include "typesdb.h" +#include "typesdb_parser.h" + +#define TDB_INVALID -1 +#define TDB_INIT 0 +#define TDB_LEFT 1 +#define TDB_SEP 2 +#define TDB_RIGHT 3 +#define TDB_RIGHT_SEP 4 +#define TDB_COMMENT 5 + +/* See collectd/src/daemon/types_list.c */ +#define MAX_LINE_SIZE 4096 + +/* + * tdb_* are state functions that take a single character as input. + * They do some action based on the input and return the next state. + */ +static int tdb_init(char c, struct mk_list *tdb, char *buf) +{ + switch (c) { + case '#': + return TDB_COMMENT; + case '\r': + case '\n': + return TDB_INIT; + default: + buf[0] = c; + buf[1] = '\0'; + return TDB_LEFT; + } +} + +static int tdb_left(char c, struct mk_list *tdb, char *buf) +{ + int len; + + switch (c) { + case ' ': + if (typesdb_add_node(tdb, buf)) { + return TDB_INVALID; + } + return TDB_SEP; + case '\r': + case '\n': + return TDB_INVALID; + default: + len = strlen(buf); + if (len >= MAX_LINE_SIZE - 1) { + return TDB_INVALID; + } + buf[len] = c; + buf[++len] = '\0'; + return TDB_LEFT; + } +} + +static int tdb_sep(char c, struct mk_list *tdb, char *buf) +{ + switch (c) { + case ' ': + return TDB_SEP; + case '\r': + case '\n': + return TDB_INVALID; + default: + buf[0] = c; + buf[1] = '\0'; + return TDB_RIGHT; + } +} + +static int tdb_right(char c, struct mk_list *tdb, char *buf) +{ + int len; + struct typesdb_node *node = typesdb_last_node(tdb); + + switch (c) { + case ' ': + case ',': + if (typesdb_add_field(node, buf)) { + flb_error("[in_collectd] cannot add value '%s'", buf); + return TDB_INVALID; + } + return TDB_RIGHT_SEP; + case '\r': + case '\n': + if (typesdb_add_field(node, buf)) { + flb_error("[in_collectd] cannot add value '%s'", buf); + return TDB_INVALID; + } + return TDB_INIT; + default: + len = strlen(buf); + if (len >= MAX_LINE_SIZE - 1) { + flb_error("[in_collectd] line too long > %i", MAX_LINE_SIZE); + return TDB_INVALID; + } + buf[len] = c; + buf[++len] = '\0'; + return TDB_RIGHT; + } +} + +static int tdb_right_sep(char c, struct mk_list *tdb, char *buf) +{ + switch (c) { + case ' ': + case ',': + return TDB_RIGHT_SEP; + case '\r': + case '\n': + return TDB_INIT; + default: + buf[0] = c; + buf[1] = '\0'; + return TDB_RIGHT; + } +} + +static int tdb_comment(char c, struct mk_list *tdb, char *buf) +{ + switch (c) { + case '\r': + case '\n': + return TDB_INIT; + default: + return TDB_COMMENT; + } +} + +/* + * Entry point function + */ +int typesdb_parse(struct mk_list *tdb, int fp) +{ + char tmp[1024]; + char buf[MAX_LINE_SIZE]; + char c; + int i; + int bytes; + int state = TDB_INIT; + + while (1) { + bytes = read(fp, tmp, 1024); + if (bytes < 0) { + flb_errno(); + return bytes; + } + if (bytes == 0) { + return 0; + } + for (i = 0; i < bytes; i++) { + c = tmp[i]; + switch (state) { + case TDB_INVALID: + return -1; + case TDB_INIT: + state = tdb_init(c, tdb, buf); + break; + case TDB_LEFT: + state = tdb_left(c, tdb, buf); + break; + case TDB_SEP: + state = tdb_sep(c, tdb, buf); + break; + case TDB_RIGHT: + state = tdb_right(c, tdb, buf); + break; + case TDB_RIGHT_SEP: + state = tdb_right_sep(c, tdb, buf); + break; + case TDB_COMMENT: + state = tdb_comment(c, tdb, buf); + break; + default: + flb_error("[in_collectd] unknown state %i", state); + return -1; + } + } + } + return 0; +} diff --git a/plugins/in_collectd/typesdb_parser.h b/plugins/in_collectd/typesdb_parser.h new file mode 100644 index 00000000000..caa77bc009e --- /dev/null +++ b/plugins/in_collectd/typesdb_parser.h @@ -0,0 +1,21 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +int typesdb_parse(struct mk_list *tdb, int fp);