Skip to content

Commit

Permalink
feat: added consumer offset to connector config (#3950)
Browse files Browse the repository at this point in the history
  • Loading branch information
galibey authored Apr 19, 2024
1 parent 96ff8c3 commit 8fef393
Show file tree
Hide file tree
Showing 4 changed files with 362 additions and 5 deletions.
31 changes: 28 additions & 3 deletions crates/fluvio-connector-common/src/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use fluvio::consumer::ConsumerConfigExtBuilder;
use fluvio::{FluvioConfig, Fluvio};
use fluvio::consumer::{ConsumerConfigExtBuilder, OffsetManagementStrategy};
use fluvio::{Fluvio, FluvioConfig, Offset};
use fluvio::dataplane::record::ConsumerRecord;
use fluvio_connector_package::config::ConsumerPartitionConfig;
use fluvio_connector_package::config::{ConsumerPartitionConfig, OffsetConfig, OffsetStrategyConfig};
use fluvio_sc_schema::errors::ErrorCode;
use futures::StreamExt;
use crate::{config::ConnectorConfig, Result};
Expand Down Expand Up @@ -35,6 +35,31 @@ pub async fn consumer_stream_from_config(
let mut builder = ConsumerConfigExtBuilder::default();
builder.topic(config.meta().topic());
builder.offset_start(fluvio::Offset::end());
if let Some(consumer_id) = config.meta().consumer().and_then(|c| c.id.as_ref()) {
builder.offset_consumer(consumer_id);
}
if let Some(consumer_offset) = config.meta().consumer().and_then(|c| c.offset.as_ref()) {
let offset_strategy = match consumer_offset.strategy {
OffsetStrategyConfig::None => OffsetManagementStrategy::None,
OffsetStrategyConfig::Manual => OffsetManagementStrategy::Manual,
OffsetStrategyConfig::Auto => OffsetManagementStrategy::Auto,
};
builder.offset_strategy(offset_strategy);
if let Some(flush) = consumer_offset.flush_period {
builder.offset_flush(flush);
}
if let Some(start) = &consumer_offset.start {
let offsset_start = match start {
OffsetConfig::Absolute(abs) => Offset::absolute(*abs)?,
OffsetConfig::Beginning => Offset::beginning(),
OffsetConfig::FromBeginning(index) => Offset::from_beginning(*index),
OffsetConfig::End => Offset::end(),
OffsetConfig::FromEnd(index) => Offset::from_end(*index),
};
builder.offset_start(offsset_start);
}
}

match consumer_partition {
ConsumerPartitionConfig::One(partition) => {
builder.partition(partition);
Expand Down
244 changes: 242 additions & 2 deletions crates/fluvio-connector-package/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::time::Duration;
use fluvio_controlplane_metadata::topic::config::TopicConfig;
use fluvio_types::PartitionId;
use serde::de::{Visitor, SeqAccess};
use serde::ser::SerializeSeq;
use serde::ser::{SerializeMap, SerializeSeq};
use tracing::debug;
use anyhow::Result;
use serde::{Deserialize, Serialize, Deserializer, Serializer};
Expand Down Expand Up @@ -273,15 +273,21 @@ impl MetaConfig<'_> {
}

#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct ConsumerParameters {
#[serde(default)]
pub partition: ConsumerPartitionConfig,
#[serde(
with = "bytesize_serde",
skip_serializing_if = "Option::is_none",
default
default,
alias = "max_bytes"
)]
pub max_bytes: Option<ByteSize>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub offset: Option<ConsumerOffsetConfig>,
}

