Skip to content

Commit

Permalink
Add a few more debug statements to see that everything is working fin…
Browse files Browse the repository at this point in the history
…e in the logs.
  • Loading branch information
surister committed Apr 10, 2024
1 parent b875c7e commit 77eb9ed
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 1 deletion.
2 changes: 2 additions & 0 deletions application/apache-kafka-flink-streaming/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
services:
weather_producer:
restart: unless-stopped
env_file:
- .env
build:
Expand All @@ -13,6 +14,7 @@ services:
flink_job:
env_file:
- .env
restart: unless-stopped
build:
context: .
dockerfile: flink_job.Dockerfile
Expand Down
5 changes: 4 additions & 1 deletion application/apache-kafka-flink-streaming/flink_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

KAFKA_BOOTSTRAP_SERVER = os.getenv('FLINK_CONSUMER_BOOTSTRAP_SERVER')
KAFKA_TOPIC = os.getenv('FLINK_CONSUMER_KAFKA_TOPIC')
CRATEDB_PG_URI = os.getenv('FLINK_CONSUMER_CRATEDB_PG_URI', 'jdbc:postgresql://localhost:5432/crate')
CRATEDB_PG_URI = os.getenv('FLINK_CONSUMER_CRATEDB_PG_URI',
'jdbc:postgresql://localhost:5432/crate')
CRATEDB_USER = os.getenv('FLINK_CONSUMER_CRATEDB_USER')
CRATEDB_PASSWORD = os.getenv('FLINK_CONSUMER_CRATEDB_PASSWORD')

logging.debug(f'Starting Flink consumer for Kafka topic: {KAFKA_TOPIC} in {KAFKA_BOOTSTRAP_SERVER}')


def kafka_to_cratedb(env: StreamExecutionEnvironment):
row_type_info = Types.ROW_NAMED(['location', 'current'], [Types.STRING(), Types.STRING()])
Expand Down
3 changes: 3 additions & 0 deletions application/apache-kafka-flink-streaming/weather_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
BOOTSTRAP_SERVER = os.getenv('WEATHER_PRODUCER_KAFKA_BOOTSTRAP_SERVER')
KAFKA_TOPIC = os.getenv('WEATHER_PRODUCER_KAFKA_TOPIC')


producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER,
value_serializer=lambda m: json.dumps(m).encode('utf-8'))

Expand All @@ -36,12 +37,14 @@ def mocked_fetch_weather_data():


def fetch_weather_data(api_uri) -> dict:
logger.debug('Fetching weather data')
response = requests.get(api_uri)
response.raise_for_status()
return response.json()


def send_to_kafka(topic: str, producer) -> None:
logger.debug('Sending weather data to Kafka')
data = fetch_weather_data(WEATHER_URI)
producer.send(topic, value=data)
producer.flush()
Expand Down

0 comments on commit 77eb9ed

Please sign in to comment.