Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_collectd: Implement a new input plugin for collectd #1506

Merged
merged 2 commits into from
Aug 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ option(FLB_IN_SYSTEMD "Enable Systemd input plugin" Yes)
option(FLB_IN_DUMMY "Enable Dummy input plugin" Yes)
option(FLB_IN_NETIF "Enable NetworkIF input plugin" Yes)
option(FLB_IN_WINLOG "Enable Windows Log input plugin" No)
option(FLB_IN_COLLECTD "Enable Collectd input plugin" Yes)
option(FLB_IN_STORAGE_BACKLOG "Enable stoage backlog input plugin" Yes)
option(FLB_OUT_AZURE "Enable Azure output plugin" Yes)
option(FLB_OUT_BIGQUERY "Enable BigQuery output plugin" Yes)
Expand Down
1 change: 1 addition & 0 deletions cmake/windows-setup.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ set(FLB_IN_SYSTEMD No)
set(FLB_IN_DUMMY Yes)
set(FLB_IN_NETIF No)
set(FLB_IN_WINLOG Yes)
set(FLB_IN_COLLECTD No)
set(FLB_IN_STORAGE_BACKLOG No)

# OUTPUT plugins
Expand Down
1 change: 1 addition & 0 deletions plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ REGISTER_IN_PLUGIN("in_dummy")
REGISTER_IN_PLUGIN("in_head")
REGISTER_IN_PLUGIN("in_health")
REGISTER_IN_PLUGIN("in_http")
REGISTER_IN_PLUGIN("in_collectd")
REGISTER_IN_PLUGIN("in_storage_backlog")

if (FLB_STREAM_PROCESSOR)
Expand Down
7 changes: 7 additions & 0 deletions plugins/in_collectd/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
set(src
typesdb.c
typesdb_parser.c
netprot.c
in_collectd.c)

FLB_PLUGIN(in_collectd "${src}" "")
213 changes: 213 additions & 0 deletions plugins/in_collectd/in_collectd.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2019 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <fluent-bit/flb_compat.h>
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_utils.h>
#include <msgpack.h>
#include "netprot.h"
#include "typesdb.h"

/*
* Max payload size. By default, Collectd sends up to 1452 bytes
* per a UDP packet, but the limit can be increased up to 65535
* bytes through a configuration parameter.
*
* See network_config_set_buffer_size() in collectd/src/network.c.
*/
#define BUFFER_SIZE 65535

#define DEFAULT_LISTEN "0.0.0.0"
#define DEFAULT_PORT 25826

/* This is where most Linux systems places a default TypesDB */
#define DEFAULT_TYPESDB "/usr/share/collectd/types.db";

struct flb_in_collectd_config {
char *buf;
int bufsize;

/* Server */
char listen[256]; /* RFC-2181 */
char port[6]; /* RFC-793 */

/* Sockets */
flb_sockfd_t server_fd;
flb_pipefd_t coll_fd;

struct mk_list *tdb;

/* Plugin input instance */
struct flb_input_instance *i_ins;
};

static int in_collectd_callback(struct flb_input_instance *i_ins,
struct flb_config *config, void *in_context);

static int in_collectd_init(struct flb_input_instance *in,
struct flb_config *config, void *data)
{
int ret;
const char *tmp;
struct flb_in_collectd_config *ctx;
struct mk_list *tdb;
char *listen = DEFAULT_LISTEN;
int port = DEFAULT_PORT;

/* Initialize context */
ctx = flb_calloc(1, sizeof(struct flb_in_collectd_config));
if (!ctx) {
flb_errno();
return -1;
}
ctx->i_ins = in;

ctx->bufsize = BUFFER_SIZE;
ctx->buf = flb_malloc(ctx->bufsize);
if (!ctx->buf) {
flb_errno();
flb_free(ctx);
return -1;
}

/* Listening address */
if (in->host.listen) {
listen = in->host.listen;
}

if (strlen(listen) > sizeof(ctx->listen) - 1) {
flb_error("[in_collectd] too long address '%s'", listen);
flb_free(ctx);
return -1;
}
strcpy(ctx->listen, listen);

/* Listening port */
if (in->host.port) {
port = in->host.port;
}
snprintf(ctx->port, sizeof(ctx->port), "%hu", port);

/* TypesDB */
tmp = flb_input_get_property("typesdb", in);
if (!tmp) {
tmp = DEFAULT_TYPESDB;
}

flb_debug("[in_collectd] Loading TypesDB from %s", tmp);

tdb = typesdb_load_all(tmp);
if (!tdb) {
flb_error("[in_collectd] failed to load '%s'", tmp);
flb_free(ctx->buf);
flb_free(ctx);
return -1;
}
ctx->tdb = tdb;

/* Set the context */
flb_input_set_context(in, ctx);

ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen);
if (ctx->server_fd < 0) {
flb_error("[in_collectd] failed to bind to %s:%s", ctx->listen,
ctx->port);
typesdb_destroy(ctx->tdb);
flb_free(ctx->buf);
flb_free(ctx);
return -1;
}

/* Set the collector */
ret = flb_input_set_collector_socket(in,
in_collectd_callback,
ctx->server_fd,
config);
if (ret == -1) {
flb_error("[in_collectd] failed set up a collector");
flb_socket_close(ctx->server_fd);
typesdb_destroy(ctx->tdb);
flb_free(ctx->buf);
flb_free(ctx);
return -1;
}
ctx->coll_fd = ret;

flb_info("[in_collectd] start listening to %s:%s", ctx->listen,
ctx->port);
return 0;
}

static int in_collectd_callback(struct flb_input_instance *i_ins,
struct flb_config *config, void *in_context)
{
struct flb_in_collectd_config *ctx = in_context;
int len;
msgpack_packer pck;
msgpack_sbuffer sbuf;

len = recv(ctx->server_fd, ctx->buf, ctx->bufsize, 0);
if (len < 0) {
flb_errno();
return -1;
}
if (len == 0) {
return 0;
}

msgpack_sbuffer_init(&sbuf);
msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);

if (netprot_to_msgpack(ctx->buf, len, ctx->tdb, &pck)) {
flb_error("[in_collectd] netprot_to_msgpack fails");
msgpack_sbuffer_destroy(&sbuf);
return -1;
}

flb_input_chunk_append_raw(i_ins, NULL, 0, sbuf.data, sbuf.size);

msgpack_sbuffer_destroy(&sbuf);
return 0;
}

static int in_collectd_exit(void *data, struct flb_config *config)
{
struct flb_in_collectd_config *ctx = data;
flb_socket_close(ctx->server_fd);
flb_pipe_close(ctx->coll_fd);
typesdb_destroy(ctx->tdb);
flb_free(ctx->buf);
flb_free(ctx);
return 0;
}

struct flb_input_plugin in_collectd_plugin = {
.name = "collectd",
.description = "collectd input plugin",
.cb_init = in_collectd_init,
.cb_pre_run = NULL,
.cb_collect = NULL,
.cb_flush_buf = NULL,
.cb_pause = NULL,
.cb_resume = NULL,
.cb_exit = in_collectd_exit
};
Loading