Skip to content

Commit

Permalink
in_statsd: Implement statsd decoder to translate metrics type of events
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 authored and edsiper committed Aug 14, 2024
1 parent b58022a commit fed5553
Showing 1 changed file with 54 additions and 21 deletions.
75 changes: 54 additions & 21 deletions plugins/in_statsd/statsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ struct flb_statsd {
char *buf; /* buffer */
char listen[256]; /* listening address (RFC-2181) */
char port[6]; /* listening port (RFC-793) */
int metrics; /* Import as metrics */
flb_sockfd_t server_fd; /* server socket */
flb_pipefd_t coll_fd; /* server handler */
struct flb_input_instance *ins; /* input instance */
Expand Down Expand Up @@ -209,6 +210,10 @@ static int cb_statsd_receive(struct flb_input_instance *ins,
char *line;
int len;
struct flb_statsd *ctx = data;
#ifdef FLB_HAVE_METRICS
struct cmt *cmt = NULL;
int cmt_flags = 0;
#endif

/* Receive a UDP datagram */
len = recv(ctx->server_fd, ctx->buf, MAX_PACKET_SIZE - 1, 0);
Expand All @@ -218,33 +223,56 @@ static int cb_statsd_receive(struct flb_input_instance *ins,
}
ctx->buf[len] = '\0';

ret = FLB_EVENT_ENCODER_SUCCESS;
/* Process all messages in buffer */
line = strtok(ctx->buf, "\n");
while (line != NULL) {
flb_plg_trace(ctx->ins, "received a line: '%s'", line);
#ifdef FLB_HAVE_METRICS
if (ctx->metrics == FLB_TRUE) {
cmt_flags |= CMT_DECODE_STATSD_GAUGE_OBSERVER;
flb_plg_trace(ctx->ins, "received a buf: '%s'", ctx->buf);
ret = cmt_decode_statsd_create(&cmt, ctx->buf, len, cmt_flags);
if (ret != CMT_DECODE_STATSD_SUCCESS) {
flb_plg_error(ctx->ins, "failed to process buf: '%s'", ctx->buf);
return -1;
}

ret = statsd_process_line(ctx, line);
/* Append the updated metrics */
ret = flb_input_metrics_append(ins, NULL, 0, cmt);
if (ret != 0) {
flb_plg_error(ins, "could not append metrics");
}

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(ctx->ins, "failed to process line: '%s'", line);
cmt_destroy(cmt);
}
else {
#endif
ret = FLB_EVENT_ENCODER_SUCCESS;
/* Process all messages in buffer */
line = strtok(ctx->buf, "\n");
while (line != NULL) {
flb_plg_trace(ctx->ins, "received a line: '%s'", line);

break;
ret = statsd_process_line(ctx, line);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(ctx->ins, "failed to process line: '%s'", line);

break;
}

line = strtok(NULL, "\n");
}

line = strtok(NULL, "\n");
}
if (ctx->log_encoder->output_length > 0) {
flb_input_log_append(ctx->ins, NULL, 0,
ctx->log_encoder->output_buffer,
ctx->log_encoder->output_length);
}
else {
flb_plg_error(ctx->ins, "log event encoding error : %d", ret);
}

if (ctx->log_encoder->output_length > 0) {
flb_input_log_append(ctx->ins, NULL, 0,
ctx->log_encoder->output_buffer,
ctx->log_encoder->output_length);
flb_log_event_encoder_reset(ctx->log_encoder);
#ifdef FLB_HAVE_METRICS
}
else {
flb_plg_error(ctx->ins, "log event encoding error : %d", ret);
}

flb_log_event_encoder_reset(ctx->log_encoder);
#endif

return 0;
}
Expand Down Expand Up @@ -365,8 +393,13 @@ static int cb_statsd_exit(void *data, struct flb_config *config)
}

static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_BOOL, "metrics", "off",
0, FLB_TRUE, offsetof(struct flb_statsd, metrics),
"Ingest as metrics type of events."
},
/* EOF */
{0}
{0}
};

/* Plugin reference */
Expand Down

0 comments on commit fed5553

Please sign in to comment.