Skip to content

Commit

Permalink
Add support for deleting records to admin client
Browse files Browse the repository at this point in the history
Fix #385.
  • Loading branch information
benesch committed Aug 31, 2024
1 parent 964d5b8 commit ff9f6f4
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 1 deletion.
3 changes: 3 additions & 0 deletions rdkafka-sys/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ pub type RDKafkaDeleteGroup = bindings::rd_kafka_DeleteGroup_t;
/// Native rdkafka new partitions object.
pub type RDKafkaNewPartitions = bindings::rd_kafka_NewPartitions_t;

/// Native rdkafka delete records object.
pub type RDKafkaDeleteRecords = bindings::rd_kafka_DeleteRecords_t;

/// Native rdkafka config resource.
pub type RDKafkaConfigResource = bindings::rd_kafka_ConfigResource_t;

Expand Down
85 changes: 85 additions & 0 deletions src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::log::{trace, warn};
use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout};
use crate::TopicPartitionList;

//
// ********** ADMIN CLIENT **********
Expand Down Expand Up @@ -218,6 +219,53 @@ impl<C: ClientContext> AdminClient<C> {
Ok(rx)
}

/// Deletes records from a topic.
///
/// The provided `offsets` is a topic partition list specifying which
/// records to delete from a list of topic partitions. For each entry in the
/// list, the messages at offsets before the specified offsets (exclusive)
/// in the specified partition will be deleted. Use offset [`Offset::End`]
/// to delete all records in the partition.
///
/// Returns a topic partition list describing the result of the deletion. If
/// the operation succeeded for a partition, the offset for that partition
/// will be set to the post-deletion low-water mark for that partition. If
/// the operation failed for a partition, there will be an error for that
/// partition's entry in the list.
pub fn delete_records(
&self,
offsets: &TopicPartitionList,
opts: &AdminOptions,
) -> impl Future<Output = KafkaResult<TopicPartitionList>> {
match self.delete_records_inner(offsets, opts) {
Ok(rx) => Either::Left(DeleteRecordsFuture { rx }),
Err(err) => Either::Right(future::err(err)),
}
}

fn delete_records_inner(
&self,
offsets: &TopicPartitionList,
opts: &AdminOptions,
) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
let mut err_buf = ErrBuf::new();
let delete_records = unsafe {
NativeDeleteRecords::from_ptr(rdsys::rd_kafka_DeleteRecords_new(offsets.ptr()))
}
.ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
unsafe {
rdsys::rd_kafka_DeleteRecords(
self.client.native_ptr(),
&mut delete_records.ptr(),
1,
native_opts.ptr(),
self.queue.ptr(),
);
}
Ok(rx)
}

/// Retrieves the configuration parameters for the specified resources.
///
/// Note that while the API supports describing multiple configurations at
Expand Down Expand Up @@ -950,6 +998,43 @@ impl Future for CreatePartitionsFuture {
}
}

//
// Delete records handling
//

type NativeDeleteRecords = NativePtr<RDKafkaDeleteRecords>;

unsafe impl KafkaDrop for RDKafkaDeleteRecords {
const TYPE: &'static str = "delete records";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteRecords_destroy;
}

struct DeleteRecordsFuture {
rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DeleteRecordsFuture {
type Output = KafkaResult<TopicPartitionList>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
event.check_error()?;
let res = unsafe { rdsys::rd_kafka_event_DeleteRecords_result(event.ptr()) };
if res.is_null() {
let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
"delete records request received response of incorrect type ({})",
typ
))));
}
let tpl = unsafe {
let tpl = rdsys::rd_kafka_DeleteRecords_result_offsets(res);
TopicPartitionList::from_ptr(rdsys::rd_kafka_topic_partition_list_copy(tpl))
};
Poll::Ready(Ok(tpl))
}
}

//
// Describe configs handling
//
Expand Down
111 changes: 110 additions & 1 deletion tests/test_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use rdkafka::client::DefaultClientContext;
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, DefaultConsumerContext};
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::metadata::Metadata;
use rdkafka::{ClientConfig, TopicPartitionList};
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use rdkafka::{ClientConfig, Offset, TopicPartitionList};

use crate::utils::*;

Expand Down Expand Up @@ -356,6 +357,114 @@ async fn test_topics() {
}
}

