diff --git a/plugins/out_kafka/kafka.c b/plugins/out_kafka/kafka.c index 33a247451de..ee9a1c5919f 100644 --- a/plugins/out_kafka/kafka.c +++ b/plugins/out_kafka/kafka.c @@ -21,6 +21,7 @@ #include #include #include +#include #include "kafka_config.h" #include "kafka_topic.h" @@ -66,6 +67,39 @@ void cb_kafka_logger(const rd_kafka_t *rk, int level, } } +/* update mbedtls_sha256_context with a key/value object from the message */ +static int kafka_hash(struct flb_out_kafka *ctx, char* *hash_buf, int *hash_buf_size, + mbedtls_sha256_context *sha256_ctx, msgpack_object *msg_obj) +{ + int ret = 0; + int max_increase = 14; // max hash_buffer memory: hash_buf_size(512) * 2^14 = 8MB + int increase_count = 0; + + while (increase_count < max_increase) { + ret = msgpack_object_print_buffer(*hash_buf, *hash_buf_size, *msg_obj); + if (ret != 0) { + ret = mbedtls_sha256_update_ret(sha256_ctx, (const unsigned char *) *hash_buf, ret); + if (ret != 0) { + flb_plg_warn(ctx->ins, "hash: can't increase hash_buffer to %d", *hash_buf_size); + break; + } + return 1; + } + increase_count++; + flb_plg_debug(ctx->ins, "increasing hash_buf: %d * 2", *hash_buf_size); + *hash_buf_size = *hash_buf_size * 2; + *hash_buf = flb_realloc(*hash_buf, *hash_buf_size); + if (!*hash_buf) { + flb_plg_warn(ctx->ins, "hash: can't increase hash_buffer to %d", *hash_buf_size); + break; + } + } + flb_plg_warn(ctx->ins, "hash: max_increase reached - can't increase hash_buffer"); + flb_free(*hash_buf); + *hash_buf = NULL; + return 0; +} + static int cb_kafka_init(struct flb_output_instance *ins, struct flb_config *config, void *data) @@ -105,6 +139,15 @@ int produce_message(struct flb_time *tm, msgpack_object *map, msgpack_object key; msgpack_object val; flb_sds_t s; + uint8_t hash[32] = {0}; + char *hash_buf = NULL; + char hash_formatted[64] = {'\0'}; + int *hash_buf_size = &(int){512}; + int hash_hex_length; + mbedtls_sha256_context sha256_ctx; + int sha256_ret = 0; + int j; + int size_for_hash = 0; #ifdef FLB_HAVE_AVRO_ENCODER // used to flag when a buffer needs to be freed for avro @@ -141,13 +184,50 @@ int produce_message(struct flb_time *tm, msgpack_object *map, if (flb_log_check(FLB_LOG_DEBUG)) msgpack_object_print(stderr, *map); + /* increase size for the timestamp */ + size = map->via.map.size; + size++; + + if (ctx->hash) { + /* init mbedtls_sha256 and hash_buffer */ + mbedtls_sha256_init(&sha256_ctx); + sha256_ret = mbedtls_sha256_starts_ret(&sha256_ctx, 0); + if (sha256_ret != 0) { + flb_plg_warn(ctx->ins, "can't init mbedtls_sha256, disable hash generation"); + } + else { + hash_buf = flb_malloc(*hash_buf_size); + if (!hash_buf) { + flb_errno(); + flb_plg_warn(ctx->ins, "can't init hash_buffer, disable hash generation"); + mbedtls_sha256_free(&sha256_ctx); + } + /* update hash with msg timestamp */ + else { + snprintf(hash_formatted, sizeof(hash_formatted)-1, "%"PRIu64 , (uint64_t) tm->tm.tv_nsec); + ret = mbedtls_sha256_update_ret(&sha256_ctx, (const unsigned char *) hash_formatted, sizeof(hash_formatted)-1); + if (ret != 0) { + flb_plg_warn(ctx->ins, "can't update mbedtls_sha256 with timestamp, disable hash generation"); + if (hash_buf) { + flb_free(hash_buf); + hash_buf = NULL; + } + mbedtls_sha256_free(&sha256_ctx); + } + /* hash init successful, increase msgpack size for additional hash */ + else { + size++; + size_for_hash = 1; + } + } + } + } /* Init temporal buffers */ msgpack_sbuffer_init(&mp_sbuf); msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); if (ctx->format == FLB_KAFKA_FMT_JSON || ctx->format == FLB_KAFKA_FMT_MSGP) { /* Make room for the timestamp */ - size = map->via.map.size + 1; msgpack_pack_map(&mp_pck, size); /* Pack timestamp */ @@ -181,7 +261,6 @@ int produce_message(struct flb_time *tm, msgpack_object *map, } } else { - size = map->via.map.size; msgpack_pack_map(&mp_pck, size); } @@ -192,6 +271,19 @@ int produce_message(struct flb_time *tm, msgpack_object *map, msgpack_pack_object(&mp_pck, key); msgpack_pack_object(&mp_pck, val); + if (ctx->hash && hash_buf) { + ret = kafka_hash(ctx, &hash_buf, hash_buf_size, &sha256_ctx, &val); + if (ret) { + ret = kafka_hash(ctx, &hash_buf, hash_buf_size, &sha256_ctx, &key); + } + if (!ret) { + if (hash_buf) { + flb_free(hash_buf); + } + mbedtls_sha256_free(&sha256_ctx); + } + } + /* Lookup message key */ if (ctx->message_key_field && !message_key && val.type == MSGPACK_OBJECT_STR) { if (key.via.str.size == ctx->message_key_field_len && @@ -259,6 +351,35 @@ int produce_message(struct flb_time *tm, msgpack_object *map, } } + if (ctx->hash) { + if (hash_buf) { + hash_hex_length = 64; + sha256_ret = mbedtls_sha256_finish_ret(&sha256_ctx, hash); + if (sha256_ret != 0) { + flb_plg_warn(ctx->ins, "hash generation error\n"); + } + mbedtls_sha256_free(&sha256_ctx); + i = 0; + for(j = 0; j < 32; j++) { + sprintf(hash_formatted+(j+i), "%02x", (int) hash[j]); + i++; + } + flb_plg_debug(ctx->ins, "generated hash: <%s>\n", hash_formatted); + flb_free(hash_buf); + } + else { + flb_plg_warn(ctx->ins, "set hash to 'failed'\n"); + hash_hex_length = 6; + strncpy(hash_formatted, "failed", hash_hex_length + 1); + } + if (size_for_hash) { + msgpack_pack_str(&mp_pck, ctx->hash_key_len); + msgpack_pack_str_body(&mp_pck, ctx->hash_key, ctx->hash_key_len); + msgpack_pack_str(&mp_pck, hash_hex_length); + msgpack_pack_str_body(&mp_pck, hash_formatted, hash_hex_length); + } + } + if (ctx->format == FLB_KAFKA_FMT_JSON) { s = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); if (!s) { @@ -569,6 +690,16 @@ static struct flb_config_map config_map[] = { 0, FLB_FALSE, 0, "Set the level key for gelf output." }, + { + FLB_CONFIG_MAP_BOOL, "hash", "false", + 0, FLB_TRUE, offsetof(struct flb_out_kafka, hash), + "Add sha256 hash to the message." + }, + { + FLB_CONFIG_MAP_STR, "hash_key", FLB_KAFKA_HASH_KEY, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, hash_key), + "Set the hash key for the message hash." + }, #ifdef FLB_HAVE_AVRO_ENCODER { FLB_CONFIG_MAP_STR, "schema_str", (char *)NULL, @@ -619,4 +750,4 @@ struct flb_output_plugin out_kafka_plugin = { .cb_exit = cb_kafka_exit, .config_map = config_map, .flags = 0 -}; +}; \ No newline at end of file diff --git a/plugins/out_kafka/kafka_config.c b/plugins/out_kafka/kafka_config.c index ba666c7d30d..2c680dfd7a7 100644 --- a/plugins/out_kafka/kafka_config.c +++ b/plugins/out_kafka/kafka_config.c @@ -76,6 +76,28 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, ctx->topic_key_len = strlen(ctx->topic_key); } + /* Config: Hash */ + if (ctx->hash) { + ctx->hash_key_len = strlen(ctx->hash_key); + } + if (ctx->hash_key) { + ctx->hash_key_len = strlen(ctx->hash_key); + } + else { + ctx->hash_key_len = 0; + } + + /* Config: Hash_Key */ + tmp = flb_output_get_property("hash_key", ins); + if (tmp) { + ctx->hash_key = flb_strdup(tmp); + ctx->hash_key_len = strlen(tmp); + } + else { + ctx->hash_key = FLB_KAFKA_HASH_KEY; + ctx->hash_key_len = strlen(FLB_KAFKA_HASH_KEY); + } + /* Config: Format */ if (ctx->format_str) { if (strcasecmp(ctx->format_str, "json") == 0) { @@ -232,6 +254,10 @@ int flb_out_kafka_destroy(struct flb_out_kafka *ctx) flb_free(ctx->message_key_field); } + if (ctx->hash_key) { + flb_free(ctx->hash_key); + } + flb_sds_destroy(ctx->gelf_fields.timestamp_key); flb_sds_destroy(ctx->gelf_fields.host_key); flb_sds_destroy(ctx->gelf_fields.short_message_key); diff --git a/plugins/out_kafka/kafka_config.h b/plugins/out_kafka/kafka_config.h index 813c6116f90..b3173039355 100644 --- a/plugins/out_kafka/kafka_config.h +++ b/plugins/out_kafka/kafka_config.h @@ -35,6 +35,7 @@ #define FLB_KAFKA_FMT_AVRO 3 #endif #define FLB_KAFKA_TS_KEY "@timestamp" +#define FLB_KAFKA_HASH_KEY "_id" #define FLB_KAFKA_QUEUE_FULL_RETRIES "10" /* rdkafka log levels based on syslog(3) */ @@ -68,6 +69,10 @@ struct flb_out_kafka { int topic_key_len; char *topic_key; + int hash; + int hash_key_len; + char *hash_key; + int timestamp_key_len; char *timestamp_key; int timestamp_format;