diff --git a/application/apache-kafka-flink-streaming/docker-compose.yml b/application/apache-kafka-flink-streaming/docker-compose.yml index 9492e038..f9fc0800 100644 --- a/application/apache-kafka-flink-streaming/docker-compose.yml +++ b/application/apache-kafka-flink-streaming/docker-compose.yml @@ -1,5 +1,6 @@ services: weather_producer: + restart: unless-stopped env_file: - .env build: @@ -13,6 +14,7 @@ services: flink_job: env_file: - .env + restart: unless-stopped build: context: . dockerfile: flink_job.Dockerfile diff --git a/application/apache-kafka-flink-streaming/flink_consumer.py b/application/apache-kafka-flink-streaming/flink_consumer.py index 9406177c..71ba4280 100644 --- a/application/apache-kafka-flink-streaming/flink_consumer.py +++ b/application/apache-kafka-flink-streaming/flink_consumer.py @@ -18,10 +18,13 @@ KAFKA_BOOTSTRAP_SERVER = os.getenv('FLINK_CONSUMER_BOOTSTRAP_SERVER') KAFKA_TOPIC = os.getenv('FLINK_CONSUMER_KAFKA_TOPIC') -CRATEDB_PG_URI = os.getenv('FLINK_CONSUMER_CRATEDB_PG_URI', 'jdbc:postgresql://localhost:5432/crate') +CRATEDB_PG_URI = os.getenv('FLINK_CONSUMER_CRATEDB_PG_URI', + 'jdbc:postgresql://localhost:5432/crate') CRATEDB_USER = os.getenv('FLINK_CONSUMER_CRATEDB_USER') CRATEDB_PASSWORD = os.getenv('FLINK_CONSUMER_CRATEDB_PASSWORD') +logging.debug(f'Starting Flink consumer for Kafka topic: {KAFKA_TOPIC} in {KAFKA_BOOTSTRAP_SERVER}') + def kafka_to_cratedb(env: StreamExecutionEnvironment): row_type_info = Types.ROW_NAMED(['location', 'current'], [Types.STRING(), Types.STRING()]) diff --git a/application/apache-kafka-flink-streaming/weather_producer.py b/application/apache-kafka-flink-streaming/weather_producer.py index 95141423..16a2b657 100644 --- a/application/apache-kafka-flink-streaming/weather_producer.py +++ b/application/apache-kafka-flink-streaming/weather_producer.py @@ -25,6 +25,7 @@ BOOTSTRAP_SERVER = os.getenv('WEATHER_PRODUCER_KAFKA_BOOTSTRAP_SERVER') KAFKA_TOPIC = os.getenv('WEATHER_PRODUCER_KAFKA_TOPIC') + producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER, value_serializer=lambda m: json.dumps(m).encode('utf-8')) @@ -36,12 +37,14 @@ def mocked_fetch_weather_data(): def fetch_weather_data(api_uri) -> dict: + logger.debug('Fetching weather data') response = requests.get(api_uri) response.raise_for_status() return response.json() def send_to_kafka(topic: str, producer) -> None: + logger.debug('Sending weather data to Kafka') data = fetch_weather_data(WEATHER_URI) producer.send(topic, value=data) producer.flush()