Skip to content

Commit

Permalink
feat: set old consumer API deprecated (#3947)
Browse files Browse the repository at this point in the history
  • Loading branch information
galibey authored Apr 19, 2024
1 parent 9f7dd64 commit 96ff8c3
Show file tree
Hide file tree
Showing 19 changed files with 178 additions and 201 deletions.
25 changes: 14 additions & 11 deletions crates/fluvio-benchmark/src/consumer_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::time::{Duration, Instant};
use async_channel::{Sender, Receiver};
use anyhow::Result;

use fluvio::{consumer::ConsumerConfigBuilder, Offset, dataplane::link::ErrorCode};
use fluvio::consumer::ConsumerConfigExtBuilder;
use fluvio::Fluvio;
use fluvio::{Offset, dataplane::link::ErrorCode};
use fluvio::dataplane::record::ConsumerRecord;
use fluvio_future::future::timeout;
use futures_util::{Stream, StreamExt};
Expand All @@ -28,16 +30,17 @@ impl ConsumerWorker {
assigned_partition: u64,
preallocation_hint: u64,
) -> Result<Self> {
let mut config_builder = ConsumerConfigBuilder::default();
config_builder.max_bytes(config.consumer_max_bytes as i32);
config_builder.isolation(config.consumer_isolation);

let fluvio_config = config_builder.build()?;

let fluvio_consumer =
fluvio::consumer(config.topic_name.clone(), assigned_partition as u32).await?;
let stream = fluvio_consumer
.stream_with_config(Offset::absolute(0)?, fluvio_config)
let fluvio = Fluvio::connect().await?;
let stream = fluvio
.consumer_with_config(
ConsumerConfigExtBuilder::default()
.topic(config.topic_name)
.partition(assigned_partition as u32)
.offset_start(Offset::absolute(0)?)
.max_bytes(config.consumer_max_bytes as i32)
.isolation(config.consumer_isolation)
.build()?,
)
.await?;

Ok(Self {
Expand Down
53 changes: 19 additions & 34 deletions crates/fluvio-connector-common/src/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use fluvio::{FluvioConfig, Fluvio, PartitionSelectionStrategy};
use fluvio::consumer::ConsumerConfigExtBuilder;
use fluvio::{FluvioConfig, Fluvio};
use fluvio::dataplane::record::ConsumerRecord;
use fluvio_connector_package::config::ConsumerPartitionConfig;
use fluvio_sc_schema::errors::ErrorCode;
Expand Down Expand Up @@ -26,49 +27,33 @@ pub async fn consumer_stream_from_config(
let fluvio = Fluvio::connect_with_config(&cluster_config).await?;
ensure_topic_exists(config).await?;

let mut builder = fluvio::ConsumerConfig::builder();
if let Some(max_bytes) = config.meta().consumer().and_then(|c| c.max_bytes) {
builder.max_bytes(max_bytes.as_u64() as i32);
}
if let Some(smartmodules) = smartmodule_vec_from_config(config) {
builder.smartmodule(smartmodules);
}
let consumer_config = builder.build()?;
let consumer_partition = config
.meta()
.consumer()
.map(|c| c.partition.clone())
.unwrap_or_default();
let offset = fluvio::Offset::end();
let topic = config.meta().topic().to_string();
let stream = match consumer_partition {
let mut builder = ConsumerConfigExtBuilder::default();
builder.topic(config.meta().topic());
builder.offset_start(fluvio::Offset::end());
match consumer_partition {
ConsumerPartitionConfig::One(partition) => {
let consumer = fluvio.partition_consumer(topic, partition).await?;
consumer
.stream_with_config(offset, consumer_config)
.await?
.boxed()
}
ConsumerPartitionConfig::All => {
let consumer = fluvio
.consumer(PartitionSelectionStrategy::All(topic))
.await?;
consumer
.stream_with_config(offset, consumer_config)
.await?
.boxed()
builder.partition(partition);
}
ConsumerPartitionConfig::All => (),
ConsumerPartitionConfig::Many(partitions) => {
let partitions = partitions.into_iter().map(|p| (topic.clone(), p)).collect();
let consumer = fluvio
.consumer(PartitionSelectionStrategy::Multiple(partitions))
.await?;
consumer
.stream_with_config(offset, consumer_config)
.await?
.boxed()
for partition in partitions {
builder.partition(partition);
}
}
};
if let Some(max_bytes) = config.meta().consumer().and_then(|c| c.max_bytes) {
builder.max_bytes(max_bytes.as_u64() as i32);
}
if let Some(smartmodules) = smartmodule_vec_from_config(config) {
builder.smartmodule(smartmodules);
}

let stream = fluvio.consumer_with_config(builder.build()?).await?.boxed();

Ok((fluvio, stream))
}
45 changes: 22 additions & 23 deletions crates/fluvio-test-util/test_runner/test_driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//use fluvio::consumer::{PartitionSelectionStrategy, ConsumerConfig};

use fluvio::dataplane::link::ErrorCode;
use tracing::debug;
use anyhow::Result;

use fluvio::consumer::PartitionSelectionStrategy;
use fluvio::Fluvio;
use fluvio::consumer::{ConsumerConfigExt, ConsumerConfigExtBuilder, ConsumerStream, Record};
use fluvio::{Fluvio, Offset};
use fluvio::metadata::topic::TopicSpec;
use fluvio::{TopicProducer, RecordKey, PartitionConsumer, MultiplePartitionConsumer};
use fluvio::{TopicProducer, RecordKey};
use fluvio::TopicProducerConfig;
use fluvio::metadata::topic::CleanupPolicy;
use fluvio::metadata::topic::SegmentBasedPolicy;
Expand Down Expand Up @@ -130,12 +131,12 @@ impl TestDriver {
Ok(())
}

pub async fn get_consumer(&self, topic: &str, partition: PartitionId) -> PartitionConsumer {
pub async fn get_consumer_with_config(
&self,
config: ConsumerConfigExt,
) -> impl ConsumerStream<Item = Result<Record, ErrorCode>> + Unpin {
let fluvio_client = self.create_client().await.expect("cant' create client");
match fluvio_client
.partition_consumer(topic.to_string(), partition)
.await
{
match fluvio_client.consumer_with_config(config).await {
Ok(client) => {
//self.consumer_num += 1;
client
Expand All @@ -146,21 +147,19 @@ impl TestDriver {
}
}

// TODO: Create a multi-partition api w/ a list of partitions based off this
pub async fn get_all_partitions_consumer(&self, topic: &str) -> MultiplePartitionConsumer {
let fluvio_client = self.create_client().await.expect("cant' create client");
match fluvio_client
.consumer(PartitionSelectionStrategy::All(topic.to_string()))
.await
{
Ok(client) => {
//self.consumer_num += 1;
client
}
Err(err) => {
panic!("can't create consumer: {err:#?}");
}
}
pub async fn get_consumer_with_start(
&self,
topic: &str,
partition: PartitionId,
offset_start: Offset,
) -> impl ConsumerStream<Item = Result<Record, ErrorCode>> + Unpin {
let config = ConsumerConfigExtBuilder::default()
.topic(topic)
.partition(partition)
.offset_start(offset_start)
.build()
.expect("config");
self.get_consumer_with_config(config).await
}

// Re-enable when we re-enable metrics
Expand Down
8 changes: 3 additions & 5 deletions crates/fluvio-test/src/tests/batching/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ pub async fn batching(

println!("Found leader {leader}");

let consumer = test_driver.get_consumer(&topic_name, 0).await;
let mut stream = consumer
.stream(Offset::end())
.await
.expect("Failed to create consumer stream");
let mut stream = test_driver
.get_consumer_with_start(&topic_name, 0, Offset::end())
.await;

for _ in 0..150 {
// Ensure record is sent after the linger time even if we dont call flush()
Expand Down
9 changes: 6 additions & 3 deletions crates/fluvio-test/src/tests/concurrent/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ pub async fn consumer_stream(
option: MyTestCase,
digests: Receiver<String>,
) {
let consumer = test_driver
.get_consumer(&option.environment.base_topic_name(), 0)
let mut stream = test_driver
.get_consumer_with_start(
&option.environment.base_topic_name(),
0,
Offset::beginning(),
)
.await;
let mut stream = consumer.stream(Offset::beginning()).await.unwrap();

let mut index: i32 = 0;
while let Some(Ok(record)) = stream.next().await {
Expand Down
94 changes: 24 additions & 70 deletions crates/fluvio-test/src/tests/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::any::Any;
use std::pin::Pin;
use std::time::{Duration, SystemTime};

use clap::Parser;
Expand All @@ -9,8 +8,8 @@ use tokio::select;
use hdrhistogram::Histogram;

use fluvio_protocol::link::ErrorCode;
use fluvio::consumer::Record;
use fluvio::{ConsumerConfig, MultiplePartitionConsumer, PartitionConsumer, RecordKey};
use fluvio::consumer::{ConsumerConfigExtBuilder, Record};
use fluvio::RecordKey;
use fluvio::Offset;

use fluvio_test_derive::fluvio_test;
Expand Down Expand Up @@ -113,13 +112,11 @@ impl TestOption for ConsumerTestOption {
}
}

async fn consume_work<S: ?Sized>(
mut stream: Pin<Box<S>>,
consumer_id: u32,
test_case: ConsumerTestCase,
) where
async fn consume_work<S: ?Sized>(stream: &mut S, consumer_id: u32, test_case: ConsumerTestCase)
where
//S: Stream<Item = Result<Record, FluvioError>> + std::marker::Unpin,
S: Stream<Item = Result<Record, ErrorCode>>,
S: Unpin,
{
let mut records_recvd = 0;

Expand Down Expand Up @@ -209,52 +206,6 @@ async fn consume_work<S: ?Sized>(
);
}

fn build_consumer_config(test_case: ConsumerTestCase) -> ConsumerConfig {
let mut config = ConsumerConfig::builder();

// continuous
if test_case.option.num_records == 0 {
config.disable_continuous(true);
}

// max bytes
if let Some(max_bytes) = test_case.option.max_bytes {
config.max_bytes(max_bytes as i32);
}

config.build().expect("Couldn't build consumer config")
}

async fn get_single_stream(
consumer: PartitionConsumer,
offset: Offset,
test_case: ConsumerTestCase,
) -> Pin<Box<dyn Stream<Item = Result<Record, ErrorCode>>>> {
let config = build_consumer_config(test_case);

Box::pin(
consumer
.stream_with_config(offset, config)
.await
.expect("Unable to open stream"),
)
}

async fn get_multi_stream(
consumer: MultiplePartitionConsumer,
offset: Offset,
test_case: ConsumerTestCase,
) -> Pin<Box<dyn Stream<Item = Result<Record, ErrorCode>>>> {
let config = build_consumer_config(test_case);

Box::pin(
consumer
.stream_with_config(offset, config)
.await
.expect("Unable to open stream"),
)
}

#[fluvio_test(name = "consumer", topic = "consumer-test")]
pub fn run(mut test_driver: FluvioTestDriver, mut test_case: TestCase) {
let test_case: ConsumerTestCase = test_case.into();
Expand Down Expand Up @@ -345,24 +296,27 @@ pub fn run(mut test_driver: FluvioTestDriver, mut test_case: TestCase) {
.expect("Connecting to cluster failed");

// TODO: Support multiple topics
let mut config_builder = ConsumerConfigExtBuilder::default();
config_builder
.topic(test_case.environment.base_topic_name())
.offset_start(offset);
if !is_multi {
config_builder.partition(partition);
}
// continuous
if test_case.option.num_records == 0 {
config_builder.disable_continuous(true);
}

if is_multi {
let consumer = test_driver
.get_all_partitions_consumer(&test_case.environment.base_topic_name())
.await;
let stream: Pin<Box<dyn Stream<Item = Result<Record, ErrorCode>>>> =
get_multi_stream(consumer, offset, test_case.clone()).await;

consume_work(Box::pin(stream), n.into(), test_case).await
} else {
let consumer = test_driver
.get_consumer(&test_case.environment.base_topic_name(), partition)
.await;
let stream: Pin<Box<dyn Stream<Item = Result<Record, ErrorCode>>>> =
get_single_stream(consumer, offset, test_case.clone()).await;

consume_work(stream, n.into(), test_case).await
// max bytes
if let Some(max_bytes) = test_case.option.max_bytes {
config_builder.max_bytes(max_bytes as i32);
}

let mut stream = test_driver
.get_consumer_with_config(config_builder.build().expect("config"))
.await;
consume_work(&mut stream, n.into(), test_case).await
},
format!("consumer-{n}")
);
Expand Down
9 changes: 1 addition & 8 deletions crates/fluvio-test/src/tests/data_generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,7 @@ pub fn data_generator(test_driver: FluvioTestDriver, test_case: TestCase) {

println!("setup");

let sync_consumer = test_driver
.get_consumer(&sync_topic, 0)
.await;

let mut sync_stream = sync_consumer
.stream(Offset::from_end(0))
.await
.expect("Unable to open stream");
let mut sync_stream = test_driver.get_consumer_with_start(&sync_topic, 0, Offset::from_end(0)).await;

let sync_producer = test_driver
.create_producer(&sync_topic)
Expand Down
9 changes: 3 additions & 6 deletions crates/fluvio-test/src/tests/data_generator/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,9 @@ pub async fn producer(
// Create the syncing producer/consumer

let sync_producer = test_driver.create_producer(&sync_topic).await;
let sync_consumer = test_driver.get_consumer(&sync_topic, 0).await;

let mut sync_stream = sync_consumer
.stream(Offset::from_end(0))
.await
.expect("Unable to open stream");
let mut sync_stream = test_driver
.get_consumer_with_start(&sync_topic, 0, Offset::from_end(0))
.await;

// Let syncing process know this producer is ready
sync_producer.send(RecordKey::NULL, "ready").await.unwrap();
Expand Down
8 changes: 3 additions & 5 deletions crates/fluvio-test/src/tests/election/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,9 @@ pub async fn election(mut test_driver: TestDriver, mut test_case: TestCase) {
assert_eq!(leader_status.spec.leader, leader);
}

let consumer = test_driver.get_consumer(&topic_name, 0).await;
let mut stream = consumer
.stream(Offset::absolute(0).expect("offset"))
.await
.expect("stream");
let mut stream = test_driver
.get_consumer_with_start(&topic_name, 0, Offset::absolute(0).expect("offset"))
.await;

println!("checking msg1");
let records = stream.next().await.expect("get next").expect("next");
Expand Down
Loading

0 comments on commit 96ff8c3

Please sign in to comment.