Merge pull request #4 from invitae/merge-upstream
Merge upstream/master
alxdm authored Feb 26, 2020
2 parents 7d917d6 + 28c3744 commit 33fb192
Showing 11 changed files with 259 additions and 139 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -3,4 +3,5 @@
target/
*.json
.project
.classpath
.classpath
*.log
12 changes: 6 additions & 6 deletions pom.xml
@@ -12,7 +12,7 @@

<groupId>com.snowflake</groupId>
<artifactId>snowflake-kafka-connector</artifactId>
<version>1.0.0</version>
<version>1.1.0</version>
<packaging>jar</packaging>
<name>Snowflake Kafka Connector</name>
<description>Snowflake Kafka Connect Sink Connector</description>
@@ -53,7 +53,7 @@
<repository>
<id>confluent</id>
<name>Confluent</name>
<url>http://packages.confluent.io/maven/</url>
<url>https://packages.confluent.io/maven/</url>
</repository>

<repository>
@@ -415,7 +415,7 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>3.9.1</version>
<version>3.11.1</version>
</dependency>

<!--junit for unit test-->
@@ -451,9 +451,9 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.13.0</version>
<scope>test</scope>
</dependency>
</dependencies>
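The test-scope logging dependency above moves from Log4j 1.x (log4j 1.2.17) to Log4j 2 (log4j-core 2.13.0). As a rough sketch only: a test that previously obtained a logger through the Log4j 1.x API could switch to the Log4j 2 API along these lines (the test class name below is hypothetical, not part of this repository):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ConnectorLoggingSmokeTest
{
  // Log4j 2 entry point; replaces org.apache.log4j.Logger.getLogger(...) from 1.x
  private static final Logger LOGGER =
      LogManager.getLogger(ConnectorLoggingSmokeTest.class);

  public static void main(String[] args)
  {
    LOGGER.info("test logging now flows through log4j-core 2.13.0");
  }
}
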
158 changes: 57 additions & 101 deletions src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java
@@ -29,6 +29,7 @@
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
@@ -45,23 +46,15 @@ public class SnowflakeSinkTask extends SinkTask
private static final long WAIT_TIME = 5 * 1000;//5 sec
private static final int REPEAT_TIME = 12; //60 sec

// connector configuration
private Map<String, String> config = null;

private Map<String, String> topic2table;
private boolean appendTableHash;

// config buffer.count.records -- how many records to buffer
private long bufferCountRecords;
// config buffer.size.bytes -- aggregate size in bytes of all records to buffer
private long bufferSizeBytes;
private long bufferFlushTime;

private SnowflakeSinkService sink = null;
private Map<String, String> topic2table = null;
private boolean appendTableHash;

// snowflake JDBC connection provides methods to interact with user's snowflake
// snowflake JDBC connection provides methods to interact with user's
// snowflake
// account and execute queries
private SnowflakeConnectionService conn = null;
private String id = "-1";

private static final Logger LOGGER = LoggerFactory
.getLogger(SnowflakeSinkTask.class);
@@ -102,33 +95,51 @@ private SnowflakeSinkService getSink()
/**
* start method handles configuration parsing and one-time setup of the
* task. loads configuration
*
* @param parsedConfig - has the configuration settings
*/
@Override
public void start(final Map<String, String> parsedConfig)
{
LOGGER.info(Logging.logMessage("SnowflakeSinkTask:start"));
this.id = parsedConfig.getOrDefault(Utils.TASK_ID, "-1");

this.config = parsedConfig;
LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:start", this.id));
// connector configuration

//generate topic to table map
this.topic2table = getTopicToTableMap(config);
this.appendTableHash = Boolean.parseBoolean(config.get(SnowflakeSinkConnectorConfig.APPEND_TABLE_HASH));
this.topic2table = getTopicToTableMap(parsedConfig);
this.appendTableHash = Boolean.parseBoolean(parsedConfig.get(
SnowflakeSinkConnectorConfig.APPEND_TABLE_HASH));

//enable jvm proxy
Utils.enableJVMProxy(config);
Utils.enableJVMProxy(parsedConfig);

this.bufferCountRecords = Long.parseLong(config.get
// config buffer.count.records -- how many records to buffer
final long bufferCountRecords = Long.parseLong(parsedConfig.get
(SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS));
this.bufferSizeBytes = Long.parseLong(config.get
// config buffer.size.bytes -- aggregate size in bytes of all records to
// buffer
final long bufferSizeBytes = Long.parseLong(parsedConfig.get
(SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES));
this.bufferFlushTime = Long.parseLong(config.get
final long bufferFlushTime = Long.parseLong(parsedConfig.get
(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC));

conn = SnowflakeConnectionServiceFactory
.builder()
.setProperties(parsedConfig)
.build();

if (this.sink != null)
{
this.sink.closeAll();
}
this.sink = SnowflakeSinkServiceFactory.builder(getConnection())
.setFileSize(bufferSizeBytes)
.setRecordNumber(bufferCountRecords)
.setFlushTime(bufferFlushTime)
.setTopic2TableMap(topic2table)
.setAppendTableHash(appendTableHash)
.build();
}
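
With this change, start() parses the buffer thresholds locally and hands them straight to the sink service builder instead of keeping them as task fields. A minimal sketch of the buffer-related slice of a task configuration, using the property names from the in-code comments above (the flush-time key and all values are illustrative assumptions, not recommended settings):

import java.util.HashMap;
import java.util.Map;

public class BufferConfigSketch
{
  public static void main(String[] args)
  {
    // Hypothetical buffer settings as Kafka Connect might hand them to start();
    // a real task config also carries connection properties (URL, user, key, ...).
    Map<String, String> parsedConfig = new HashMap<>();
    parsedConfig.put("buffer.count.records", "10000"); // how many records to buffer
    parsedConfig.put("buffer.size.bytes", "5000000");  // aggregate size in bytes of buffered records
    parsedConfig.put("buffer.flush.time", "120");      // seconds between forced flushes (key name assumed)
    System.out.println(parsedConfig);
  }
}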

/**
@@ -139,8 +150,11 @@ public void start(final Map<String, String> parsedConfig)
@Override
public void stop()
{
LOGGER.info(Logging.logMessage("SnowflakeSinkTask:stop"));

LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:stop", this.id));
if (sink != null)
{
this.sink.closeAll();
}
}

/**
@@ -153,37 +167,29 @@ public void stop()
public void open(final Collection<TopicPartition> partitions)
{
LOGGER.info(Logging.logMessage(
"SnowflakeSinkTask:open, TopicPartitions: {}", partitions
"SnowflakeSinkTask[ID:{}]:open, TopicPartitions: {}", this.id, partitions
));

SnowflakeSinkServiceFactory.SnowflakeSinkServiceBuilder sinkBuilder =
SnowflakeSinkServiceFactory.builder(getConnection())
.setFileSize(bufferSizeBytes)
.setRecordNumber(bufferCountRecords)
.setFlushTime(bufferFlushTime);

partitions.forEach(
partition -> {
String tableName = tableName(partition.topic(), topic2table, appendTableHash);
sinkBuilder.addTask(tableName, partition.topic(), partition.partition());
}
);
sink = sinkBuilder.build();
partitions.forEach(tp -> this.sink.startTask(Utils.tableName(tp.topic(),
this.topic2table, appendTableHash), tp.topic(), tp.partition()));
}


/**
* close sink service
* close all running task because the parameter of open function contains all
* partition info but not only the new partition
*
* @param partitions - The list of all partitions that were assigned to the
* task
*/
@Override
public void close(final Collection<TopicPartition> partitions)
{
LOGGER.info(Logging.logMessage("SnowflakeSinkTask:close"));
getSink().close();
LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:close", this.id));
if (this.sink != null)
{
this.sink.close(partitions);
}
}

/**
@@ -195,6 +201,9 @@ public void close(final Collection<TopicPartition> partitions)
@Override
public void put(final Collection<SinkRecord> records)
{
LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:put {} records",
this.id, records.size()));
//log more info may impact performance
records.forEach(getSink()::insert);
}

@@ -217,14 +226,15 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
(topicPartition, offsetAndMetadata) ->
{
long offSet = getSink().getOffset(topicPartition);
if(offSet == 0)
if (offSet == 0)
{
committedOffsets.put(topicPartition, offsetAndMetadata);
//todo: update offset?
}
else
{
committedOffsets.put(topicPartition, new OffsetAndMetadata(getSink().getOffset(topicPartition)));
committedOffsets.put(topicPartition,
new OffsetAndMetadata(getSink().getOffset(topicPartition)));
}
}
);
@@ -243,16 +253,17 @@ public String version()

/**
* parse topic to table map
*
* @param config connector config file
* @return result map
*/
static Map<String,String> getTopicToTableMap(Map<String, String> config)
static Map<String, String> getTopicToTableMap(Map<String, String> config)
{
if(config.containsKey(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP))
if (config.containsKey(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP))
{
Map<String, String> result =
Utils.parseTopicToTableMap(config.get(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP));
if(result != null)
if (result != null)
{
return result;
}
@@ -264,65 +275,10 @@ static Map<String,String> getTopicToTableMap(Map<String, String> config)

}
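
For illustration, assuming Utils.parseTopicToTableMap accepts the comma- and colon-delimited form commonly used for the connector's topic-to-table property, a value such as "orders:ORDERS_RAW,clicks:CLICK_EVENTS" would produce the map built below (the delimiter format and the names are assumptions, not taken from this repository):

import java.util.HashMap;
import java.util.Map;

public class Topic2TableSketch
{
  public static void main(String[] args)
  {
    // Expected parse result for "orders:ORDERS_RAW,clicks:CLICK_EVENTS"
    // (format and names are assumptions for illustration only).
    Map<String, String> expected = new HashMap<>();
    expected.put("orders", "ORDERS_RAW");
    expected.put("clicks", "CLICK_EVENTS");
    System.out.println(expected);
  }
}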

/**
* verify topic name, and generate valid table name
* @param topic input topic name
* @param topic2table topic to table map
* @return table name
*/
static String tableName(String topic, Map<String, String> topic2table, boolean appendTableHash)
{
final String PLACE_HOLDER = "_";
if(topic == null || topic.isEmpty())
{
throw SnowflakeErrors.ERROR_0020.getException("topic name: " + topic);
}
if(topic2table.containsKey(topic))
{
return topic2table.get(topic);
}
if(Utils.isValidSnowflakeObjectIdentifier(topic))
{
return topic;
}
int hash = Math.abs(topic.hashCode());

StringBuilder result = new StringBuilder();

int index = 0;
//first char
if(topic.substring(index,index + 1).matches("[_a-zA-Z]"))
{
result.append(topic.charAt(0));
index ++;
}
else
{
result.append(PLACE_HOLDER);
}
while(index < topic.length())
{
if (topic.substring(index, index + 1).matches("[_$a-zA-Z0-9]"))
{
result.append(topic.charAt(index));
}
else
{
result.append(PLACE_HOLDER);
}
index ++;
}

if(appendTableHash) {
result.append(PLACE_HOLDER);
result.append(hash);
}

return result.toString();
}
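
The table-name mangling deleted here now lives in Utils.tableName, which open() calls above. A compact sketch of the equivalent transformation, with the early returns for explicit map entries, already-valid identifiers, and empty topics omitted (the example topic name is made up):

public class TableNameSketch
{
  // Roughly equivalent to the removed tableName(): prefix with '_' when the first
  // character cannot start an identifier, replace every other illegal character
  // with '_', and optionally append the absolute hash of the original topic name.
  static String sketchTableName(String topic, boolean appendTableHash)
  {
    String prefix = topic.substring(0, 1).matches("[_a-zA-Z]") ? "" : "_";
    String body = topic.replaceAll("[^_$a-zA-Z0-9]", "_");
    String name = prefix + body;
    return appendTableHash ? name + "_" + Math.abs(topic.hashCode()) : name;
  }

  public static void main(String[] args)
  {
    // "my-topic.v1" -> "my_topic_v1", plus "_<hash>" when appendTableHash is true
    System.out.println(sketchTableName("my-topic.v1", true));
  }
}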

/**
* wait for specific status
*
* @param func status checker
*/
private static void waitFor(Supplier<Boolean> func) throws InterruptedException,