#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
Expand Down Expand Up @@ -478,6 +484,109 @@ impl Serialize for ConsumerPartitionConfig {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct ConsumerOffsetConfig {
#[serde(skip_serializing_if = "Option::is_none", default)]
pub start: Option<OffsetConfig>,
pub strategy: OffsetStrategyConfig,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub flush_period: Option<Duration>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OffsetConfig {
Absolute(i64),
Beginning,
FromBeginning(u32),
End,
FromEnd(u32),
}

impl Serialize for OffsetConfig {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match self {
OffsetConfig::Absolute(abs) => {
let mut map = serializer.serialize_map(Some(1))?;
map.serialize_entry("absolute", abs)?;
map.end()
}
OffsetConfig::Beginning => serializer.serialize_str("beginning"),
OffsetConfig::FromBeginning(offset) => {
let mut map = serializer.serialize_map(Some(1))?;
map.serialize_entry("from-beginning", offset)?;
map.end()
}
OffsetConfig::End => serializer.serialize_str("end"),
OffsetConfig::FromEnd(offset) => {
let mut map = serializer.serialize_map(Some(1))?;
map.serialize_entry("from-end", offset)?;
map.end()
}
}
}
}

struct OffsetConfigVisitor;
impl<'de> Visitor<'de> for OffsetConfigVisitor {
type Value = OffsetConfig;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("strings \"beginning\", \"end\" or map keys \"absolute\", \"from-beginning\", \"from-end\"")
}

fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
match v {
"beginning" => Ok(OffsetConfig::Beginning),
"end" => Ok(OffsetConfig::End),
other => Err(serde::de::Error::invalid_value(
serde::de::Unexpected::Str(other),
&self,
)),
}
}

fn visit_map<A>(self, mut map: A) -> std::prelude::v1::Result<Self::Value, A::Error>
where
A: serde::de::MapAccess<'de>,
{
let key = map.next_key::<String>()?;
match key.as_deref() {
Some("absolute") => Ok(OffsetConfig::Absolute(map.next_value()?)),
Some("from-beginning") => Ok(OffsetConfig::FromBeginning(map.next_value()?)),
Some("from-end") => Ok(OffsetConfig::FromEnd(map.next_value()?)),
Some(other) => Err(serde::de::Error::invalid_value(
serde::de::Unexpected::Str(other),
&self,
)),
None => Err(serde::de::Error::custom("expected a map entry")),
}
}
}

impl<'de> Deserialize<'de> for OffsetConfig {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_any(OffsetConfigVisitor)
}
}

#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum OffsetStrategyConfig {
None,
Manual,
Auto,
}

