diff --git a/plugins/out_kinesis_streams/kinesis_api.c b/plugins/out_kinesis_streams/kinesis_api.c index b7e29030383..afd3862d4ff 100644 --- a/plugins/out_kinesis_streams/kinesis_api.c +++ b/plugins/out_kinesis_streams/kinesis_api.c @@ -225,6 +225,7 @@ static int process_event(struct flb_kinesis *ctx, struct flush *buf, struct tm *tmp; size_t len; size_t tmp_size; + char *out_buf; tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; ret = flb_msgpack_to_json(tmp_buf_ptr, @@ -276,14 +277,27 @@ static int process_event(struct flb_kinesis *ctx, struct flush *buf, ctx->stream_name); return 2; } - /* guess space needed to write time_key */ - len = 6 + strlen(ctx->time_key) + 6 * strlen(ctx->time_key_format); + + /* format time output and return the length */ + len = flb_aws_strftime_precision(&out_buf, ctx->time_key_format, tms); + if (len == -1) { + /* + * when function flb_aws_strftime_precision's return value is -1, + * time_key will not be added to record. + */ + flb_plg_error(ctx->ins, "Failed to add time_key %s to record, %s", + ctx->time_key, ctx->stream_name); + return 0; + } + /* how much space do we have left */ tmp_size = (buf->tmp_buf_size - buf->tmp_buf_offset) - written; if (len > tmp_size) { - /* not enough space- tell caller to retry */ + /* not enough space - tell caller to retry */ + flb_free(out_buf); return 1; } + time_key_ptr = tmp_buf_ptr + written - 1; memcpy(time_key_ptr, ",", 1); time_key_ptr++; @@ -295,11 +309,10 @@ static int process_event(struct flb_kinesis *ctx, struct flush *buf, time_key_ptr += 3; tmp_size = buf->tmp_buf_size - buf->tmp_buf_offset; tmp_size -= (time_key_ptr - tmp_buf_ptr); - len = strftime(time_key_ptr, tmp_size, ctx->time_key_format, &time_stamp); - if (len <= 0) { - /* ran out of space - should not happen because of check above */ - return 1; - } + + /* merge out_buf to time_key_ptr */ + memcpy(time_key_ptr, out_buf, len); + flb_free(out_buf); time_key_ptr += len; memcpy(time_key_ptr, "\"}", 2); time_key_ptr += 2; @@ -621,6 +634,11 @@ int process_and_send_to_kinesis(struct flb_kinesis *ctx, struct flush *buf, } msgpack_unpacked_destroy(&result); + /* check whether append time_key to end of json string */ + if (buf->events->timestamp.tv_nsec == NULL) { + return -1; + } + /* send any remaining events */ ret = send_log_events(ctx, buf); reset_flush_buf(ctx, buf);