-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka_producer.py
61 lines (41 loc) · 1.43 KB
/
kafka_producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import os

from confluent_kafka import KafkaError, KafkaException, Producer
from dotenv import load_dotenv
# Load the environment variables from the .env file.
load_dotenv()


def _env_int(name):
    """Return environment variable *name* as an int, or None if unset or blank.

    Raises:
        ValueError: if the variable is set to something that is not an integer.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return None
    return int(raw)


def _env_bool(name):
    """Return environment variable *name* interpreted as a boolean flag.

    Unset, blank, and the usual "off" spellings (0/false/f/no/n/off, in any
    case) are False; every other value is True. A plain ``bool(str)`` would
    treat the string "false" as True.
    """
    raw = os.getenv(name)
    if raw is None:
        return False
    return raw.strip().lower() not in ('', '0', 'false', 'f', 'no', 'n', 'off')


# Define the configuration.
# NOTE(review): group.id, session.timeout.ms and enable.auto.commit are listed
# as consumer properties in librdkafka's configuration reference; they are kept
# here as in the original script -- confirm whether this producer needs them.
conf = {
    'bootstrap.servers': os.getenv('BOOTSTRAP.SERVERS'),
    'security.protocol': os.getenv('SECURITY.PROTOCOL'),
    'sasl.mechanisms': os.getenv('SASL.MECHANISMS'),
    'ssl.ca.location': None,
    'sasl.username': os.getenv('SASL.USERNAME'),
    'sasl.password': os.getenv('SASL.PASSWORD'),
    'group.id': os.getenv('GROUP.ID'),
    'enable.auto.commit': _env_bool('ENABLE.AUTO.COMMIT'),
}

# Only set the session timeout when one is configured, instead of crashing
# on int(None) when SESSION.TIMEOUT.MS is missing from the environment.
_session_timeout_ms = _env_int('SESSION.TIMEOUT.MS')
if _session_timeout_ms is not None:
    conf['session.timeout.ms'] = _session_timeout_ms

# Print the effective configuration, masking the password so the credential
# does not end up in terminal scrollback or captured logs.
print({
    key: ('********' if key == 'sasl.password' and val else val)
    for key, val in conf.items()
})

# Create the producer
producer = Producer(conf)
# Function to send message to the topic
def send_message(topic, key, value):
    """Produce one message to *topic* and wait for its delivery report.

    Args:
        topic: Name of the destination topic.
        key: Message key, passed through to ``producer.produce``.
        value: Message payload, passed through to ``producer.produce``.

    Prints a "Sent message" line when the delivery report carries no error,
    and an "Error: ..." line when queuing or delivery fails. Kafka client
    errors are printed rather than raised.
    """
    def report_delivery(err, _msg):
        # Delivery callback: err is None on success, otherwise it describes
        # why the message could not be delivered.
        if err is not None:
            print("Error: {}".format(err))
        else:
            print("Sent message: topic:{}, key:{}, value:{}".format(topic, key, value))

    try:
        # Queue the message, then block in flush() so the delivery callback
        # has run before this function returns.
        producer.produce(topic, key=key, value=value, on_delivery=report_delivery)
        producer.flush()
    except (KafkaException, KafkaError, BufferError) as e:
        # produce() signals failures with KafkaException (or BufferError when
        # the local queue is full); KafkaError is kept for compatibility with
        # the original handler.
        print("Error: {}".format(e))
# Read lines from stdin and publish each one to the "demo" topic until the
# input ends or the user interrupts the script.
try:
    while True:
        message_raw = input()
        message = message_raw.encode('utf-8')
        # Send a message
        try:
            send_message("demo", "mulesoft", message)
        except (KafkaException, KafkaError, BufferError) as e:
            # A failed send should not end the session; report it and keep
            # reading the next line.
            print("Error: {}".format(e))
except (EOFError, KeyboardInterrupt):
    # End of input (e.g. piped stdin exhausted) or Ctrl-C is the normal way
    # to stop this loop, so exit quietly instead of reporting an error.
    pass
except Exception as e:
    print("Error: {}".format(e))