-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle New Topic Creation #174
Conversation
if time.time() > start_time + timeout: | ||
raise KafkaTimeoutError("Unable to create topic {}".format(topic)) | ||
self.load_metadata_for_topics(topic) | ||
time.sleep(.5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sleep time should probably also be a param that users can override
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a great idea. I'll get back to this based on the style review in the other conversation as soon as the world isn't literally blowing up at work.
@wizzat: are you ok with just merging this as is? The timing parameter is not critical and would be good to clean this up. If you've got a revision in the works I'll wait for it. Otherwise I feel like we should merge now. |
There was some discussion about making this automatically happen as it does with the Java client. It turns out that you can tell whether the topic is created and whether or not autocreate is on based on the error returned from the Kafka server when you emit the produce request. The problem is that making a produce request will require some fairly significant restructuring of some internals. I started that work, but it's not anywhere near complete. |
self.load_metadata_for_topics(topic) | ||
while not self.has_metadata_for_topic(topic): | ||
if time.time() > start_time + timeout: | ||
raise KafkaTimeoutError("Unable to create topic {}".format(topic)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is KafkaTimeoutError defined?
Adds ensure_topic_exists to KafkaClient, redirects test case to use
that. Fixes #113 and fixes #150.