Skip to content

Commit

Permalink
Extend Utils.tableName by appendTableHash param. Add SnowflakeSinkSer…
Browse files Browse the repository at this point in the history
…vice.setAppendTableHash method
  • Loading branch information
Alex Demidov committed Feb 25, 2020
1 parent c8332df commit 28c3744
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public void start(final Map<String, String> parsedConfig)
.setRecordNumber(bufferCountRecords)
.setFlushTime(bufferFlushTime)
.setTopic2TableMap(topic2table)
.setAppendTableHash(appendTableHash)
.build();
}

Expand Down
9 changes: 6 additions & 3 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,10 @@ static String validateConfig(Map<String, String> config)
* verify topic name, and generate valid table name
* @param topic input topic name
* @param topic2table topic to table map
* @param appendTableHash flag to add hash to the end of table name
* @return table name
*/
public static String tableName(String topic, Map<String, String> topic2table)
public static String tableName(String topic, Map<String, String> topic2table, boolean appendTableHash)
{
final String PLACE_HOLDER = "_";
if(topic == null || topic.isEmpty())
Expand Down Expand Up @@ -453,8 +454,10 @@ public static String tableName(String topic, Map<String, String> topic2table)
index ++;
}

result.append(PLACE_HOLDER);
result.append(hash);
if(appendTableHash) {
result.append(PLACE_HOLDER);
result.append(hash);
}

return result.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ public interface SnowflakeSinkService
*/
void setTopic2TableMap(Map<String, String> topic2TableMap);

/**
* control whether append a hash at the end of a table name
* @param appendTableHash a boolean flag to control append hash to a table name
*/
void setAppendTableHash(boolean appendTableHash);

/**
* change flush rate of sink service
* the minimum flush time is controlled by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ public SnowflakeSinkServiceBuilder setTopic2TableMap(Map<String, String> topic2T
return this;
}

public SnowflakeSinkServiceBuilder setAppendTableHash(boolean appendTableHash) {
this.service.setAppendTableHash(appendTableHash);
logInfo("set append table hash flag to {}", appendTableHash);
return this;
}

public SnowflakeSinkService build()
{
logInfo("{} created", SnowflakeSinkService.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class SnowflakeSinkServiceV1 extends Logging implements SnowflakeSinkService
private boolean isStopped;
private final SnowflakeTelemetryService telemetryService;
private Map<String, String> topic2TableMap;
private boolean appendTableHash;

SnowflakeSinkServiceV1(SnowflakeConnectionService conn)
{
Expand All @@ -50,6 +51,7 @@ class SnowflakeSinkServiceV1 extends Logging implements SnowflakeSinkService
isStopped = false;
this.telemetryService = conn.getTelemetryClient();
this.topic2TableMap = new HashMap<>();
this.appendTableHash = SnowflakeSinkConnectorConfig.APPEND_TABLE_HASH_DEFAULT;
}

@Override
Expand Down Expand Up @@ -82,7 +84,7 @@ public void insert(final SinkRecord record)
{
logWarn("Topic: {} Partition: {} hasn't been initialized by OPEN " +
"function", record.topic(), record.kafkaPartition());
startTask(Utils.tableName(record.topic(), this.topic2TableMap),
startTask(Utils.tableName(record.topic(), this.topic2TableMap, this.appendTableHash),
record.topic(), record.kafkaPartition());
}
pipes.get(nameIndex).insert(record);
Expand Down Expand Up @@ -206,6 +208,11 @@ public void setTopic2TableMap(Map<String, String> topic2TableMap)
this.topic2TableMap = topic2TableMap;
}

@Override
public void setAppendTableHash(boolean appendTableHash) {
this.appendTableHash = appendTableHash;
}

@Override
public long getRecordNumber()
{
Expand Down

0 comments on commit 28c3744

Please sign in to comment.