-
Notifications
You must be signed in to change notification settings - Fork 906
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added a delete records api #1710
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed Halfway. Still reviewing. Please resolve these issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One more pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some more review comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have these concerns for the public API that will we frozen after we release it
src/confluent_kafka/src/Admin.c
Outdated
const rd_kafka_DeleteRecords_result_t *c_delete_records_res = rd_kafka_event_DeleteRecords_result(rkev); | ||
const rd_kafka_topic_partition_list_t *c_delete_records_res_list = rd_kafka_DeleteRecords_result_offsets(c_delete_records_res); | ||
|
||
result = c_parts_to_py(c_delete_records_res_list); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure the responses are returned in the same order as the requests, so that it's possible to use _make_futmap_result_from_list
Also we should use a DeletedRecords
object as in Java, instead of a TopicPartition
because that could be extended in future and in that case we cannot apply the same changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are converting it to the futmap in the end keyed at TopicPartition
so there is no issue in that I think.
We can create DeletedRecords
class as Java instead of directly using offset
in the response.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created a new class DeleteRecords containing only offset and made changes according to that. Please review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some more fixes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work. LGTM!..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here are my proposed changes
|
||
param list(TopicPartitions) topic_partition_offsets: A list of | ||
TopicPartition objects having `offset` field set to the offset | ||
before which all the records should be deleted. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
before which all the records should be deleted. | |
before which all the records should be deleted. | |
`offset` can be set to :py:const:`OFFSET_END` (-1) to delete all records | |
in the partition. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comments about SetItem error handling, c_brokers_to_py
and error format
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments about the sphinx documentation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add DeletedRecords
to index.rst
after ConsumerGroupDescription
- :ref:`DeletedRecords <pythonclient_deleted_records>`
and
.. _pythonclient_deleted_records:
**************
DeletedRecords
**************
.. autoclass:: confluent_kafka.admin.DeletedRecords
:members:
in the specified Topic and Partitions. | ||
|
||
:param list(TopicPartitions) topic_partition_offsets: A list of | ||
TopicPartition objects having `offset` field set to the offset |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TopicPartition objects having `offset` field set to the offset | |
:class:`.TopicPartition` objects having `offset` field set to the offset |
including broker lookup, request transmission, operation time | ||
on broker, and response. Default: `socket.timeout.ms*1000.0` | ||
:param float operation_timeout: The operation timeout in seconds, | ||
controlling how long the delete_records request will block |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
controlling how long the delete_records request will block | |
controlling how long the `delete_records` request will block |
in the cluster. A value of 0 returns immediately. | ||
Default: `socket.timeout.ms/1000.0` | ||
|
||
:returns: A dict of futures keyed by the TopicPartition. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:returns: A dict of futures keyed by the TopicPartition. | |
:returns: A dict of futures keyed by the :class:`.TopicPartition`. |
Default: `socket.timeout.ms/1000.0` | ||
|
||
:returns: A dict of futures keyed by the TopicPartition. | ||
The future result() method returns DeletedRecords |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The future result() method returns DeletedRecords | |
The future result() method returns :class:`.DeletedRecords` |
|
||
:returns: A dict of futures keyed by the TopicPartition. | ||
The future result() method returns DeletedRecords | ||
or raises KafkaException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or raises KafkaException | |
or raises :class:`.KafkaException` |
def delete_records(self, topic_partition_offsets, **kwargs): | ||
""" | ||
Deletes all the records before the specified offsets (not including), | ||
in the specified Topic and Partitions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Written this way they seem like classes
in the specified Topic and Partitions. | |
in the specified topics and partitions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work! Thanks @PratRanj07 and @pranavrth
Added a delete records api and created integration tests and unit tests for the same