confluent_kafka #1647

Closed

VDneprovskij opened this issue Jan 27, 2023 · 8 comments

@VDneprovskij commented Jan 27, 2023

How do I use confluent_kafka? I didn't see a solution in the examples or tests.
Right now I use with Connection('confluentkafka://localhost:9092') as conn, and that works. But how can I create a Consumer and pass parameters (group.id, sasl.mechanism, and others)?
I tried to follow the analogy below, but it doesn't work.

from kombu import Connection, Exchange, Queue
from kombu.transport.confluentkafka import Channel

ex = Exchange('test', type='topic')
queue = Queue('test', exchange=ex, routing_key='t.*')
cfg = {
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    # 'value.deserializer': lambda x, y: x.decode('utf-8'),
    'sasl.mechanism': "SASL_PLAINTEXT",
    'sasl.username': None,
    'sasl.password': None
}

def process_media(body, message):
    print(body)
    message.ack()

with Connection('confluentkafka://localhost:9092') as conn:
    ch = Channel(conn.connection, kafka_consumer_config=cfg)
    with conn.Consumer(queue, callbacks=[process_media], channel=ch) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()
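
For reference, a minimal sketch of passing the consumer settings through transport_options instead of constructing a Channel by hand, assuming the kafka_consumer_config option documented by kombu.transport.confluentkafka (untested):

# Hedged sketch: relies on the 'kafka_consumer_config' transport option
# documented by kombu.transport.confluentkafka.
from kombu import Connection, Exchange, Queue

ex = Exchange('test', type='topic')
queue = Queue('test', exchange=ex, routing_key='t.*')

def process_media(body, message):
    print(body)
    message.ack()

with Connection(
    'confluentkafka://localhost:9092',
    transport_options={
        'kafka_consumer_config': {
            'group.id': 'my-group',
            'auto.offset.reset': 'earliest',
        },
    },
) as conn:
    with conn.Consumer(queue, callbacks=[process_media]) as consumer:
        while True:
            conn.drain_events()
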
@open-collective-bot

Hey @VDneprovskij 👋,
Thank you for opening an issue. We will get back to you as soon as we can.
Also, check out our Open Collective and consider backing us - every little helps!

We also offer priority support for our sponsors.
If you require immediate assistance please consider sponsoring us.

@VDneprovskij (Author) commented Jan 31, 2023

In this case:

from kombu import Connection, Queue

cfg = {
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    # 'value.deserializer': lambda x, y: x.decode('utf-8'),
    'sasl.mechanism': "SASL_PLAINTEXT",
    'sasl.username': None,
    'sasl.password': None
}

def process_media(body, message):
    print(body)
    message.ack()

with Connection('confluentkafka://localhost:9092', transport_options=cfg) as conn:
    with conn.Consumer(Queue('kontext-events_dict', channel=conn.channel()), conn.channel(),
                       callbacks=[process_media],
                       accept=['json', 'pickle', 'msgpack', 'yaml', 'application/x-python-serialize']) as consumer:
        print(consumer.channel.client, consumer.channel.common_config)
        while True:
            print(conn.info())
            conn.drain_events()

Connecting and receiving the first message works, but then an error occurs, because the payload has no separate sections such as properties, content-encoding, headers, and so on:
[screenshots of the resulting traceback]

The test message in Kafka: {'first': '1', 'second': '2'}
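
That error is consistent with the test message having been produced directly to Kafka, so it lacks the envelope (content-type, content-encoding, headers, properties) that kombu adds when publishing. A minimal sketch of publishing the same message through kombu itself, so the consumer can decode it (assumes the broker, exchange, and queue from the first snippet):

from kombu import Connection, Exchange, Queue

ex = Exchange('test', type='topic')
queue = Queue('test', exchange=ex, routing_key='t.*')

with Connection('confluentkafka://localhost:9092') as conn:
    producer = conn.Producer(serializer='json')
    # publish() wraps the payload with the content-type/encoding,
    # headers, and properties the consumer side expects
    producer.publish({'first': '1', 'second': '2'},
                     exchange=ex, routing_key='t.1',
                     declare=[queue])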

@thuibr commented Apr 1, 2024

Hi @VDneprovskij, if you're still using Kafka with Kombu, I am adding documentation in celery/celery#8935. Would you be willing to (a) help fill in the blanks on how to authenticate using SASL, and (b) help verify the docs? Thanks!

@thuibr commented Apr 1, 2024

@VDneprovskij upon closer inspection of the options in the transport file, I think the username and password may need to be in the broker string. I'll update if I can get that to work.
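
For example, a broker string of roughly this shape (a sketch; the host and port are placeholders, credentials read from the environment):

import os

sasl_username = os.environ["SASL_USERNAME"]
sasl_password = os.environ["SASL_PASSWORD"]
# hypothetical broker string with credentials embedded
broker_url = f"confluentkafka://{sasl_username}:{sasl_password}@broker:9094"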

@thuibr commented Apr 1, 2024

It actually looks like it needs to be in the kafka_admin_config:

kafka_admin_config = {
    "sasl.username": sasl_username,
    "sasl.password": sasl_password,
}

@thuibr commented Apr 1, 2024

This is working for me, though it could probably be trimmed down:

# Celery settings module (e.g. celeryconfig.py)
import os

task_serializer = 'json'
broker_transport_options = {
    # "allow_create_topics": True,
}
broker_connection_retry_on_startup = True

# For using SQLAlchemy as the result backend:
# result_backend = 'db+postgresql://postgres:example@localhost/postgres'

broker_transport_options.update({
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "SCRAM-SHA-512",
})
sasl_username = os.environ["SASL_USERNAME"]
sasl_password = os.environ["SASL_PASSWORD"]
broker_url = f"confluentkafka://{sasl_username}:{sasl_password}@broker:9094"
kafka_admin_config = {
    "sasl.username": sasl_username,
    "sasl.password": sasl_password,
}
kafka_common_config = {
    "sasl.username": sasl_username,
    "sasl.password": sasl_password,
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    # note: librdkafka's canonical key uses a dot: "bootstrap.servers"
    "bootstrap_servers": "broker:9094",
}
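
For completeness, a minimal sketch of loading a settings module like the one above into a Celery app (the module name celeryconfig is an assumption):

from celery import Celery

app = Celery("tasks")
# load the configuration module shown above (assumed to be celeryconfig.py)
app.config_from_object("celeryconfig")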

@thuibr commented Jul 28, 2024

Can this issue be closed?

@VDneprovskij (Author)

Yes. Unfortunately I needed a solution a year ago, and at that time I simply used another library to solve my problem.
