diff --git a/include/fluent-bit/flb_hash.h b/include/fluent-bit/flb_hash.h index 29b9f7b8ef5..42666c75433 100644 --- a/include/fluent-bit/flb_hash.h +++ b/include/fluent-bit/flb_hash.h @@ -55,6 +55,7 @@ struct flb_hash { int max_entries; int total_count; int cache_ttl; + int force_remove_pointer; size_t size; struct mk_list entries; struct flb_hash_table *table; @@ -63,6 +64,8 @@ struct flb_hash { struct flb_hash *flb_hash_create(int evict_mode, size_t size, int max_entries); struct flb_hash *flb_hash_create_with_ttl(int cache_ttl, int evict_mode, size_t size, int max_entries); +struct flb_hash *flb_hash_create_with_ttl_force_destroy(int cache_ttl, int evict_mode, + size_t size, int max_entries); void flb_hash_destroy(struct flb_hash *ht); int flb_hash_add(struct flb_hash *ht, diff --git a/plugins/filter_kubernetes/kube_conf.c b/plugins/filter_kubernetes/kube_conf.c index 20de8c3b215..c190085a84f 100644 --- a/plugins/filter_kubernetes/kube_conf.c +++ b/plugins/filter_kubernetes/kube_conf.c @@ -192,7 +192,7 @@ struct flb_kube *flb_kube_conf_create(struct flb_filter_instance *ins, } - ctx->pod_hash_table = flb_hash_create_with_ttl(ctx->pod_service_map_ttl, + ctx->pod_hash_table = flb_hash_create_with_ttl_force_destroy(ctx->pod_service_map_ttl, FLB_HASH_EVICT_OLDER, FLB_HASH_TABLE_SIZE, FLB_HASH_TABLE_SIZE); @@ -209,6 +209,10 @@ void flb_kube_conf_destroy(struct flb_kube *ctx) flb_hash_destroy(ctx->hash_table); } + if (ctx->pod_hash_table) { + flb_hash_destroy(ctx->pod_hash_table); + } + if (ctx->merge_log == FLB_TRUE) { flb_free(ctx->unesc_buf); } diff --git a/plugins/filter_kubernetes/kube_conf.h b/plugins/filter_kubernetes/kube_conf.h index 24f25e9a6ab..f338599049c 100644 --- a/plugins/filter_kubernetes/kube_conf.h +++ b/plugins/filter_kubernetes/kube_conf.h @@ -178,7 +178,7 @@ struct flb_kube { char *use_pod_association; char *pod_association_host; char *pod_association_endpoint; - char *pod_association_port; + int pod_association_port; /* * TTL is used to check how long should the mapped entry diff --git a/plugins/filter_kubernetes/kube_meta.c b/plugins/filter_kubernetes/kube_meta.c index d2077236b4c..85c69263ead 100644 --- a/plugins/filter_kubernetes/kube_meta.c +++ b/plugins/filter_kubernetes/kube_meta.c @@ -1717,5 +1717,9 @@ int flb_kube_meta_release(struct flb_kube_meta *meta) flb_free(meta->cache_key); } + if (meta->cluster) { + flb_free(meta->cluster); + } + return r; } diff --git a/plugins/filter_kubernetes/kubernetes.c b/plugins/filter_kubernetes/kubernetes.c index 30f69a0c498..29e783eab57 100644 --- a/plugins/filter_kubernetes/kubernetes.c +++ b/plugins/filter_kubernetes/kubernetes.c @@ -48,6 +48,7 @@ struct task_args { pthread_mutex_t metadata_mutex; pthread_t background_thread; struct task_args *task_args = {0}; +struct mk_event_loop *evl; /* * If a file exists called service.map, load it and use it. @@ -132,7 +133,7 @@ static void parse_pod_service_map(struct flb_kube *ctx, char *api_buf, size_t ap v = api_map.via.map.ptr[i].val; if (k.type == MSGPACK_OBJECT_STR && v.type == MSGPACK_OBJECT_MAP) { char *pod_name = flb_strndup(k.via.str.ptr, k.via.str.size); - struct service_attributes *service_attributes = malloc(sizeof(struct service_attributes)); + struct service_attributes *service_attributes = flb_malloc(sizeof(struct service_attributes)); for (int j = 0; j < v.via.map.size; j++) { attributeKey = v.via.map.ptr[j].key; attributeValue = v.via.map.ptr[j].val; @@ -157,7 +158,7 @@ static void parse_pod_service_map(struct flb_kube *ctx, char *api_buf, size_t ap pthread_mutex_lock(&metadata_mutex); flb_hash_add(ctx->pod_hash_table, pod_name,k.via.str.size, - service_attributes, sizeof(struct service_attributes)); + service_attributes, 0); pthread_mutex_unlock(&metadata_mutex); } else { flb_free(service_attributes); @@ -192,6 +193,7 @@ static int fetch_pod_service_map(struct flb_kube *ctx, char *api_server_url) { ret = get_pod_service_file_info(ctx, "use_pod_association_enabled", &buffer); if (ret > 0 && buffer != NULL) { parse_pod_service_map(ctx, buffer, ret); + flb_free(buffer); } else { /* Get upstream context and connection */ @@ -250,7 +252,7 @@ static int fetch_pod_service_map(struct flb_kube *ctx, char *api_server_url) { void *update_pod_service_map(void *arg) { while (1) { flb_engine_evl_init(); - struct mk_event_loop *evl = mk_event_loop_create(256); + evl = mk_event_loop_create(256); flb_engine_evl_set(evl); fetch_pod_service_map(task_args->ctx,task_args->api_server_url); flb_plg_debug(task_args->ctx->ins, "Updating pod to service map after %d seconds", task_args->ctx->pod_service_map_refresh_interval); @@ -863,7 +865,9 @@ static int cb_kube_exit(void *data, struct flb_config *config) pthread_mutex_destroy(&metadata_mutex); flb_free(task_args); - + if (evl) { + mk_event_loop_destroy(evl); + } return 0; } diff --git a/src/flb_hash.c b/src/flb_hash.c index fe3035f901d..8dbcc502bb7 100644 --- a/src/flb_hash.c +++ b/src/flb_hash.c @@ -34,6 +34,8 @@ static inline void flb_hash_entry_free(struct flb_hash *ht, flb_free(entry->key); if (entry->val && entry->val_size > 0) { flb_free(entry->val); + } else if (ht->force_remove_pointer) { + flb_free(entry->val); } flb_free(entry); } @@ -60,6 +62,7 @@ struct flb_hash *flb_hash_create(int evict_mode, size_t size, int max_entries) ht->size = size; ht->total_count = 0; ht->cache_ttl = 0; + ht->force_remove_pointer = 0; ht->table = flb_calloc(1, sizeof(struct flb_hash_table) * size); if (!ht->table) { flb_errno(); @@ -93,6 +96,21 @@ struct flb_hash *flb_hash_create_with_ttl(int cache_ttl, int evict_mode, return ht; } +struct flb_hash *flb_hash_create_with_ttl_force_destroy(int cache_ttl, int evict_mode, + size_t size, int max_entries) +{ + struct flb_hash *ht; + + ht = flb_hash_create_with_ttl(cache_ttl,evict_mode, size, max_entries); + if (!ht) { + flb_errno(); + return NULL; + } + + ht->force_remove_pointer = 1; + return ht; +} + int flb_hash_del_ptr(struct flb_hash *ht, const char *key, int key_len, void *ptr) { diff --git a/tests/runtime/data/kubernetes/servicemap/use_pod_association_enabled.map b/tests/runtime/data/kubernetes/servicemap/use_pod_association_enabled.map index 08927cc4817..ee2207d0dc2 100644 --- a/tests/runtime/data/kubernetes/servicemap/use_pod_association_enabled.map +++ b/tests/runtime/data/kubernetes/servicemap/use_pod_association_enabled.map @@ -1 +1,6 @@ -{"use-pod-association-enabled":{"ServiceName":"test-service","Environment":"test-environment"}} \ No newline at end of file +{ + "use-pod-association-enabled": { + "ServiceName": "test-service", + "Environment": "test-environment" + } +} diff --git a/tests/runtime/filter_kubernetes.c b/tests/runtime/filter_kubernetes.c index 1c9d8bd2337..ef4f1e5f693 100644 --- a/tests/runtime/filter_kubernetes.c +++ b/tests/runtime/filter_kubernetes.c @@ -54,9 +54,6 @@ static void clear_file(const char *filename) { // Helper function to write to the file with the specified content static void write_log_to_file(const char *filename) { FILE *file; - time_t rawtime; - struct tm *timeinfo; - char time_str[100]; char log_entry[512]; // Log message to write