diff --git a/.gitignore b/.gitignore
index cef7c0cf8..b8b871e2d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,5 @@
target/
*.json
.project
-.classpath
\ No newline at end of file
+.classpath
+*.log
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 9743b23dc..d076f5f5a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,7 +12,7 @@
<groupId>com.snowflake</groupId>
<artifactId>snowflake-kafka-connector</artifactId>
- <version>1.0.0</version>
+ <version>1.1.0</version>
<packaging>jar</packaging>
<name>Snowflake Kafka Connector</name>
<description>Snowflake Kafka Connect Sink Connector</description>
@@ -53,7 +53,7 @@
<id>confluent</id>
<name>Confluent</name>
- <url>http://packages.confluent.io/maven/</url>
+ <url>https://packages.confluent.io/maven/</url>
@@ -415,7 +415,7 @@
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
- <version>3.9.1</version>
+ <version>3.11.1</version>
@@ -451,9 +451,9 @@
<scope>test</scope>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.17</version>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>2.13.0</version>
<scope>test</scope>
diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java
index f7c533ca2..eda701cf4 100644
--- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java
+++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java
@@ -29,6 +29,7 @@
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.util.*;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
@@ -45,23 +46,15 @@ public class SnowflakeSinkTask extends SinkTask
private static final long WAIT_TIME = 5 * 1000;//5 sec
private static final int REPEAT_TIME = 12; //60 sec
- // connector configuration
- private Map<String, String> config = null;
-
- private Map<String, String> topic2table;
- private boolean appendTableHash;
-
- // config buffer.count.records -- how many records to buffer
- private long bufferCountRecords;
- // config buffer.size.bytes -- aggregate size in bytes of all records to buffer
- private long bufferSizeBytes;
- private long bufferFlushTime;
-
private SnowflakeSinkService sink = null;
+ private Map<String, String> topic2table = null;
+ private boolean appendTableHash;
- // snowflake JDBC connection provides methods to interact with user's snowflake
+ // snowflake JDBC connection provides methods to interact with user's
+ // snowflake
// account and execute queries
private SnowflakeConnectionService conn = null;
+ private String id = "-1";
private static final Logger LOGGER = LoggerFactory
.getLogger(SnowflakeSinkTask.class);
@@ -102,33 +95,51 @@ private SnowflakeSinkService getSink()
/**
* start method handles configuration parsing and one-time setup of the
* task. loads configuration
+ *
* @param parsedConfig - has the configuration settings
*/
@Override
public void start(final Map<String, String> parsedConfig)
{
- LOGGER.info(Logging.logMessage("SnowflakeSinkTask:start"));
+ this.id = parsedConfig.getOrDefault(Utils.TASK_ID, "-1");
- this.config = parsedConfig;
+ LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:start", this.id));
+ // connector configuration
//generate topic to table map
- this.topic2table = getTopicToTableMap(config);
- this.appendTableHash = Boolean.parseBoolean(config.get(SnowflakeSinkConnectorConfig.APPEND_TABLE_HASH));
+ this.topic2table = getTopicToTableMap(parsedConfig);
+ this.appendTableHash = Boolean.parseBoolean(parsedConfig.get(
+ SnowflakeSinkConnectorConfig.APPEND_TABLE_HASH));
//enable jvm proxy
- Utils.enableJVMProxy(config);
+ Utils.enableJVMProxy(parsedConfig);
- this.bufferCountRecords = Long.parseLong(config.get
+ // config buffer.count.records -- how many records to buffer
+ final long bufferCountRecords = Long.parseLong(parsedConfig.get
(SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS));
- this.bufferSizeBytes = Long.parseLong(config.get
+ // config buffer.size.bytes -- aggregate size in bytes of all records to
+ // buffer
+ final long bufferSizeBytes = Long.parseLong(parsedConfig.get
(SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES));
- this.bufferFlushTime = Long.parseLong(config.get
+ final long bufferFlushTime = Long.parseLong(parsedConfig.get
(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC));
conn = SnowflakeConnectionServiceFactory
.builder()
.setProperties(parsedConfig)
.build();
+
+ if (this.sink != null)
+ {
+ this.sink.closeAll();
+ }
+ this.sink = SnowflakeSinkServiceFactory.builder(getConnection())
+ .setFileSize(bufferSizeBytes)
+ .setRecordNumber(bufferCountRecords)
+ .setFlushTime(bufferFlushTime)
+ .setTopic2TableMap(topic2table)
+ .setAppendTableHash(appendTableHash)
+ .build();
}
/**
@@ -139,8 +150,11 @@ public void start(final Map<String, String> parsedConfig)
@Override
public void stop()
{
- LOGGER.info(Logging.logMessage("SnowflakeSinkTask:stop"));
-
+ LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:stop", this.id));
+ if (sink != null)
+ {
+ this.sink.closeAll();
+ }
}
/**
@@ -153,22 +167,10 @@ public void stop()
public void open(final Collection<TopicPartition> partitions)
{
LOGGER.info(Logging.logMessage(
- "SnowflakeSinkTask:open, TopicPartitions: {}", partitions
+ "SnowflakeSinkTask[ID:{}]:open, TopicPartitions: {}", this.id, partitions
));
-
- SnowflakeSinkServiceFactory.SnowflakeSinkServiceBuilder sinkBuilder =
- SnowflakeSinkServiceFactory.builder(getConnection())
- .setFileSize(bufferSizeBytes)
- .setRecordNumber(bufferCountRecords)
- .setFlushTime(bufferFlushTime);
-
- partitions.forEach(
- partition -> {
- String tableName = tableName(partition.topic(), topic2table, appendTableHash);
- sinkBuilder.addTask(tableName, partition.topic(), partition.partition());
- }
- );
- sink = sinkBuilder.build();
+ partitions.forEach(tp -> this.sink.startTask(Utils.tableName(tp.topic(),
+ this.topic2table, appendTableHash), tp.topic(), tp.partition()));
}
@@ -176,14 +178,18 @@ public void open(final Collection<TopicPartition> partitions)
* close sink service
* close all running task because the parameter of open function contains all
* partition info but not only the new partition
+ *
* @param partitions - The list of all partitions that were assigned to the
* task
*/
@Override
public void close(final Collection<TopicPartition> partitions)
{
- LOGGER.info(Logging.logMessage("SnowflakeSinkTask:close"));
- getSink().close();
+ LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:close", this.id));
+ if (this.sink != null)
+ {
+ this.sink.close(partitions);
+ }
}
/**
@@ -195,6 +201,9 @@ public void close(final Collection<TopicPartition> partitions)
@Override
public void put(final Collection<SinkRecord> records)
{
+ LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:put {} records",
+ this.id, records.size()));
+ // logging more info may impact performance
records.forEach(getSink()::insert);
}
@@ -217,14 +226,15 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
(topicPartition, offsetAndMetadata) ->
{
long offSet = getSink().getOffset(topicPartition);
- if(offSet == 0)
+ if (offSet == 0)
{
committedOffsets.put(topicPartition, offsetAndMetadata);
//todo: update offset?
}
else
{
- committedOffsets.put(topicPartition, new OffsetAndMetadata(getSink().getOffset(topicPartition)));
+ committedOffsets.put(topicPartition,
+ new OffsetAndMetadata(getSink().getOffset(topicPartition)));
}
}
);
@@ -243,16 +253,17 @@ public String version()
/**
* parse topic to table map
+ *
* @param config connector config file
* @return result map
*/
- static Map getTopicToTableMap(Map config)
+ static Map getTopicToTableMap(Map config)
{
- if(config.containsKey(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP))
+ if (config.containsKey(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP))
{
Map<String, String> result =
Utils.parseTopicToTableMap(config.get(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP));
- if(result != null)
+ if (result != null)
{
return result;
}
@@ -264,65 +275,10 @@ static Map getTopicToTableMap(Map config)
}
- /**
- * verify topic name, and generate valid table name
- * @param topic input topic name
- * @param topic2table topic to table map
- * @return table name
- */
- static String tableName(String topic, Map<String, String> topic2table, boolean appendTableHash)
- {
- final String PLACE_HOLDER = "_";
- if(topic == null || topic.isEmpty())
- {
- throw SnowflakeErrors.ERROR_0020.getException("topic name: " + topic);
- }
- if(topic2table.containsKey(topic))
- {
- return topic2table.get(topic);
- }
- if(Utils.isValidSnowflakeObjectIdentifier(topic))
- {
- return topic;
- }
- int hash = Math.abs(topic.hashCode());
-
- StringBuilder result = new StringBuilder();
-
- int index = 0;
- //first char
- if(topic.substring(index,index + 1).matches("[_a-zA-Z]"))
- {
- result.append(topic.charAt(0));
- index ++;
- }
- else
- {
- result.append(PLACE_HOLDER);
- }
- while(index < topic.length())
- {
- if (topic.substring(index, index + 1).matches("[_$a-zA-Z0-9]"))
- {
- result.append(topic.charAt(index));
- }
- else
- {
- result.append(PLACE_HOLDER);
- }
- index ++;
- }
-
- if(appendTableHash) {
- result.append(PLACE_HOLDER);
- result.append(hash);
- }
-
- return result.toString();
- }
/**
* wait for specific status
+ *
* @param func status checker
*/
private static void waitFor(Supplier<Boolean> func) throws InterruptedException,
diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java
index b2ffe1947..2dfca9799 100644
--- a/src/main/java/com/snowflake/kafka/connector/Utils.java
+++ b/src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -37,7 +37,7 @@ public class Utils
{
//Connector version, change every release
- public static final String VERSION = "1.0.0";
+ public static final String VERSION = "1.1.0";
//connector parameter list
public static final String NAME = "name";
@@ -66,7 +66,7 @@ public class Utils
//mvn repo
private static final String MVN_REPO =
- "http://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/";
+ "https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/";
private static final Logger LOGGER =
LoggerFactory.getLogger(Utils.class.getName());
@@ -75,7 +75,7 @@ public class Utils
* check the connector version from Maven repo, report if any update
* version is available.
*/
- static void checkConnectorVersion()
+ static boolean checkConnectorVersion()
{
LOGGER.info(Logging.logMessage("Snowflake Kafka Connector Version: {}",
VERSION));
@@ -122,9 +122,10 @@ else if (!latestVersion.equals(VERSION))
{
LOGGER.warn(Logging.logMessage("can't verify latest connector version " +
"from Maven Repo\n{}", e.getMessage()));
+ return false;
}
-
+ return true;
}
/**
@@ -403,6 +404,64 @@ static String validateConfig(Map<String, String> config)
return connectorName;
}
+ /**
+ * verify the topic name and generate a valid table name
+ * @param topic input topic name
+ * @param topic2table topic to table map
+ * @param appendTableHash flag indicating whether to append a hash to the end of the table name
+ * @return table name
+ */
+ public static String tableName(String topic, Map<String, String> topic2table, boolean appendTableHash)
+ {
+ final String PLACE_HOLDER = "_";
+ if(topic == null || topic.isEmpty())
+ {
+ throw SnowflakeErrors.ERROR_0020.getException("topic name: " + topic);
+ }
+ if(topic2table.containsKey(topic))
+ {
+ return topic2table.get(topic);
+ }
+ if(Utils.isValidSnowflakeObjectIdentifier(topic))
+ {
+ return topic;
+ }
+ int hash = Math.abs(topic.hashCode());
+
+ StringBuilder result = new StringBuilder();
+
+ int index = 0;
+ //first char
+ if(topic.substring(index,index + 1).matches("[_a-zA-Z]"))
+ {
+ result.append(topic.charAt(0));
+ index ++;
+ }
+ else
+ {
+ result.append(PLACE_HOLDER);
+ }
+ while(index < topic.length())
+ {
+ if (topic.substring(index, index + 1).matches("[_$a-zA-Z0-9]"))
+ {
+ result.append(topic.charAt(index));
+ }
+ else
+ {
+ result.append(PLACE_HOLDER);
+ }
+ index ++;
+ }
+
+ if(appendTableHash) {
+ result.append(PLACE_HOLDER);
+ result.append(hash);
+ }
+
+ return result.toString();
+ }
+
static Map<String, String> parseTopicToTableMap(String input)
{
Map<String, String> topic2Table = new HashMap<>();
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkService.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkService.java
index ed21d9958..f18da56da 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkService.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkService.java
@@ -4,6 +4,9 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
+import java.util.Collection;
+import java.util.Map;
+
/**
* Background service of data sink, responsible to create/drop pipe and ingest/purge files
*/
@@ -35,7 +38,13 @@ public interface SnowflakeSinkService
/**
* terminate all tasks and close this service instance
*/
- void close();
+ void closeAll();
+
+ /**
+ * terminate the given topic partitions
+ * @param partitions a collection of topic partitions
+ */
+ void close(Collection<TopicPartition> partitions);
/**
* retrieve sink service status
@@ -58,6 +67,18 @@ public interface SnowflakeSinkService
*/
void setFileSize(long size);
+ /**
+ * pass the topic-to-table map to the sink service
+ * @param topic2TableMap a String-to-String map representing the topic-to-table mapping
+ */
+ void setTopic2TableMap(Map<String, String> topic2TableMap);
+
+ /**
+ * control whether to append a hash to the end of a table name
+ * @param appendTableHash a boolean flag controlling whether a hash is appended to the table name
+ */
+ void setAppendTableHash(boolean appendTableHash);
+
/**
* change flush rate of sink service
* the minimum flush time is controlled by
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceFactory.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceFactory.java
index 4bd5024b8..487f96c67 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceFactory.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceFactory.java
@@ -1,6 +1,7 @@
package com.snowflake.kafka.connector.internal;
import java.security.PrivateKey;
+import java.util.Map;
/**
* A factory to create {@link SnowflakeSinkService}
@@ -61,6 +62,23 @@ public SnowflakeSinkServiceBuilder setFlushTime(long time)
logInfo("flush time is limited to {}", time);
return this;
}
+ public SnowflakeSinkServiceBuilder setTopic2TableMap(Map<String, String> topic2TableMap)
+ {
+ this.service.setTopic2TableMap(topic2TableMap);
+ StringBuilder map = new StringBuilder();
+ for (Map.Entry<String, String> entry : topic2TableMap.entrySet())
+ {
+ map.append(entry.getKey()).append(" -> ").append(entry.getValue()).append("\n");
+ }
+ logInfo("set topic 2 table map \n {}", map.toString());
+ return this;
+ }
+
+ public SnowflakeSinkServiceBuilder setAppendTableHash(boolean appendTableHash) {
+ this.service.setAppendTableHash(appendTableHash);
+ logInfo("set append table hash flag to {}", appendTableHash);
+ return this;
+ }
public SnowflakeSinkService build()
{
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
index cf50ed76a..9ffbb9b6c 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
@@ -7,6 +7,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
+import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
@@ -31,6 +32,8 @@ class SnowflakeSinkServiceV1 extends Logging implements SnowflakeSinkService
private final RecordService recordService;
private boolean isStopped;
private final SnowflakeTelemetryService telemetryService;
+ private Map<String, String> topic2TableMap;
+ private boolean appendTableHash;
SnowflakeSinkServiceV1(SnowflakeConnectionService conn)
{
@@ -47,6 +50,8 @@ class SnowflakeSinkServiceV1 extends Logging implements SnowflakeSinkService
this.recordService = new RecordService();
isStopped = false;
this.telemetryService = conn.getTelemetryClient();
+ this.topic2TableMap = new HashMap<>();
+ this.appendTableHash = SnowflakeSinkConnectorConfig.APPEND_TABLE_HASH_DEFAULT;
}
@Override
@@ -54,7 +59,7 @@ public void startTask(final String tableName, final String topic,
final int partition)
{
String stageName = Utils.stageName(conn.getConnectorName(), tableName);
- String nameIndex = topic + "_" + partition;
+ String nameIndex = getNameIndex(topic, partition);
if (pipes.containsKey(nameIndex))
{
logError("task is already registered, name: {}", nameIndex);
@@ -73,23 +78,69 @@ public void startTask(final String tableName, final String topic,
@Override
public void insert(final SinkRecord record)
{
- String nameIndex = record.topic() + "_" + record.kafkaPartition();
+ String nameIndex = getNameIndex(record.topic(), record.kafkaPartition());
+ //init a new topic partition
+ if (!pipes.containsKey(nameIndex))
+ {
+ logWarn("Topic: {} Partition: {} hasn't been initialized by OPEN " +
+ "function", record.topic(), record.kafkaPartition());
+ startTask(Utils.tableName(record.topic(), this.topic2TableMap, this.appendTableHash),
+ record.topic(), record.kafkaPartition());
+ }
pipes.get(nameIndex).insert(record);
+
}
@Override
public long getOffset(final TopicPartition topicPartition)
{
- return pipes.get(topicPartition.topic() + "_" + topicPartition.partition()).getOffset();
+ String name = getNameIndex(topicPartition.topic(),
+ topicPartition.partition());
+ if (pipes.containsKey(name))
+ {
+ return pipes.get(name).getOffset();
+ }
+ else
+ {
+ logError("Failed to find offset of Topic: {}, Partition: {}, sink " +
+ "service hasn't been initialized");
+ return 0;
+ }
}
@Override
- public void close()
+ public void close(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp -> {
+ String name = getNameIndex(tp.topic(), tp.partition());
+ ServiceContext sc = pipes.remove(name);
+ if (sc != null)
+ {
+ try
+ {
+ sc.close();
+ } catch (Exception e)
+ {
+ logError("Failed to close sink service for Topic: {}, Partition: " +
+ "{}\nMessage:{}", tp.topic(), tp.partition(), e.getMessage());
+ }
+ }
+ else
+ {
+ logError("Failed to close sink service for Topic: {}, Partition: {}, " +
+ "sink service hasn't been initialized");
+ }
+ });
+ }
+
+ @Override
+ public void closeAll()
{
this.isStopped = true; // release all cleaner and flusher threads
pipes.forEach(
(name, context) -> context.close()
);
+ pipes.clear();
}
@Override
@@ -151,6 +202,17 @@ public void setFlushTime(final long time)
}
+ @Override
+ public void setTopic2TableMap(Map<String, String> topic2TableMap)
+ {
+ this.topic2TableMap = topic2TableMap;
+ }
+
+ @Override
+ public void setAppendTableHash(boolean appendTableHash) {
+ this.appendTableHash = appendTableHash;
+ }
+
@Override
public long getRecordNumber()
{
@@ -169,6 +231,11 @@ public long getFileSize()
return this.fileSize;
}
+ private static String getNameIndex(String topic, int partition)
+ {
+ return topic + "_" + partition;
+ }
+
private class ServiceContext
{
private final String tableName;
@@ -238,8 +305,7 @@ private void init()
{
startCleaner();
startFlusher();
- }
- catch (Exception e)
+ } catch (Exception e)
{
logWarn("Cleaner and Flusher threads shut down before initialization");
}
@@ -676,8 +742,7 @@ private void close()
{
stopCleaner();
stopFlusher();
- }
- catch (Exception e)
+ } catch (Exception e)
{
logWarn("Failed to terminate Cleaner or Flusher");
}
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeTelemetryServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeTelemetryServiceV1.java
index 384f2093e..6df155749 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeTelemetryServiceV1.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeTelemetryServiceV1.java
@@ -197,8 +197,8 @@ private void send(TelemetryType type, JsonNode data)
{
telemetry.addLogToBatch(new TelemetryData(msg, System.currentTimeMillis()));
logDebug("sending telemetry data: {}", data.toString());
- telemetry.sendBatch();
- } catch (IOException e)
+ telemetry.sendBatchAsync();
+ } catch (Exception e)
{
logError("Failed to send telemetry data: {}", data.toString());
}
diff --git a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java
index 94a6fc672..43b360368 100644
--- a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java
@@ -46,7 +46,7 @@ public void testObjectIdentifier()
@Test
public void testVersionChecker()
{
- Utils.checkConnectorVersion();
+ assert Utils.checkConnectorVersion();
}
@Test
@@ -65,22 +65,22 @@ public void testTableName()
Map<String, String> topic2table =
Utils.parseTopicToTableMap("ab@cd:abcd, 1234:_1234");
- assert SnowflakeSinkTask.tableName("ab@cd", topic2table, true).equals("abcd");
- assert SnowflakeSinkTask.tableName("1234", topic2table, true).equals("_1234");
+ assert Utils.tableName("ab@cd", topic2table, true).equals("abcd");
+ assert Utils.tableName("1234", topic2table, true).equals("_1234");
TestUtils.assertError(SnowflakeErrors.ERROR_0020,
- () -> SnowflakeSinkTask.tableName("", topic2table, true));
+ () -> Utils.tableName("", topic2table, true));
TestUtils.assertError(SnowflakeErrors.ERROR_0020,
- () -> SnowflakeSinkTask.tableName(null, topic2table, true));
+ () -> Utils.tableName(null, topic2table, true));
String topic = "bc*def";
- assert SnowflakeSinkTask.tableName(topic, topic2table, true).equals("bc_def_" + Math.abs(topic.hashCode()));
+ assert Utils.tableName(topic, topic2table, true).equals("bc_def_" + Math.abs(topic.hashCode()));
topic = "12345";
- assert SnowflakeSinkTask.tableName(topic, topic2table, true).equals("_12345_" + Math.abs(topic.hashCode()));
+ assert Utils.tableName(topic, topic2table, true).equals("_12345_" + Math.abs(topic.hashCode()));
topic = "test.topic";
- assert SnowflakeSinkTask.tableName(topic, topic2table, false).equals("test_topic");
+ assert Utils.tableName(topic, topic2table, false).equals("test_topic");
}
@Test
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/MetaColumnIT.java b/src/test/java/com/snowflake/kafka/connector/internal/MetaColumnIT.java
index e3581ef3c..d7faeb81e 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/MetaColumnIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/MetaColumnIT.java
@@ -70,7 +70,7 @@ record =
service.insert(record);
Thread.sleep(90 * 1000);
- service.close();
+ service.closeAll();
ResultSet resultSet = TestUtils.executeQuery("select RECORD_METADATA from" +
" " + tableName);
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/SinkServiceIT.java b/src/test/java/com/snowflake/kafka/connector/internal/SinkServiceIT.java
index db844dee5..5f5541aca 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/SinkServiceIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/SinkServiceIT.java
@@ -133,7 +133,7 @@ public void testIngestion() throws InterruptedException,
assert service.getOffset(new TopicPartition(topic, partition)) == offset + 1;
- service.close();
+ service.closeAll();
Thread.sleep(60 * 1000);
// don't drop pipe in current version
// assert !conn.pipeExist(pipe);
@@ -167,7 +167,7 @@ public void testRecordNumber() throws InterruptedException,
Thread.sleep(90 * 1000);
assert TestUtils.tableSize(table) == numOfRecord + numOfRecord1;
- service.close();
+ service.closeAll();
}
private Future insert(SnowflakeSinkService sink, int partition,
@@ -215,7 +215,7 @@ public void testFileSize() throws ExecutionException, InterruptedException
assert result.get() == numOfRecord / (size / 152 + 1);
- service.close();
+ service.closeAll();
}
@Test
@@ -235,13 +235,13 @@ public void testFlushTime() throws InterruptedException, ExecutionException
assert insert(service, partition, numOfRecord).get() == 0;
- Thread.sleep(flushTime * 1000 + 5);
+ Thread.sleep((flushTime + 5) * 1000);
assert conn.listStage(stage,
FileNameUtils.filePrefix(TestUtils.TEST_CONNECTOR_NAME, table,
partition)).size() == 1;
- service.close();
+ service.closeAll();
}
@Test
@@ -294,7 +294,7 @@ public void testRecover() throws InterruptedException
assert conn.listStage(stage,
FileNameUtils.filePrefix(TestUtils.TEST_CONNECTOR_NAME, table, 0)).size() == 0;
- service.close();
+ service.closeAll();
}
@Test
@@ -324,6 +324,6 @@ public void testBrokenRecord()
assert TestUtils.getPartitionFromBrokenFileName(name) == partition;
assert TestUtils.getOffsetFromBrokenFileName(name) == offset;
- service.close();
+ service.closeAll();
}
}