SNOW-1056407 Add SnowflakeConnectorPushTime to metadata #833
Conversation
src/test/java/com/snowflake/kafka/connector/records/ProcessRecordTest.java (outdated, resolved)
@@ -193,6 +201,10 @@ private SnowflakeTableRow processRecord(SinkRecord record) {
       meta.put(SCHEMA_ID, valueContent.getSchemaID());
     }

+    if (metadataConfig.connectorTimeFlag) {
+      meta.put(CONNECTOR_TIME, clock.instant().toEpochMilli());
could we get the timestamp of when insertRow(s) to the streaming SDK happened?
+1, I think we can do better to use the timestamp when we move from KC buffer to Ingest buffer
This method is called right before pushing data to the ingest-sdk. The metadata is written as JSON immediately, so if we want to capture the timestamp in the exact place where the rows are pushed further, a refactoring of the record service is needed.
That's the call stack:
TopicPartitionChannel.insertRowsWithFallback
-> InsertRowsApiResponseSupplier
-> StreamingBuffer.getData()
-> RecordService.getProcessedRecordForStreamingIngest
-> RecordService.processRecord
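The diff discussed above injects a Clock into the record service so the push timestamp is testable. A minimal sketch of that pattern, with illustrative class, flag, and field names (not the connector's actual API):

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the metadata timestamping shown in the diff above;
// names here are assumptions, not the connector's real classes.
class MetadataSketch {
    static final String CONNECTOR_PUSH_TIME = "SnowflakeConnectorPushTime";

    private final Clock clock;
    private final boolean connectorPushTimeFlag;

    MetadataSketch(Clock clock, boolean connectorPushTimeFlag) {
        this.clock = clock;
        this.connectorPushTimeFlag = connectorPushTimeFlag;
    }

    // Builds the "meta" map fragment; the timestamp is taken from the
    // injected Clock just before the row would be handed to the ingest SDK.
    Map<String, Object> buildMeta() {
        Map<String, Object> meta = new HashMap<>();
        if (connectorPushTimeFlag) {
            meta.put(CONNECTOR_PUSH_TIME, clock.instant().toEpochMilli());
        }
        return meta;
    }

    public static void main(String[] args) {
        Clock fixed = Clock.fixed(Instant.ofEpochMilli(1000L), ZoneOffset.UTC);
        System.out.println(new MetadataSketch(fixed, true).buildMeta());
    }
}
```

Passing `Clock.fixed(...)` in tests is what makes the emitted timestamp deterministic, which is why the real diff references `clock` rather than calling `Instant.now()` inline.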
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java (outdated, resolved)
Force-pushed: 321d3bd → 2997ca4, 30028d3 → 5276143, 5276143 → a7f5a4d.
@@ -69,6 +71,7 @@ public class RecordService {
   static final String CONTENT = "content";
   static final String META = "meta";
   static final String SCHEMA_ID = "schema_id";
+  static final String CONNECTOR_PUSH_TIME = "ConnectorPushTime";
We have used all lowercase historically; to be consistent, maybe use connector_push_time?
nvm, I see CreateTime
Should we just have PushTime (or FlushTime), to make it consistent with CreateTime?
Imo PushTime sounds quite ambiguous: it isn't clear where exactly the push happened. Ideally I'd have SnowflakeConnectorPushTime, but it's too long, and I reckon in most cases it should be obvious to users which connector is implied.
I'd rather not use "flush", as we still have an ingest-sdk that buffers data, so it isn't the time when a real flush to Snowflake happens. Also, after removing the buffering layer in the connector, nothing will be flushed, as records will be passed to the ingest-sdk immediately.
I vote for SnowflakeConnectorPushTime too.
TBH, it should be SnowflakeConnectorPullTime to be very specific (Kafka Connect is polling, so w.r.t. Snowflake it is a pull time). I am fine with either, given we will, and should, be very descriptive in our docs.
I would even go with adding an Ms suffix.
I'm avoiding "pull" for Kafka-polling reasons.
I'm not sure the Ms suffix is necessary, as CreateTime doesn't have it and is also a millisecond value.
- After reconsideration, naming the field SnowflakeConnectorPushTime makes more sense, as customers can use some source connector, so otherwise it isn't clear which connector's timestamp it is.
- I wouldn't go with the Ms suffix, in order to align with Kafka's naming for timestamps: CreateTime, LogAppendTime.
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java (outdated, resolved)
test/test_suit/test_avro_avro.py (outdated)
@@ -31,7 +31,7 @@ def verify(self, round):
     # validate content of line 1
     res = self.driver.snowflake_conn.cursor().execute(
         "Select * from {} limit 1".format(self.topic)).fetchone()
-    goldMeta = r'{"CreateTime":\d*,"key":[{"timestamp":\d*,"tweet":"Rock:Nerfpaper,scissorsisfine.",' \
+    goldMeta = r'{"CreateTime":\d*,"SnowflakeConnectorTime":\d*,"key":[{"timestamp":\d*,"tweet":"Rock:Nerfpaper,scissorsisfine.",' \
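The goldMeta check in the e2e test is a plain regex match against the serialized meta column. A minimal sketch of the same idea in Java; the sample JSON and class name are illustrative, and the field name uses the final SnowflakeConnectorPushTime spelling rather than the outdated one in the diff:

```java
import java.util.regex.Pattern;

public class GoldMetaCheck {
    // Hypothetical sample of the "meta" column content.
    static final String SAMPLE_META =
        "{\"CreateTime\":1709000000000,\"SnowflakeConnectorPushTime\":1709000000123,\"offset\":0}";

    // Escaped equivalent of the goldMeta prefix used by the Python e2e test:
    // both timestamps are matched only as runs of digits.
    static final Pattern GOLD_META_PREFIX =
        Pattern.compile("^\\{\"CreateTime\":\\d+,\"SnowflakeConnectorPushTime\":\\d+,");

    static boolean matchesGoldMeta(String meta) {
        return GOLD_META_PREFIX.matcher(meta).find();
    }

    public static void main(String[] args) {
        System.out.println(matchesGoldMeta(SAMPLE_META));
    }
}
```

Because the test only asserts `\d*` for the timestamp values, it passes regardless of the actual instant, which is why the reviewers focus on whether the field is present by default rather than on its value.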
is the default set to true? this seems like a breaking change?
+1, the default should be false. Also, the name doesn't seem to match the one above.
Turned the parameter to false by default. Yes, the property name in the test had not been updated at that time.
We can have client breaking changes. It just needs to be documented.
I don't think having it default true is a bad thing
Shouldn't the connector follow the BCR process too? If not, then both options sound alright.
Client BCRs are different than Snowflake server release BCRs. Yes, there is still approval and review but to customers, there is a notice and customers can choose to upgrade to the newest client version.
I'll forward the internal doc.
lgtm overall, I will let @sfc-gh-xhuang pitch in on the name of the field.
My main concern is the test changes: since the default is false, why are null fields showing up in goldMeta? If they are just regex, please ignore.
It would be good to add a test for enabling/disabling the parameter to verify the expected behavior.
- Left some suggestions about the naming; feel free to discuss it with the team and PMs, and you can make the final call.
- The default of this property should be false at the beginning; we can consider enabling it by default some time later as a behavior change.
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java (outdated, resolved)
Force-pushed: b0624e9 → 48a42cb, 48a42cb → eed76f7.
lgtm, left a comment.
Thanks for reverting to false by default.
src/main/java/com/snowflake/kafka/connector/records/SnowflakeMetadataConfig.java (outdated, resolved)
@@ -1213,7 +1214,7 @@ protected long getApproxSizeOfRecordInBytes(SinkRecord kafkaSinkRecord) {
     try {
       // get the row that we want to insert into Snowflake.
       Map<String, Object> tableRow =
-          recordService.getProcessedRecordForStreamingIngest(snowflakeRecord);
+          recordService.getProcessedRecordForStreamingIngest(snowflakeRecord, Instant.now());
What do you think about creating a TimeProvider abstraction and passing its implementation to the constructor of RecordService?
LGTM. I would consider adding a TimeProvider abstraction.
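The suggested TimeProvider abstraction could look roughly like this; the interface, method, and class names are assumptions for illustration, not the PR's actual code:

```java
import java.time.Instant;

// Hypothetical abstraction over "current time", as suggested in the review.
interface TimeProvider {
    Instant now();
}

// Sketch of a RecordService-like class receiving the provider via its
// constructor instead of having Instant.now() threaded through call sites.
class RecordServiceSketch {
    private final TimeProvider timeProvider;

    RecordServiceSketch(TimeProvider timeProvider) {
        this.timeProvider = timeProvider;
    }

    // The push timestamp is captured when the record is processed,
    // i.e. right before it would be handed to the ingest SDK.
    long connectorPushTimeMillis() {
        return timeProvider.now().toEpochMilli();
    }

    public static void main(String[] args) {
        RecordServiceSketch service = new RecordServiceSketch(Instant::now);
        System.out.println(service.connectorPushTimeMillis());
    }
}
```

Production code would pass `Instant::now`, while tests can pin a fixed instant; this keeps the `getProcessedRecordForStreamingIngest` signature free of an extra `Instant` parameter.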
Force-pushed: c0a16ec → 6a73810.
Overview
SNOW-1056407
Adding a new SnowflakeConnectorPushTime property in metadata that represents the moment when the message was pushed onward by the connector.
Depends on: #832.
Pre-review checklist
- This change is parameter protected by snowflake.streaming.metadata.connectorPushTime.
  - Yes - Added end to end and Unit Tests.
  - No - Suggest why it is not param protected.
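Assuming the property name listed in the checklist above, enabling the new metadata field in a connector configuration might look like this (illustrative fragment, default shown as the PR's final false setting):

```
# Opt in to the SnowflakeConnectorPushTime metadata field (defaults to false)
snowflake.streaming.metadata.connectorPushTime=true
```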