impl ConnectorConfig {
pub fn from_file(path: impl Into<PathBuf>) -> Result<Self> {
let mut file = File::open(path.into())?;
Expand Down Expand Up @@ -592,6 +701,8 @@ mod tests {
consumer: Some(ConsumerParameters {
partition: ConsumerPartitionConfig::One(10),
max_bytes: Some(ByteSize::mb(1)),
id: None,
offset: None,
}),
secrets: Some(vec![SecretConfig {
name: "secret1".parse().unwrap(),
Expand Down Expand Up @@ -671,6 +782,12 @@ mod tests {
consumer: Some(ConsumerParameters {
partition: ConsumerPartitionConfig::One(10),
max_bytes: Some(ByteSize::mb(1)),
id: Some("consumer_id_1".to_string()),
offset: Some(ConsumerOffsetConfig {
start: Some(OffsetConfig::Absolute(100)),
strategy: OffsetStrategyConfig::Auto,
flush_period: Some(Duration::from_secs(160)),
}),
}),
secrets: Some(vec![SecretConfig {
name: "secret1".parse().unwrap(),
Expand Down Expand Up @@ -877,6 +994,8 @@ mod tests {
consumer: Some(ConsumerParameters {
max_bytes: Some(ByteSize::b(1400)),
partition: Default::default(),
id: None,
offset: None,
}),
secrets: None,
},
Expand Down Expand Up @@ -1086,6 +1205,8 @@ mod tests {
consumer: Some(ConsumerParameters {
max_bytes: Some(ByteSize::b(1400)),
partition: Default::default(),
id: None,
offset: None,
}),
secrets: None,
},
Expand Down Expand Up @@ -1170,15 +1291,21 @@ mod tests {
let one = ConsumerParameters {
partition: ConsumerPartitionConfig::One(1),
max_bytes: Default::default(),
id: None,
offset: None,
};
let many = ConsumerParameters {
partition: ConsumerPartitionConfig::Many(vec![2, 3]),
max_bytes: Default::default(),
id: None,
offset: None,
};

let all = ConsumerParameters {
partition: ConsumerPartitionConfig::All,
max_bytes: Default::default(),
id: None,
offset: None,
};

//when
Expand All @@ -1191,4 +1318,117 @@ mod tests {
assert_eq!(many_ser, "partition:\n- 2\n- 3\n");
assert_eq!(all_ser, "partition: all\n");
}

#[test]
fn test_ser_offset_config() {
//given
let absolute = OffsetConfig::Absolute(10);
let beginning = OffsetConfig::Beginning;
let from_beginning = OffsetConfig::FromBeginning(5);
let end = OffsetConfig::End;
let from_end = OffsetConfig::FromEnd(12);

//when
let absolute_ser = serde_yaml::to_string(&absolute).expect("absolute");
let beginning_ser = serde_yaml::to_string(&beginning).expect("beginning");
let from_beginning_ser = serde_yaml::to_string(&from_beginning).expect("from_beginning");
let end_ser = serde_yaml::to_string(&end).expect("end");
let from_end_ser = serde_yaml::to_string(&from_end).expect("from_end");

//then
assert_eq!(absolute_ser, "absolute: 10\n");
assert_eq!(beginning_ser, "beginning\n");
assert_eq!(from_beginning_ser, "from-beginning: 5\n");
assert_eq!(end_ser, "end\n");
assert_eq!(from_end_ser, "from-end: 12\n");
}

#[test]
fn test_deser_offset_config() {
//given
//when
let absolute: OffsetConfig = serde_yaml::from_str(
r#"
absolute: 11
"#,
)
.expect("absolute");
let beginning: OffsetConfig = serde_yaml::from_str(
r#"
beginning
"#,
)
.expect("beginning");
let end: OffsetConfig = serde_yaml::from_str(
r#"
end
"#,
)
.expect("end");
let from_end: OffsetConfig = serde_yaml::from_str(
r#"
from-end: 12
"#,
)
.expect("from end");
let from_beginning: OffsetConfig = serde_yaml::from_str(
r#"
from-beginning: 14
"#,
)
.expect("from beginning");

//then
assert_eq!(absolute, OffsetConfig::Absolute(11));
assert_eq!(beginning, OffsetConfig::Beginning);
assert_eq!(end, OffsetConfig::End);
assert_eq!(from_end, OffsetConfig::FromEnd(12));
assert_eq!(from_beginning, OffsetConfig::FromBeginning(14));
}

#[test]
fn test_ser_consumer_offset_config() {
//given
let config = ConsumerOffsetConfig {
start: Some(OffsetConfig::Absolute(10)),
strategy: OffsetStrategyConfig::Manual,
flush_period: Some(Duration::from_secs(60)),
};

//when
let config_ser = serde_yaml::to_string(&config).expect("config");

//then
assert_eq!(
config_ser,
"start:\n absolute: 10\nstrategy: manual\nflush-period:\n secs: 60\n nanos: 0\n"
);
}

#[test]
fn test_deser_consumer_offset_config() {
//given
//when
let config: ConsumerOffsetConfig = serde_yaml::from_str(
r#"
start:
absolute: 11
strategy: auto
flush-period:
secs: 160
nanos: 0
"#,
)
.expect("config");

//then
assert_eq!(
config,
ConsumerOffsetConfig {
start: Some(OffsetConfig::Absolute(11)),
strategy: OffsetStrategyConfig::Auto,
flush_period: Some(Duration::from_secs(160))
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ meta:
consumer:
partition: 10
max_bytes: "1 MB"
id: "consumer_id_1"
offset:
start:
absolute: 100
strategy: auto
flush-period:
secs: 160
nanos: 0
secrets:
- name: secret1
transforms:
Expand Down
Loading

0 comments on commit 8fef393

Please sign in to comment.