/// Test the admin client's delete records functionality.
#[tokio::test]
async fn test_delete_records() {
let producer = create_config().create::<FutureProducer<_>>().unwrap();
let admin_client = create_admin_client();
let timeout = Some(Duration::from_secs(1));
let opts = AdminOptions::new().operation_timeout(timeout);
let topic = rand_test_topic("test_delete_records");
let make_record = || FutureRecord::<str, str>::to(&topic).payload("data");

// Create a topic with a single partition.
admin_client
.create_topics(
&[NewTopic::new(&topic, 1, TopicReplication::Fixed(1))],
&opts,
)
.await
.expect("topic creation failed");

// Ensure that the topic begins with low and high water marks of 0.
let (lo, hi) = producer
.client()
.fetch_watermarks(&topic, 0, timeout)
.unwrap();
assert_eq!(lo, 0);
assert_eq!(hi, 0);

// Produce five messages to the topic.
for _ in 0..5 {
producer.send(make_record(), timeout).await.unwrap();
}

// Ensure that the high water mark has advanced to 5.
let (lo, hi) = producer
.client()
.fetch_watermarks(&topic, 0, timeout)
.unwrap();
assert_eq!(lo, 0);
assert_eq!(hi, 5);

// Delete the record at offset 0.
let mut tpl = TopicPartitionList::new();
tpl.add_partition_offset(&topic, 0, Offset::Offset(1))
.unwrap();
let res_tpl = admin_client.delete_records(&tpl, &opts).await.unwrap();
assert_eq!(res_tpl.count(), 1);
assert_eq!(res_tpl.elements()[0].topic(), topic);
assert_eq!(res_tpl.elements()[0].partition(), 0);
assert_eq!(res_tpl.elements()[0].offset(), Offset::Offset(1));
assert_eq!(res_tpl.elements()[0].error(), Ok(()));

// Ensure that the low water mark has advanced to 1.
let (lo, hi) = producer
.client()
.fetch_watermarks(&topic, 0, timeout)
.unwrap();
assert_eq!(lo, 1);
assert_eq!(hi, 5);

// Delete the record at offset 1 and also include an invalid partition in
// the request. The invalid partition should not cause the request to fail,
// but we should be able to see the per-partition error in the returned
// topic partition list.
let mut tpl = TopicPartitionList::new();
tpl.add_partition_offset(&topic, 0, Offset::Offset(2))
.unwrap();
tpl.add_partition_offset(&topic, 1, Offset::Offset(1))
.unwrap();
let res_tpl = admin_client.delete_records(&tpl, &opts).await.unwrap();
assert_eq!(res_tpl.count(), 2);
assert_eq!(res_tpl.elements()[0].topic(), topic);
assert_eq!(res_tpl.elements()[0].partition(), 0);
assert_eq!(res_tpl.elements()[0].offset(), Offset::Offset(2));
assert_eq!(res_tpl.elements()[0].error(), Ok(()));
assert_eq!(res_tpl.elements()[1].topic(), topic);
assert_eq!(res_tpl.elements()[1].partition(), 1);
assert_eq!(
res_tpl.elements()[1].error(),
Err(KafkaError::OffsetFetch(RDKafkaErrorCode::UnknownPartition))
);

// Ensure that the low water mark has advanced to 2.
let (lo, hi) = producer
.client()
.fetch_watermarks(&topic, 0, timeout)
.unwrap();
assert_eq!(lo, 2);
assert_eq!(hi, 5);

// Delete all records up to offset 5.
let mut tpl = TopicPartitionList::new();
tpl.add_partition_offset(&topic, 0, Offset::End).unwrap();
let res_tpl = admin_client.delete_records(&tpl, &opts).await.unwrap();
assert_eq!(res_tpl.count(), 1);
assert_eq!(res_tpl.elements()[0].topic(), topic);
assert_eq!(res_tpl.elements()[0].partition(), 0);
assert_eq!(res_tpl.elements()[0].offset(), Offset::Offset(5));
assert_eq!(res_tpl.elements()[0].error(), Ok(()));

// Ensure that the low water mark has advanced to 5.
let (lo, hi) = producer
.client()
.fetch_watermarks(&topic, 0, timeout)
.unwrap();
assert_eq!(lo, 5);
assert_eq!(hi, 5);
}

#[tokio::test]
async fn test_configs() {
let admin_client = create_admin_client();
Expand Down

0 comments on commit ff9f6f4

Please sign in to comment.