Skip to content

Commit

Permalink
SNOW-1805174 Block iceberg mode with double buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski committed Nov 14, 2024
1 parent f7652f9 commit bf59b75
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public class SnowflakeSinkConnectorConfig {
"snowflake.streaming.enable.single.buffer";

public static final boolean SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER_DEFAULT = false;
public static final boolean SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER_ICEBERG_DEFAULT = true;
public static final String SNOWPIPE_STREAMING_MAX_CLIENT_LAG =
"snowflake.streaming.max.client.lag";

Expand Down Expand Up @@ -253,6 +254,18 @@ public static void setDefaultValues(Map<String, String> config) {
config, BUFFER_FLUSH_TIME_SEC, BUFFER_FLUSH_TIME_SEC_DEFAULT, "seconds");

if (isSnowpipeStreamingIngestion(config)) {
setSingleBufferDefaultValue(config);
}
}

private static void setSingleBufferDefaultValue(Map<String, String> config) {
if (Utils.isIcebergEnabled(config)) {
setFieldToDefaultValues(
config,
SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER,
SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER_ICEBERG_DEFAULT,
"");
} else {
setFieldToDefaultValues(
config,
SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig.SNOWPIPE_STREAMING;

import com.google.common.collect.ImmutableMap;
import com.snowflake.kafka.connector.internal.parameters.InternalBufferParameters;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.internal.streaming.StreamingConfigValidator;
import java.util.HashMap;
Expand All @@ -14,6 +15,11 @@ public class IcebergConfigValidator implements StreamingConfigValidator {
private static final String INCOMPATIBLE_INGESTION_METHOD =
"Ingestion to Iceberg table is supported only for Snowpipe Streaming";

private static final String DOUBLE_BUFFER_NOT_SUPPORTED =
"Ingestion to Iceberg table is supported only with "
+ SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER
+ " enabled.";

@Override
public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {
boolean isIcebergEnabled = Boolean.parseBoolean(inputConfig.get(ICEBERG_ENABLED));
Expand All @@ -31,6 +37,10 @@ public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {
validationErrors.put(INGESTION_METHOD_OPT, INCOMPATIBLE_INGESTION_METHOD);
}

if (!InternalBufferParameters.isSingleBufferEnabled(inputConfig)) {
validationErrors.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, DOUBLE_BUFFER_NOT_SUPPORTED);
}

return ImmutableMap.copyOf(validationErrors);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.snowflake.kafka.connector.config;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER;

import com.google.common.collect.ImmutableMap;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
Expand Down Expand Up @@ -52,6 +53,11 @@ public static Stream<Arguments> invalidConfigs() {
SnowflakeSinkConnectorConfigBuilder.icebergConfig()
.withIngestionMethod(IngestionMethodConfig.SNOWPIPE)
.build(),
INGESTION_METHOD_OPT));
INGESTION_METHOD_OPT),
Arguments.of(
SnowflakeSinkConnectorConfigBuilder.icebergConfig()
.withSingleBufferEnabled(false)
.build(),
SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER;
import static com.snowflake.kafka.connector.Utils.*;
import static com.snowflake.kafka.connector.Utils.SF_DATABASE;

Expand Down Expand Up @@ -34,6 +35,7 @@ public static SnowflakeSinkConnectorConfigBuilder icebergConfig() {
.withIcebergEnabled()
.withIngestionMethod(IngestionMethodConfig.SNOWPIPE_STREAMING)
.withSchematizationEnabled(true)
.withSingleBufferEnabled(true) // default value for iceberg
.withRole("role");
}

Expand Down Expand Up @@ -124,6 +126,11 @@ public SnowflakeSinkConnectorConfigBuilder withChannelOffsetTokenVerificationFun
return this;
}

public SnowflakeSinkConnectorConfigBuilder withSingleBufferEnabled(boolean enabled) {
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, Boolean.toString(enabled));
return this;
}

public Map<String, String> build() {
return config;
}
Expand Down

0 comments on commit bf59b75

Please sign in to comment.