Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): set default_sink_decouple = true for all sink (#18182) #18233

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions e2e_test/sink/cassandra_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float,
statement ok
CREATE TABLE t7 ("TEST_V1" int primary key, "TEST_V2" int, "TEST_V3" int);

statement ok
set sink_decouple = false;

statement ok
CREATE SINK s6
FROM
Expand Down
1 change: 1 addition & 0 deletions e2e_test/sink/clickhouse_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4,
clickhouse.password = '',
clickhouse.database = 'default',
clickhouse.table='demo_test',
commit_checkpoint_interval = 1,
);

statement ok
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/elasticsearch/elasticsearch_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE t7 (
v1 int primary key,
Expand Down
21 changes: 0 additions & 21 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,27 +75,6 @@ where
})
}

pub(crate) fn deserialize_optional_u64_from_string<'de, D>(
deserializer: D,
) -> Result<Option<u64>, D::Error>
where
D: de::Deserializer<'de>,
{
let s: String = de::Deserialize::deserialize(deserializer)?;
if s.is_empty() {
Ok(None)
} else {
s.parse()
.map_err(|_| {
de::Error::invalid_value(
de::Unexpected::Str(&s),
&"integer greater than or equal to 0",
)
})
.map(Some)
}
}

pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
deserializer: D,
) -> std::result::Result<Option<Vec<String>>, D::Error>
Expand Down
43 changes: 23 additions & 20 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,18 @@ use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
use serde::ser::{SerializeSeq, SerializeStruct};
use serde::Serialize;
use serde_derive::Deserialize;
use serde_with::serde_as;
use serde_with::{serde_as, DisplayFromStr};
use thiserror_ext::AsReport;
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf;
use super::decouple_checkpoint_log_sink::{
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf,
DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
};
use super::writer::SinkWriter;
use super::{DummySinkCommitCoordinator, SinkWriterParam};
use crate::deserialize_optional_u64_from_string;
use crate::error::ConnectorResult;
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::{
Expand All @@ -52,6 +54,7 @@ const QUERY_COLUMN: &str =
"select distinct ?fields from system.columns where database = ? and table = ? order by ?";
pub const CLICKHOUSE_SINK: &str = "clickhouse";

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ClickHouseCommon {
#[serde(rename = "clickhouse.url")]
Expand All @@ -66,9 +69,10 @@ pub struct ClickHouseCommon {
pub table: String,
#[serde(rename = "clickhouse.delete.column")]
pub delete_column: Option<String>,
/// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint.
#[serde(default, deserialize_with = "deserialize_optional_u64_from_string")]
pub commit_checkpoint_interval: Option<u64>,
/// Commit every n(>0) checkpoints, default is 10.
#[serde(default = "default_commit_checkpoint_interval")]
#[serde_as(as = "DisplayFromStr")]
pub commit_checkpoint_interval: u64,
}

#[allow(clippy::enum_variant_names)]
Expand Down Expand Up @@ -494,26 +498,25 @@ impl Sink for ClickHouseSink {
const SINK_NAME: &'static str = CLICKHOUSE_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
let config_decouple = if let Some(interval) =
desc.properties.get("commit_checkpoint_interval")
&& interval.parse::<u64>().unwrap_or(0) > 1
{
true
} else {
false
};
let commit_checkpoint_interval =
if let Some(interval) = desc.properties.get("commit_checkpoint_interval") {
interval
.parse::<u64>()
.unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL)
} else {
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
};

match user_specified {
SinkDecouple::Default => Ok(config_decouple),
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => {
if config_decouple {
if commit_checkpoint_interval > 1 {
return Err(SinkError::Config(anyhow!(
"config conflict: Clickhouse config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
)));
}
Ok(false)
}
SinkDecouple::Enable => Ok(true),
}
}

Expand Down Expand Up @@ -552,9 +555,9 @@ impl Sink for ClickHouseSink {
self.check_pk_match(&clickhouse_column)?;
}

if self.config.common.commit_checkpoint_interval == Some(0) {
if self.config.common.commit_checkpoint_interval == 0 {
return Err(SinkError::Config(anyhow!(
"commit_checkpoint_interval must be greater than 0"
"`commit_checkpoint_interval` must be greater than 0"
)));
}
Ok(())
Expand All @@ -569,7 +572,7 @@ impl Sink for ClickHouseSink {
)
.await?;
let commit_checkpoint_interval =
NonZeroU64::new(self.config.common.commit_checkpoint_interval.unwrap_or(1)).expect(
NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
"commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
);

Expand Down
5 changes: 5 additions & 0 deletions src/connector/src/sink/decouple_checkpoint_log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ use async_trait::async_trait;
use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
use crate::sink::writer::SinkWriter;
use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics};
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL: u64 = 10;

pub fn default_commit_checkpoint_interval() -> u64 {
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
}

/// The `LogSinker` implementation used for commit-decoupled sinks (such as `Iceberg`, `DeltaLake` and `StarRocks`).
/// The concurrent/frequent commit capability of these sinks is poor, so by leveraging the decoupled log reader,
Expand Down
44 changes: 24 additions & 20 deletions src/connector/src/sink/deltalake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,26 @@ use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::connector_service::SinkMetadata;
use serde_derive::{Deserialize, Serialize};
use serde_with::serde_as;
use serde_with::{serde_as, DisplayFromStr};
use with_options::WithOptions;

use super::catalog::desc::SinkDesc;
use super::coordinate::CoordinatedSinkWriter;
use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf;
use super::decouple_checkpoint_log_sink::{
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf,
DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
};
use super::writer::SinkWriter;
use super::{
Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterParam,
SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use crate::deserialize_optional_u64_from_string;

pub const DELTALAKE_SINK: &str = "deltalake";
pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

#[serde_as]
#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
#[serde(rename = "s3.access.key")]
Expand All @@ -69,10 +72,12 @@ pub struct DeltaLakeCommon {
pub s3_endpoint: Option<String>,
#[serde(rename = "gcs.service.account")]
pub gcs_service_account: Option<String>,
/// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint.
#[serde(default, deserialize_with = "deserialize_optional_u64_from_string")]
pub commit_checkpoint_interval: Option<u64>,
/// Commit every n(>0) checkpoints, default is 10.
#[serde(default = "default_commit_checkpoint_interval")]
#[serde_as(as = "DisplayFromStr")]
pub commit_checkpoint_interval: u64,
}

impl DeltaLakeCommon {
pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
let table = match Self::get_table_url(&self.location)? {
Expand Down Expand Up @@ -281,26 +286,25 @@ impl Sink for DeltaLakeSink {
const SINK_NAME: &'static str = DELTALAKE_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
let config_decouple = if let Some(interval) =
desc.properties.get("commit_checkpoint_interval")
&& interval.parse::<u64>().unwrap_or(0) > 1
{
true
} else {
false
};
let commit_checkpoint_interval =
if let Some(interval) = desc.properties.get("commit_checkpoint_interval") {
interval
.parse::<u64>()
.unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL)
} else {
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
};

match user_specified {
SinkDecouple::Default => Ok(config_decouple),
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => {
if config_decouple {
if commit_checkpoint_interval > 1 {
return Err(SinkError::Config(anyhow!(
"config conflict: DeltaLake config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
)));
}
Ok(false)
}
SinkDecouple::Enable => Ok(true),
}
}

Expand Down Expand Up @@ -328,7 +332,7 @@ impl Sink for DeltaLakeSink {
.await?;

let commit_checkpoint_interval =
NonZeroU64::new(self.config.common.commit_checkpoint_interval.unwrap_or(1)).expect(
NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
"commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
);

Expand Down Expand Up @@ -380,9 +384,9 @@ impl Sink for DeltaLakeSink {
)));
}
}
if self.config.common.commit_checkpoint_interval == Some(0) {
if self.config.common.commit_checkpoint_interval == 0 {
return Err(SinkError::Config(anyhow!(
"commit_checkpoint_interval must be greater than 0"
"`commit_checkpoint_interval` must be greater than 0"
)));
}
Ok(())
Expand Down
9 changes: 0 additions & 9 deletions src/connector/src/sink/google_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_pubsub::publisher::{Awaiter, Publisher};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use serde_derive::Deserialize;
use serde_with::serde_as;
use tonic::Status;
use with_options::WithOptions;

use super::catalog::desc::SinkDesc;
use super::catalog::SinkFormatDesc;
use super::formatter::SinkFormatterImpl;
use super::log_store::DeliveryFutureManagerAddFuture;
Expand Down Expand Up @@ -114,13 +112,6 @@ impl Sink for GooglePubSubSink {

const SINK_NAME: &'static str = PUBSUB_SINK;

fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => Ok(false),
}
}

async fn validate(&self) -> Result<()> {
if !self.is_append_only {
return Err(SinkError::GooglePubSub(anyhow!(
Expand Down
Loading
Loading