Skip to content

Commit

Permalink
out_kinesis_firehose: used function flb_aws_strftime_precision for ti…
Browse files Browse the repository at this point in the history
…me output.

Signed-off-by: Clay Cheng <[email protected]>
  • Loading branch information
Claych committed Dec 28, 2022
1 parent c6fe142 commit f97cafc
Showing 1 changed file with 37 additions and 22 deletions.
59 changes: 37 additions & 22 deletions plugins/out_kinesis_firehose/firehose_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ static int process_event(struct flb_firehose *ctx, struct flush *buf,
size_t len;
size_t tmp_size;
void *compressed_tmp_buf;
char *out_buf;

tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset;
ret = flb_msgpack_to_json(tmp_buf_ptr,
Expand Down Expand Up @@ -216,34 +217,48 @@ static int process_event(struct flb_firehose *ctx, struct flush *buf,
ctx->delivery_stream);
return 2;
}
/* guess space needed to write time_key */
len = 6 + strlen(ctx->time_key) + 6 * strlen(ctx->time_key_format);

/* format time output and return the length */
len = flb_aws_strftime_precision(&out_buf, ctx->time_key_format, tms);

/* how much space do we have left */
tmp_size = (buf->tmp_buf_size - buf->tmp_buf_offset) - written;
if (len > tmp_size) {
/* not enough space- tell caller to retry */
/* not enough space - tell caller to retry */
flb_free(out_buf);
return 1;
}
time_key_ptr = tmp_buf_ptr + written - 1;
memcpy(time_key_ptr, ",", 1);
time_key_ptr++;
memcpy(time_key_ptr, "\"", 1);
time_key_ptr++;
memcpy(time_key_ptr, ctx->time_key, strlen(ctx->time_key));
time_key_ptr += strlen(ctx->time_key);
memcpy(time_key_ptr, "\":\"", 3);
time_key_ptr += 3;
tmp_size = buf->tmp_buf_size - buf->tmp_buf_offset;
tmp_size -= (time_key_ptr - tmp_buf_ptr);
len = strftime(time_key_ptr, tmp_size, ctx->time_key_format, &time_stamp);
if (len <= 0) {
/* ran out of space - should not happen because of check above */
return 1;

if (len == 0) {
/*
* when the length of out_buf is not enough for time_key_format,
* time_key will not be added to record.
*/
flb_plg_error(ctx->ins, "Failed to add time_key %s to record, %s",
ctx->time_key, ctx->delivery_stream);
flb_free(out_buf);
}
else {
time_key_ptr = tmp_buf_ptr + written - 1;
memcpy(time_key_ptr, ",", 1);
time_key_ptr++;
memcpy(time_key_ptr, "\"", 1);
time_key_ptr++;
memcpy(time_key_ptr, ctx->time_key, strlen(ctx->time_key));
time_key_ptr += strlen(ctx->time_key);
memcpy(time_key_ptr, "\":\"", 3);
time_key_ptr += 3;
tmp_size = buf->tmp_buf_size - buf->tmp_buf_offset;
tmp_size -= (time_key_ptr - tmp_buf_ptr);

/* merge out_buf to time_key_ptr */
memcpy(time_key_ptr, out_buf, len);
flb_free(out_buf);
time_key_ptr += len;
memcpy(time_key_ptr, "\"}", 2);
time_key_ptr += 2;
written = (time_key_ptr - tmp_buf_ptr);
}
time_key_ptr += len;
memcpy(time_key_ptr, "\"}", 2);
time_key_ptr += 2;
written = (time_key_ptr - tmp_buf_ptr);
}

/* is (written + 1) because we still have to append newline */
Expand Down

0 comments on commit f97cafc

Please sign in to comment.