From ff9f6f4c5570943e28fe54474387960740f7a007 Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Fri, 30 Aug 2024 21:43:11 -0400 Subject: [PATCH] Add support for deleting records to admin client Fix #385. --- rdkafka-sys/src/types.rs | 3 ++ src/admin.rs | 85 ++++++++++++++++++++++++++++++ tests/test_admin.rs | 111 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 198 insertions(+), 1 deletion(-) diff --git a/rdkafka-sys/src/types.rs b/rdkafka-sys/src/types.rs index 97b77b312..0005073ba 100644 --- a/rdkafka-sys/src/types.rs +++ b/rdkafka-sys/src/types.rs @@ -78,6 +78,9 @@ pub type RDKafkaDeleteGroup = bindings::rd_kafka_DeleteGroup_t; /// Native rdkafka new partitions object. pub type RDKafkaNewPartitions = bindings::rd_kafka_NewPartitions_t; +/// Native rdkafka delete records object. +pub type RDKafkaDeleteRecords = bindings::rd_kafka_DeleteRecords_t; + /// Native rdkafka config resource. pub type RDKafkaConfigResource = bindings::rd_kafka_ConfigResource_t; diff --git a/src/admin.rs b/src/admin.rs index 69dba537b..0418f0cac 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -26,6 +26,7 @@ use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; use crate::error::{IsError, KafkaError, KafkaResult}; use crate::log::{trace, warn}; use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout}; +use crate::TopicPartitionList; // // ********** ADMIN CLIENT ********** @@ -218,6 +219,53 @@ impl AdminClient { Ok(rx) } + /// Deletes records from a topic. + /// + /// The provided `offsets` is a topic partition list specifying which + /// records to delete from a list of topic partitions. For each entry in the + /// list, the messages at offsets before the specified offsets (exclusive) + /// in the specified partition will be deleted. Use offset [`Offset::End`] + /// to delete all records in the partition. + /// + /// Returns a topic partition list describing the result of the deletion. If + /// the operation succeeded for a partition, the offset for that partition + /// will be set to the post-deletion low-water mark for that partition. If + /// the operation failed for a partition, there will be an error for that + /// partition's entry in the list. + pub fn delete_records( + &self, + offsets: &TopicPartitionList, + opts: &AdminOptions, + ) -> impl Future> { + match self.delete_records_inner(offsets, opts) { + Ok(rx) => Either::Left(DeleteRecordsFuture { rx }), + Err(err) => Either::Right(future::err(err)), + } + } + + fn delete_records_inner( + &self, + offsets: &TopicPartitionList, + opts: &AdminOptions, + ) -> KafkaResult> { + let mut err_buf = ErrBuf::new(); + let delete_records = unsafe { + NativeDeleteRecords::from_ptr(rdsys::rd_kafka_DeleteRecords_new(offsets.ptr())) + } + .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?; + let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?; + unsafe { + rdsys::rd_kafka_DeleteRecords( + self.client.native_ptr(), + &mut delete_records.ptr(), + 1, + native_opts.ptr(), + self.queue.ptr(), + ); + } + Ok(rx) + } + /// Retrieves the configuration parameters for the specified resources. /// /// Note that while the API supports describing multiple configurations at @@ -950,6 +998,43 @@ impl Future for CreatePartitionsFuture { } } +// +// Delete records handling +// + +type NativeDeleteRecords = NativePtr; + +unsafe impl KafkaDrop for RDKafkaDeleteRecords { + const TYPE: &'static str = "delete records"; + const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteRecords_destroy; +} + +struct DeleteRecordsFuture { + rx: oneshot::Receiver, +} + +impl Future for DeleteRecordsFuture { + type Output = KafkaResult; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?; + event.check_error()?; + let res = unsafe { rdsys::rd_kafka_event_DeleteRecords_result(event.ptr()) }; + if res.is_null() { + let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; + return Poll::Ready(Err(KafkaError::AdminOpCreation(format!( + "delete records request received response of incorrect type ({})", + typ + )))); + } + let tpl = unsafe { + let tpl = rdsys::rd_kafka_DeleteRecords_result_offsets(res); + TopicPartitionList::from_ptr(rdsys::rd_kafka_topic_partition_list_copy(tpl)) + }; + Poll::Ready(Ok(tpl)) + } +} + // // Describe configs handling // diff --git a/tests/test_admin.rs b/tests/test_admin.rs index 2e6034cb2..4cf0e9a81 100644 --- a/tests/test_admin.rs +++ b/tests/test_admin.rs @@ -12,7 +12,8 @@ use rdkafka::client::DefaultClientContext; use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, DefaultConsumerContext}; use rdkafka::error::{KafkaError, RDKafkaErrorCode}; use rdkafka::metadata::Metadata; -use rdkafka::{ClientConfig, TopicPartitionList}; +use rdkafka::producer::{FutureProducer, FutureRecord, Producer}; +use rdkafka::{ClientConfig, Offset, TopicPartitionList}; use crate::utils::*; @@ -356,6 +357,114 @@ async fn test_topics() { } } +/// Test the admin client's delete records functionality. +#[tokio::test] +async fn test_delete_records() { + let producer = create_config().create::>().unwrap(); + let admin_client = create_admin_client(); + let timeout = Some(Duration::from_secs(1)); + let opts = AdminOptions::new().operation_timeout(timeout); + let topic = rand_test_topic("test_delete_records"); + let make_record = || FutureRecord::::to(&topic).payload("data"); + + // Create a topic with a single partition. + admin_client + .create_topics( + &[NewTopic::new(&topic, 1, TopicReplication::Fixed(1))], + &opts, + ) + .await + .expect("topic creation failed"); + + // Ensure that the topic begins with low and high water marks of 0. + let (lo, hi) = producer + .client() + .fetch_watermarks(&topic, 0, timeout) + .unwrap(); + assert_eq!(lo, 0); + assert_eq!(hi, 0); + + // Produce five messages to the topic. + for _ in 0..5 { + producer.send(make_record(), timeout).await.unwrap(); + } + + // Ensure that the high water mark has advanced to 5. + let (lo, hi) = producer + .client() + .fetch_watermarks(&topic, 0, timeout) + .unwrap(); + assert_eq!(lo, 0); + assert_eq!(hi, 5); + + // Delete the record at offset 0. + let mut tpl = TopicPartitionList::new(); + tpl.add_partition_offset(&topic, 0, Offset::Offset(1)) + .unwrap(); + let res_tpl = admin_client.delete_records(&tpl, &opts).await.unwrap(); + assert_eq!(res_tpl.count(), 1); + assert_eq!(res_tpl.elements()[0].topic(), topic); + assert_eq!(res_tpl.elements()[0].partition(), 0); + assert_eq!(res_tpl.elements()[0].offset(), Offset::Offset(1)); + assert_eq!(res_tpl.elements()[0].error(), Ok(())); + + // Ensure that the low water mark has advanced to 1. + let (lo, hi) = producer + .client() + .fetch_watermarks(&topic, 0, timeout) + .unwrap(); + assert_eq!(lo, 1); + assert_eq!(hi, 5); + + // Delete the record at offset 1 and also include an invalid partition in + // the request. The invalid partition should not cause the request to fail, + // but we should be able to see the per-partition error in the returned + // topic partition list. + let mut tpl = TopicPartitionList::new(); + tpl.add_partition_offset(&topic, 0, Offset::Offset(2)) + .unwrap(); + tpl.add_partition_offset(&topic, 1, Offset::Offset(1)) + .unwrap(); + let res_tpl = admin_client.delete_records(&tpl, &opts).await.unwrap(); + assert_eq!(res_tpl.count(), 2); + assert_eq!(res_tpl.elements()[0].topic(), topic); + assert_eq!(res_tpl.elements()[0].partition(), 0); + assert_eq!(res_tpl.elements()[0].offset(), Offset::Offset(2)); + assert_eq!(res_tpl.elements()[0].error(), Ok(())); + assert_eq!(res_tpl.elements()[1].topic(), topic); + assert_eq!(res_tpl.elements()[1].partition(), 1); + assert_eq!( + res_tpl.elements()[1].error(), + Err(KafkaError::OffsetFetch(RDKafkaErrorCode::UnknownPartition)) + ); + + // Ensure that the low water mark has advanced to 2. + let (lo, hi) = producer + .client() + .fetch_watermarks(&topic, 0, timeout) + .unwrap(); + assert_eq!(lo, 2); + assert_eq!(hi, 5); + + // Delete all records up to offset 5. + let mut tpl = TopicPartitionList::new(); + tpl.add_partition_offset(&topic, 0, Offset::End).unwrap(); + let res_tpl = admin_client.delete_records(&tpl, &opts).await.unwrap(); + assert_eq!(res_tpl.count(), 1); + assert_eq!(res_tpl.elements()[0].topic(), topic); + assert_eq!(res_tpl.elements()[0].partition(), 0); + assert_eq!(res_tpl.elements()[0].offset(), Offset::Offset(5)); + assert_eq!(res_tpl.elements()[0].error(), Ok(())); + + // Ensure that the low water mark has advanced to 5. + let (lo, hi) = producer + .client() + .fetch_watermarks(&topic, 0, timeout) + .unwrap(); + assert_eq!(lo, 5); + assert_eq!(hi, 5); +} + #[tokio::test] async fn test_configs() { let admin_client = create_admin_client();