Skip to content

Commit

Permalink
Merge pull request #6 from salute-developers/fix/kafka_main_loop_modi…
Browse files Browse the repository at this point in the history
…fieble_config

fix modification of config by kafka utils in kafka main loop
  • Loading branch information
dangerink authored Apr 26, 2022
2 parents adf76d7 + e123ef1 commit 89ca92e
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions smart_kit/start_points/main_loop_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from core.model.heapq.heapq_storage import HeapqKV
from core.mq.kafka.kafka_consumer import KafkaConsumer
from core.mq.kafka.kafka_publisher import KafkaPublisher
from core.utils.pickle_copy import pickle_deepcopy
from core.utils.stats_timer import StatsTimer
from core.basic_models.actions.command import Command
from smart_kit.compatibility.commands import combine_commands
Expand Down Expand Up @@ -54,11 +55,12 @@ def __init__(self, *args, **kwargs):
"%(class_name)s START CONSUMERS/PUBLISHERS CREATE",
params={"class_name": self.__class__.__name__}, level="WARNING"
)
for key, config in kafka_config.items():
kafka_config_copy = pickle_deepcopy(kafka_config)
for key, config in kafka_config_copy.items():
if config.get("consumer"):
consumers.update({key: KafkaConsumer(kafka_config[key])})
consumers.update({key: KafkaConsumer(config)})
if config.get("publisher"):
publishers.update({key: KafkaPublisher(kafka_config[key])})
publishers.update({key: KafkaPublisher(config)})
log(
"%(class_name)s FINISHED CONSUMERS/PUBLISHERS CREATE",
params={"class_name": self.__class__.__name__}, level="WARNING"
Expand Down

0 comments on commit 89ca92e

Please sign in to comment.