Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix clippy warnings #704

Merged
merged 2 commits into from
Aug 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ jobs:
components: rustfmt, clippy
- run: cargo fmt -- --check
- run: cargo clippy -- -Dwarnings
- run: cargo clippy --tests -- -Dwarnings
- run: cargo test --doc

check:
Expand Down
2 changes: 1 addition & 1 deletion rdkafka-sys/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ fn main() {
// Ensure that we are in the right directory
let rdkafkasys_root = Path::new("rdkafka-sys");
if rdkafkasys_root.exists() {
assert!(env::set_current_dir(&rdkafkasys_root).is_ok());
assert!(env::set_current_dir(rdkafkasys_root).is_ok());
}
if !Path::new("librdkafka/LICENSE").exists() {
eprintln!("Setting up submodules");
Expand Down
4 changes: 2 additions & 2 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ pub trait ConsumerContext: ClientContext + Sized {
/// Pre-rebalance callback. This method will run before the rebalance and
/// should terminate its execution quickly.
#[allow(unused_variables)]
fn pre_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}
fn pre_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'_>) {}

/// Post-rebalance callback. This method will run after the rebalance and
/// should terminate its execution quickly.
#[allow(unused_variables)]
fn post_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}
fn post_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'_>) {}

// TODO: convert pointer to structure
/// Post commit callback. This method will run after a group of offsets was
Expand Down
8 changes: 4 additions & 4 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,20 +425,20 @@ impl<'a> Message for BorrowedMessage<'a> {
type Headers = BorrowedHeaders;

fn key(&self) -> Option<&[u8]> {
unsafe { util::ptr_to_opt_slice((*self.ptr).key, (*self.ptr).key_len) }
unsafe { util::ptr_to_opt_slice(self.ptr.key, self.ptr.key_len) }
}

fn payload(&self) -> Option<&[u8]> {
unsafe { util::ptr_to_opt_slice((*self.ptr).payload, (*self.ptr).len) }
unsafe { util::ptr_to_opt_slice(self.ptr.payload, self.ptr.len) }
}

unsafe fn payload_mut(&mut self) -> Option<&mut [u8]> {
util::ptr_to_opt_mut_slice((*self.ptr).payload, (*self.ptr).len)
util::ptr_to_opt_mut_slice(self.ptr.payload, self.ptr.len)
}

fn topic(&self) -> &str {
unsafe {
CStr::from_ptr(rdsys::rd_kafka_topic_name((*self.ptr).rkt))
CStr::from_ptr(rdsys::rd_kafka_topic_name(self.ptr.rkt))
.to_str()
.expect("Topic name is not valid UTF-8")
}
Expand Down
2 changes: 2 additions & 0 deletions src/producer/base_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ where
/// Note that this method will never block.
// Simplifying the return type requires generic associated types, which are
// unstable.
#[allow(clippy::result_large_err)]
pub fn send<'a, K, P>(
&self,
mut record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
Expand Down Expand Up @@ -701,6 +702,7 @@ where
/// See the documentation for [`BaseProducer::send`] for details.
// Simplifying the return type requires generic associated types, which are
// unstable.
#[allow(clippy::result_large_err)]
pub fn send<'a, K, P>(
&self,
record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
Expand Down
3 changes: 3 additions & 0 deletions src/producer/future_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ where

/// Like [`FutureProducer::send`], but if enqueuing fails, an error will be
/// returned immediately, alongside the [`FutureRecord`] provided.
#[allow(clippy::result_large_err)]
pub fn send_result<'a, K, P>(
&self,
record: FutureRecord<'a, K, P>,
Expand Down Expand Up @@ -442,6 +443,7 @@ mod tests {
#[test]
fn test_future_producer_clone() {
let producer = ClientConfig::new().create::<FutureProducer>().unwrap();
#[allow(clippy::redundant_clone)]
let _producer_clone = producer.clone();
}

Expand All @@ -452,6 +454,7 @@ mod tests {
let producer = ClientConfig::new()
.create_with_context::<_, FutureProducer<TestContext>>(test_context)
.unwrap();
#[allow(clippy::redundant_clone)]
let _producer_clone = producer.clone();
}
}
4 changes: 2 additions & 2 deletions src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ mod tests {

assert_eq!(stats.brokers.len(), 1);

let broker = stats.brokers.values().into_iter().collect::<Vec<_>>()[0];
let broker = stats.brokers.values().collect::<Vec<_>>()[0];

assert_eq!(
broker.req,
Expand All @@ -391,7 +391,7 @@ mod tests {
}

// Example from https://github.com/edenhill/librdkafka/wiki/Statistics
const EXAMPLE: &'static str = r#"
const EXAMPLE: &str = r#"
{
"name": "rdkafka#producer-1",
"client_id": "rdkafka",
Expand Down
6 changes: 3 additions & 3 deletions src/topic_partition_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ impl TopicPartitionList {

/// Sets all partitions in the list to the specified offset.
pub fn set_all_offsets(&mut self, offset: Offset) -> Result<(), KafkaError> {
let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) };
for elem_ptr in slice {
let mut elem = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
elem.set_offset(offset)?;
Expand All @@ -327,7 +327,7 @@ impl TopicPartitionList {

/// Returns all the elements of the list.
pub fn elements(&self) -> Vec<TopicPartitionListElem<'_>> {
let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) };
let mut vec = Vec::with_capacity(slice.len());
for elem_ptr in slice {
vec.push(TopicPartitionListElem::from_ptr(self, &mut *elem_ptr));
Expand All @@ -337,7 +337,7 @@ impl TopicPartitionList {

/// Returns all the elements of the list that belong to the specified topic.
pub fn elements_for_topic<'a>(&'a self, topic: &str) -> Vec<TopicPartitionListElem<'a>> {
let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) };
let mut vec = Vec::with_capacity(slice.len());
for elem_ptr in slice {
let tp = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
Expand Down
22 changes: 13 additions & 9 deletions tests/test_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn create_consumer_group(consumer_group_name: &str) {
let admin_client = create_admin_client();
let topic_name = &rand_test_topic(consumer_group_name);
let consumer: BaseConsumer = create_config()
.set("group.id", consumer_group_name.clone())
.set("group.id", consumer_group_name)
.create()
.expect("create consumer failed");

Expand Down Expand Up @@ -74,17 +74,19 @@ fn fetch_metadata(topic: &str) -> Metadata {
create_config().create().expect("consumer creation failed");
let timeout = Some(Duration::from_secs(1));

let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(Duration::from_secs(5));
let mut backoff = ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(5)),
..Default::default()
};
(|| {
let metadata = consumer
.fetch_metadata(Some(topic), timeout)
.map_err(|e| e.to_string())?;
if metadata.topics().len() == 0 {
if metadata.topics().is_empty() {
Err("metadata fetch returned no topics".to_string())?
}
let topic = &metadata.topics()[0];
if topic.partitions().len() == 0 {
if topic.partitions().is_empty() {
Err("metadata fetch returned a topic with no partitions".to_string())?
}
Ok(metadata)
Expand All @@ -98,16 +100,18 @@ fn verify_delete(topic: &str) {
create_config().create().expect("consumer creation failed");
let timeout = Some(Duration::from_secs(1));

let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(Duration::from_secs(5));
let mut backoff = ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(5)),
..Default::default()
};
(|| {
// Asking about the topic specifically will recreate it (under the
// default Kafka configuration, at least) so we have to ask for the list
// of all topics and search through it.
let metadata = consumer
.fetch_metadata(None, timeout)
.map_err(|e| e.to_string())?;
if let Some(_) = metadata.topics().iter().find(|t| t.name() == topic) {
if metadata.topics().iter().any(|t| t.name() == topic) {
Err(format!("topic {} still exists", topic))?
}
Ok(())
Expand Down Expand Up @@ -416,7 +420,7 @@ async fn test_configs() {
}
}

let config = AlterConfig::new(broker).set("log.flush.interval.ms", &orig_val);
let config = AlterConfig::new(broker).set("log.flush.interval.ms", orig_val);
let res = admin_client
.alter_configs(&[config], &opts)
.await
Expand Down
4 changes: 2 additions & 2 deletions tests/test_high_producers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ async fn test_future_producer_send_full() {

// Fill up the queue.
producer
.send_result(FutureRecord::to(&topic_name).payload("A").key("B"))
.send_result(FutureRecord::to(topic_name).payload("A").key("B"))
.unwrap();

let send_message = |timeout| async move {
let start = Instant::now();
let res = producer
.send(FutureRecord::to(&topic_name).payload("A").key("B"), timeout)
.send(FutureRecord::to(topic_name).payload("A").key("B"), timeout)
.await;
match res {
Ok(_) => panic!("send unexpectedly succeeded"),
Expand Down
10 changes: 5 additions & 5 deletions tests/test_low_consumers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,11 @@ async fn test_produce_consume_message_queue_nonempty_callback() {
let timeout = Duration::from_secs(15);
loop {
let w = wakeups.load(Ordering::SeqCst);
if w == target {
break;
} else if w > target {
panic!("wakeups {} exceeds target {}", w, target);
}
match w.cmp(&target) {
std::cmp::Ordering::Equal => break,
std::cmp::Ordering::Greater => panic!("wakeups {} exceeds target {}", w, target),
std::cmp::Ordering::Less => (),
};
thread::sleep(Duration::from_millis(100));
if start.elapsed() > timeout {
panic!("timeout exceeded while waiting for wakeup");
Expand Down
22 changes: 12 additions & 10 deletions tests/test_low_producers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl<Part: Partitioner + Send + Sync> ProducerContext<Part> for CollectingContex
fn get_custom_partitioner(&self) -> Option<&Part> {
match &self.partitioner {
None => None,
Some(p) => Some(&p),
Some(p) => Some(p),
}
}
}
Expand Down Expand Up @@ -144,7 +144,7 @@ impl Partitioner for PanicPartitioner {
fn default_config(config_overrides: HashMap<&str, &str>) -> ClientConfig {
let mut config = ClientConfig::new();
config
.set("bootstrap.servers", &get_bootstrap_server())
.set("bootstrap.servers", get_bootstrap_server())
.set("message.timeout.ms", "5000");

for (key, value) in config_overrides {
Expand Down Expand Up @@ -210,11 +210,13 @@ fn test_base_producer_queue_full() {
let errors = results
.iter()
.filter(|&e| {
if let &Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) = e {
true
} else {
false
}
matches!(
e,
&Err((
KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull),
_
))
)
})
.count();

Expand Down Expand Up @@ -496,7 +498,7 @@ fn test_register_custom_partitioner_linger_non_zero_key_null() {

assert_eq!(delivery_results.len(), 1);

for &(_, ref error, _) in &(*delivery_results) {
for (_, error, _) in &(*delivery_results) {
assert_eq!(*error, None);
}
}
Expand All @@ -523,7 +525,7 @@ fn test_custom_partitioner_base_producer() {

let delivery_results = context.results.lock().unwrap();

for &(ref message, ref error, _) in &(*delivery_results) {
for (message, error, _) in &(*delivery_results) {
assert_eq!(error, &None);
assert_eq!(message.partition(), 2);
}
Expand Down Expand Up @@ -551,7 +553,7 @@ fn test_custom_partitioner_threaded_producer() {

let delivery_results = context.results.lock().unwrap();

for &(ref message, ref error, _) in &(*delivery_results) {
for (message, error, _) in &(*delivery_results) {
assert_eq!(error, &None);
assert_eq!(message.partition(), 2);
}
Expand Down
4 changes: 2 additions & 2 deletions tests/test_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ fn create_consumer(
fn create_producer() -> Result<BaseProducer, KafkaError> {
let mut config = ClientConfig::new();
config
.set("bootstrap.servers", &get_bootstrap_server())
.set("bootstrap.servers", get_bootstrap_server())
.set("message.timeout.ms", "5000")
.set("enable.idempotence", "true")
.set("transactional.id", &rand_test_transactional_id())
.set("transactional.id", rand_test_transactional_id())
.set("debug", "eos");
config.set_log_level(RDKafkaLogLevel::Debug);
config.create()
Expand Down
46 changes: 22 additions & 24 deletions tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ pub fn get_broker_version() -> KafkaVersion {
panic!("KAFKA_VERSION env var contained non-unicode characters")
}
// If the environment variable is unset, assume we're running the latest version.
Err(VarError::NotPresent) => {
KafkaVersion(std::u32::MAX, std::u32::MAX, std::u32::MAX, std::u32::MAX)
}
Err(VarError::NotPresent) => KafkaVersion(u32::MAX, u32::MAX, u32::MAX, u32::MAX),
}
}

Expand Down Expand Up @@ -164,27 +162,6 @@ pub fn key_fn(id: i32) -> String {
format!("Key {}", id)
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_populate_topic() {
let topic_name = rand_test_topic("test_populate_topic");
let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), None).await;

let total_messages = message_map
.iter()
.filter(|&(&(partition, _), _)| partition == 0)
.count();
assert_eq!(total_messages, 100);

let mut ids = message_map.iter().map(|(_, id)| *id).collect::<Vec<_>>();
ids.sort();
assert_eq!(ids, (0..100).collect::<Vec<_>>());
}
}

pub struct ConsumerTestContext {
pub _n: i64, // Add data for memory access validation
}
Expand Down Expand Up @@ -228,3 +205,24 @@ pub fn consumer_config(

config
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_populate_topic() {
let topic_name = rand_test_topic("test_populate_topic");
let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), None).await;

let total_messages = message_map
.iter()
.filter(|&(&(partition, _), _)| partition == 0)
.count();
assert_eq!(total_messages, 100);

let mut ids = message_map.values().copied().collect::<Vec<_>>();
ids.sort();
assert_eq!(ids, (0..100).collect::<Vec<_>>());
}
}
Loading