Skip to content

Commit

Permalink
feat: support subset of partitions in consumer ext (#3948)
Browse files Browse the repository at this point in the history
  • Loading branch information
galibey authored Apr 17, 2024
1 parent dfd6799 commit 968ece9
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 36 deletions.
8 changes: 4 additions & 4 deletions crates/fluvio-cli/src/client/consume/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ mod cmd {

/// Partition id
#[arg(short = 'p', long, value_name = "integer")]
pub partition: Option<PartitionId>,
pub partition: Vec<PartitionId>,

/// Consume records from all partitions
#[arg(short = 'A', long = "all-partitions", conflicts_with_all = &["partition"])]
Expand Down Expand Up @@ -303,10 +303,10 @@ mod cmd {
let offset = self.calculate_offset()?;

let mut builder = ConsumerConfigExt::builder();
builder.topic(self.topic.clone());
builder.topic(&self.topic);
builder.offset_start(offset);
if let Some(partition) = self.partition {
builder.partition(partition);
for partition in &self.partition {
builder.partition(*partition);
}
if let Some(ref consumer) = self.consumer {
builder.offset_consumer(consumer.clone());
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-test/src/tests/consumer_offsets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ async fn produce_records(client: &Fluvio, topic: &str, partitions: usize) -> Res

async fn test_strategy_none(client: &Fluvio, topic: &str, partitions: usize) -> Result<()> {
let mut builder = ConsumerConfigExtBuilder::default();
if partitions == 1 {
builder.partition(0);
for partition in 0..partitions {
builder.partition(partition as u32);
}
let mut stream = client
.consumer_with_config(
Expand Down
13 changes: 10 additions & 3 deletions crates/fluvio/src/consumer/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ pub enum OffsetManagementStrategy {

#[derive(Debug, Builder, Clone)]
pub struct ConsumerConfigExt {
#[builder(setter(into))]
pub topic: String,
#[builder(default, setter(strip_option))]
pub partition: Option<PartitionId>,
#[builder(default, setter(strip_option))]
#[builder(default, setter(custom))]
pub partition: Vec<PartitionId>,
#[builder(default, setter(strip_option, into))]
pub offset_consumer: Option<String>,
pub offset_start: Offset,
#[builder(default)]
Expand Down Expand Up @@ -119,6 +120,12 @@ impl ConsumerConfigExt {
}
}

impl ConsumerConfigExtBuilder {
pub fn partition(&mut self, value: PartitionId) {
self.partition.get_or_insert(Vec::new()).push(value);
}
}

impl From<ConsumerConfigExt> for ConsumerConfig {
fn from(value: ConsumerConfigExt) -> Self {
let ConsumerConfigExt {
Expand Down
27 changes: 9 additions & 18 deletions crates/fluvio/src/fluvio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,26 +321,17 @@ impl Fluvio {
.await?
.ok_or_else(|| FluvioError::TopicNotFound(topic.to_string()))?
.spec;
let partition_streams = if let Some(partition) = config.partition {
vec![
PartitionConsumer::new(topic.clone(), partition, spu_pool.clone(), self.metrics())
.consumer_stream_with_config(config.clone())
.await?,
]
let partitions = if config.partition.is_empty() {
(0..topic_spec.partitions()).collect()
} else {
let partition_count: PartitionId = topic_spec.partitions();
let mut streams = Vec::with_capacity(partition_count as usize);
for partition in 0..partition_count {
let consumer = PartitionConsumer::new(
topic.clone(),
partition,
spu_pool.clone(),
self.metrics(),
);
streams.push(consumer.consumer_stream_with_config(config.clone()).await?);
}
streams
config.partition.clone()
};
let mut partition_streams = Vec::with_capacity(partitions.len());
for partition in partitions {
let consumer =
PartitionConsumer::new(topic.clone(), partition, spu_pool.clone(), self.metrics());
partition_streams.push(consumer.consumer_stream_with_config(config.clone()).await?);
}
Ok(MultiplePartitionConsumerStream::new(partition_streams))
}

Expand Down
12 changes: 3 additions & 9 deletions tests/cli/fluvio_smoke_tests/e2e-basic.bats
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,6 @@ teardown_file() {
}

@test "Consume all partitions by default" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on stable version"
fi
run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME_14" -B -d

assert_output --partial "1"
Expand All @@ -329,14 +326,11 @@ teardown_file() {
assert_success
}

@test "Consume one partition only" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on stable version"
fi
run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME_14" -p 1 -B -d
@test "Consume subset of partitions" {
run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME_14" -p 1 -p 2 -B -d

assert_output --partial "1"
refute_output --partial "2"
assert_output --partial "2"
refute_output --partial "3"
assert_success
}
Expand Down

0 comments on commit 968ece9

Please sign in to comment.