Skip to content

Commit

Permalink
SNOW-1728002 get rid of reflection when enabling iceberg streaming (#995
Browse files Browse the repository at this point in the history
)

Co-authored-by: Michal Bobowski <[email protected]>
  • Loading branch information
sfc-gh-bzabek and sfc-gh-mbobowski authored Nov 14, 2024
1 parent fc0c9ec commit f7652f9
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>2.3.0</version>
<version>3.0.0</version>
<exclusions>
<exclusion>
<groupId>net.snowflake</groupId>
Expand Down
2 changes: 1 addition & 1 deletion pom_confluent.xml
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>2.3.0</version>
<version>3.0.0</version>
<exclusions>
<exclusion>
<groupId>net.snowflake</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.kafka.connect.errors.ConnectException;

/** This class handles all calls to manage the streaming ingestion client */
Expand All @@ -49,8 +48,6 @@ public SnowflakeStreamingIngestClient createClient(
.setProperties(streamingClientProperties.clientProperties)
.setParameterOverrides(streamingClientProperties.parameterOverrides);

setIcebergEnabled(builder, streamingClientProperties.isIcebergEnabled);

SnowflakeStreamingIngestClient createdClient = builder.build();

LOGGER.info(
Expand All @@ -65,17 +62,6 @@ public SnowflakeStreamingIngestClient createClient(
}
}

private static void setIcebergEnabled(
SnowflakeStreamingIngestClientFactory.Builder builder, boolean isIcebergEnabled) {
try {
// TODO reflection should be replaced by proper builder.setIceberg(true) call in SNOW-1728002
FieldUtils.writeField(builder, "isIceberg", isIcebergEnabled, true);
} catch (IllegalAccessException e) {
throw new IllegalStateException(
"Couldn't set iceberg by accessing private field: " + "isIceberg", e);
}
}

/**
* Closes the given client. Swallows any exceptions
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES;
import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING;
import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.ParameterProvider.MAX_CLIENT_LAG;
import static net.snowflake.ingest.utils.ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES;
Expand Down Expand Up @@ -90,6 +91,9 @@ public StreamingClientProperties(Map<String, String> connectorConfig) {

// Override only if the streaming client properties are explicitly set in config
this.parameterOverrides = new HashMap<>();
if (isIcebergEnabled) {
parameterOverrides.put(ENABLE_ICEBERG_STREAMING, "true");
}
Optional<String> snowpipeStreamingMaxClientLag =
Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_MAX_CLIENT_LAG));
snowpipeStreamingMaxClientLag.ifPresent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ICEBERG_ENABLED;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER;
import static com.snowflake.kafka.connector.internal.TestUtils.getConfForStreaming;

import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
Expand Down Expand Up @@ -49,6 +50,7 @@ public void setUp() {
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(ICEBERG_ENABLED, "TRUE");
config.put(ENABLE_SCHEMATIZATION_CONFIG, isSchemaEvolutionEnabled().toString());
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "true");

createIcebergTable();
enableSchemaEvolution(tableName);
Expand Down

0 comments on commit f7652f9

Please sign in to comment.