Skip to content

Commit

Permalink
out_datadog: fix/add error handling for all flb_sds calls (fluent#5929)
Browse files Browse the repository at this point in the history
Signed-off-by: Wesley Pettit <[email protected]>
  • Loading branch information
PettitWesley authored Nov 16, 2022
1 parent c5d57cc commit 300206a
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 46 deletions.
66 changes: 56 additions & 10 deletions plugins/out_datadog/datadog.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@ static int datadog_format(struct flb_config *config,
{
int i;
int ind;
int byte_cnt;
int byte_cnt = 64;
int remap_cnt;
int ret;
/* for msgpack global structs */
int array_size = 0;
size_t array_size = 0;
size_t off = 0;
msgpack_unpacked result;
msgpack_sbuffer mp_sbuf;
Expand All @@ -110,13 +111,23 @@ static int datadog_format(struct flb_config *config,
msgpack_object k;
msgpack_object v;
struct flb_out_datadog *ctx = plugin_context;
struct flb_event_chunk *event_chunk;

/* output buffer */
flb_sds_t out_buf;
flb_sds_t remapped_tags = NULL;

/* Count number of records */
array_size = flb_mp_count(data, bytes);
flb_sds_t tmp = NULL;

/* in normal flush callback we have the event_chunk set as flush context
* so we don't need to calculate the event len.
* But in test mode the formatter won't get the event_chunk as flush_ctx
*/
if (flush_ctx != NULL) {
event_chunk = flush_ctx;
array_size = event_chunk->total_events;
} else {
array_size = flb_mp_count(data, bytes);
}

/* Create temporary msgpack buffer */
msgpack_sbuffer_init(&mp_sbuf);
Expand Down Expand Up @@ -161,6 +172,22 @@ static int datadog_format(struct flb_config *config,

if (!remapped_tags) {
remapped_tags = flb_sds_create_size(byte_cnt);
if (!remapped_tags) {
flb_errno();
msgpack_sbuffer_destroy(&mp_sbuf);
msgpack_unpacked_destroy(&result);
return -1;
}
} else if (flb_sds_len(remapped_tags) < byte_cnt) {
tmp = flb_sds_increase(remapped_tags, flb_sds_len(remapped_tags) - byte_cnt);
if (!tmp) {
flb_errno();
flb_sds_destroy(remapped_tags);
msgpack_sbuffer_destroy(&mp_sbuf);
msgpack_unpacked_destroy(&result);
return -1;
}
remapped_tags = tmp;
}

/*
Expand Down Expand Up @@ -227,8 +254,11 @@ static int datadog_format(struct flb_config *config,
* (so they won't be packed as attr)
*/
if (ctx->remap && (ind = dd_attr_need_remapping(k, v)) >=0 ) {
remapping[ind].remap_to_tag(remapping[ind].remap_tag_name, v,
remapped_tags);
ret = remapping[ind].remap_to_tag(remapping[ind].remap_tag_name, v,
&remapped_tags);
if (ret < 0) {
flb_plg_error(ctx->ins, "Failed to remap tag: %s, skipping", remapping[ind].remap_tag_name);
}
continue;
}

Expand All @@ -250,9 +280,25 @@ static int datadog_format(struct flb_config *config,
/* here we concatenate ctx->dd_tags and remapped_tags, depending on their presence */
if (remap_cnt) {
if (ctx->dd_tags != NULL) {
flb_sds_cat(remapped_tags, FLB_DATADOG_TAG_SEPERATOR,
strlen(FLB_DATADOG_TAG_SEPERATOR));
tmp = flb_sds_cat(remapped_tags, FLB_DATADOG_TAG_SEPERATOR,
strlen(FLB_DATADOG_TAG_SEPERATOR));
if (!tmp) {
flb_errno();
flb_sds_destroy(remapped_tags);
msgpack_sbuffer_destroy(&mp_sbuf);
msgpack_unpacked_destroy(&result);
return -1;
}
remapped_tags = tmp;
flb_sds_cat(remapped_tags, ctx->dd_tags, strlen(ctx->dd_tags));
if (!tmp) {
flb_errno();
flb_sds_destroy(remapped_tags);
msgpack_sbuffer_destroy(&mp_sbuf);
msgpack_unpacked_destroy(&result);
return -1;
}
remapped_tags = tmp;
}
dd_msgpack_pack_key_value_str(&mp_pck,
FLB_DATADOG_DD_TAGS_KEY,
Expand Down Expand Up @@ -319,7 +365,7 @@ static void cb_datadog_flush(struct flb_event_chunk *event_chunk,

/* Convert input data into a Datadog JSON payload */
ret = datadog_format(config, i_ins,
ctx, NULL,
ctx, event_chunk,
event_chunk->tag, flb_sds_len(event_chunk->tag),
event_chunk->data, event_chunk->size,
&out_buf, &out_size);
Expand Down
21 changes: 17 additions & 4 deletions plugins/out_datadog/datadog_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct flb_out_datadog *flb_datadog_conf_create(struct flb_output_instance *ins,
struct flb_upstream *upstream;
const char *api_key;
const char *tmp;
flb_sds_t tmp_sds;

int ret;
char *protocol = NULL;
Expand Down Expand Up @@ -75,12 +76,18 @@ struct flb_out_datadog *flb_datadog_conf_create(struct flb_output_instance *ins,
/* use TLS ? */
if (ins->use_tls == FLB_TRUE) {
io_flags = FLB_IO_TLS;
ctx->scheme = flb_sds_create("https://");
tmp_sds = flb_sds_create("https://");
}
else {
io_flags = FLB_IO_TCP;
ctx->scheme = flb_sds_create("http://");
tmp_sds = flb_sds_create("http://");
}
if (!tmp_sds) {
flb_errno();
flb_datadog_conf_destroy(ctx);
return NULL;
}
ctx->scheme = tmp_sds;
flb_plg_debug(ctx->ins, "scheme: %s", ctx->scheme);

/* configure URI */
Expand Down Expand Up @@ -126,11 +133,17 @@ struct flb_out_datadog *flb_datadog_conf_create(struct flb_output_instance *ins,

/* Get network configuration */
if (!ins->host.name) {
ctx->host = flb_sds_create(FLB_DATADOG_DEFAULT_HOST);
tmp_sds = flb_sds_create(FLB_DATADOG_DEFAULT_HOST);
}
else {
ctx->host = flb_sds_create(ins->host.name);
tmp_sds = flb_sds_create(ins->host.name);
}
if (!tmp_sds) {
flb_errno();
flb_datadog_conf_destroy(ctx);
return NULL;
}
ctx->host = tmp_sds;
flb_plg_debug(ctx->ins, "host: %s", ctx->host);

if (ins->host.port != 0) {
Expand Down
Loading

0 comments on commit 300206a

Please sign in to comment.