diff --git a/plugins/in_statsd/statsd.c b/plugins/in_statsd/statsd.c index 74022d9c036..74fbd1fdad5 100644 --- a/plugins/in_statsd/statsd.c +++ b/plugins/in_statsd/statsd.c @@ -36,6 +36,7 @@ struct flb_statsd { char *buf; /* buffer */ char listen[256]; /* listening address (RFC-2181) */ char port[6]; /* listening port (RFC-793) */ + int metrics; /* Import as metrics */ flb_sockfd_t server_fd; /* server socket */ flb_pipefd_t coll_fd; /* server handler */ struct flb_input_instance *ins; /* input instance */ @@ -209,6 +210,10 @@ static int cb_statsd_receive(struct flb_input_instance *ins, char *line; int len; struct flb_statsd *ctx = data; +#ifdef FLB_HAVE_METRICS + struct cmt *cmt = NULL; + int cmt_flags = 0; +#endif /* Receive a UDP datagram */ len = recv(ctx->server_fd, ctx->buf, MAX_PACKET_SIZE - 1, 0); @@ -218,33 +223,56 @@ static int cb_statsd_receive(struct flb_input_instance *ins, } ctx->buf[len] = '\0'; - ret = FLB_EVENT_ENCODER_SUCCESS; - /* Process all messages in buffer */ - line = strtok(ctx->buf, "\n"); - while (line != NULL) { - flb_plg_trace(ctx->ins, "received a line: '%s'", line); +#ifdef FLB_HAVE_METRICS + if (ctx->metrics == FLB_TRUE) { + cmt_flags |= CMT_DECODE_STATSD_GAUGE_OBSERVER; + flb_plg_trace(ctx->ins, "received a buf: '%s'", ctx->buf); + ret = cmt_decode_statsd_create(&cmt, ctx->buf, len, cmt_flags); + if (ret != CMT_DECODE_STATSD_SUCCESS) { + flb_plg_error(ctx->ins, "failed to process buf: '%s'", ctx->buf); + return -1; + } - ret = statsd_process_line(ctx, line); + /* Append the updated metrics */ + ret = flb_input_metrics_append(ins, NULL, 0, cmt); + if (ret != 0) { + flb_plg_error(ins, "could not append metrics"); + } - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_error(ctx->ins, "failed to process line: '%s'", line); + cmt_destroy(cmt); + } + else { +#endif + ret = FLB_EVENT_ENCODER_SUCCESS; + /* Process all messages in buffer */ + line = strtok(ctx->buf, "\n"); + while (line != NULL) { + flb_plg_trace(ctx->ins, "received a line: '%s'", line); - break; + ret = statsd_process_line(ctx, line); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, "failed to process line: '%s'", line); + + break; + } + + line = strtok(NULL, "\n"); } - line = strtok(NULL, "\n"); - } + if (ctx->log_encoder->output_length > 0) { + flb_input_log_append(ctx->ins, NULL, 0, + ctx->log_encoder->output_buffer, + ctx->log_encoder->output_length); + } + else { + flb_plg_error(ctx->ins, "log event encoding error : %d", ret); + } - if (ctx->log_encoder->output_length > 0) { - flb_input_log_append(ctx->ins, NULL, 0, - ctx->log_encoder->output_buffer, - ctx->log_encoder->output_length); + flb_log_event_encoder_reset(ctx->log_encoder); +#ifdef FLB_HAVE_METRICS } - else { - flb_plg_error(ctx->ins, "log event encoding error : %d", ret); - } - - flb_log_event_encoder_reset(ctx->log_encoder); +#endif return 0; } @@ -365,8 +393,13 @@ static int cb_statsd_exit(void *data, struct flb_config *config) } static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_BOOL, "metrics", "off", + 0, FLB_TRUE, offsetof(struct flb_statsd, metrics), + "Ingest as metrics type of events." + }, /* EOF */ - {0} + {0} }; /* Plugin reference */