
kafka connector: add internal columns filter pushdown #4805

Merged
3 commits merged into trinodb:master on Sep 29, 2020

Conversation

wangli-td
Contributor

Add _timestamp as an internal column.
Add _partition_offset/_partition_id/_timestamp filter pushdown.

Signed-off-by: Li Wang [email protected]

@cla-bot

cla-bot bot commented Aug 13, 2020

Thank you for your pull request and welcome to our community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. In order for us to review and merge your code, please submit the signed CLA to [email protected]. For more information, see https://github.com/prestosql/cla.

@gschmutz

gschmutz commented Aug 18, 2020

@wangli-td thanks for this change. I have a comment regarding the use of _timestamp as the internal column. _timestamp returns the timestamp header value of the Kafka record, and this timestamp does not have to match the offset (append) time of the record if the producer set it to an earlier value (for example the real business timestamp). If that same internal column (_timestamp) is used for the filter pushdown (by translating it to an offset), the filter on the offset is correct and returns the Kafka messages that were produced (ingested) in the given time period, but the result could include _timestamp values that are older than this period, and I'm not sure whether that would be misleading. I have not run any tests with your change applied; my comment is based only on a review of your code. Maybe use another internal column for the "timestamp_offset" and still allow a filter on _timestamp, but without pushdown?

Update: I have tested the change and I was wrong. If the timestamp is in the past, it seems the connector not only does the predicate pushdown on _timestamp but also applies the restriction to the result set retrieved from Kafka. I did not expect that to happen when I added the comment this afternoon.

This query shows the test messages, with the last 3 created with a timestamp one hour in the past (the offsets show that they were ingested after the 20:10 records):

presto:logistics> select _timestamp, _partition_offset from test_topic where  _timestamp <  timestamp '2020-08-18 20:30:16';
       _timestamp        | _partition_offset
-------------------------+-------------------
 2020-08-18 20:10:31.183 |                 0
 2020-08-18 20:10:31.183 |                 1
 2020-08-18 20:10:31.183 |                 2
 2020-08-18 20:10:41.660 |                 3
 2020-08-18 20:10:41.660 |                 4
 2020-08-18 20:10:41.660 |                 5
 2020-08-18 19:25:28.776 |                 6
 2020-08-18 19:25:28.776 |                 7
 2020-08-18 19:25:28.776 |                 8
(9 rows)

If the query only selects from 20:10:31, the records created with timestamp 19:25 are filtered out, even though their offsets are within the from and to timestamps:

presto:logistics> select _timestamp, _partition_offset from test_topic where _timestamp > timestamp '2020-08-18 20:10:31' and  _timestamp <  timestamp '2020-08-18 20:30:16';
       _timestamp        | _partition_offset
-------------------------+-------------------
 2020-08-18 20:10:31.183 |                 0
 2020-08-18 20:10:31.183 |                 1
 2020-08-18 20:10:31.183 |                 2
 2020-08-18 20:10:41.660 |                 3
 2020-08-18 20:10:41.660 |                 4
 2020-08-18 20:10:41.660 |                 5
(6 rows)

So it seems to work correctly for that scenario.

@wangli-td
Contributor Author

@gschmutz Thanks for your kind comments. I understand your concern, but I'm afraid it may cause confusion if the Kafka metadata exposes two Presto fields (_timestamp/_offset_timestamp) that map to the same Kafka timestamp field. As I understand it, in the common case the timestamp order mostly follows the offset order, and the better place for a business timestamp is in the message itself, right? Please let me know your thoughts. Thanks.

@wangli-td
Contributor Author

@gschmutz yes, as you said, both the pushdown filter and the Presto filter affect the query result.

@gschmutz

gschmutz commented Aug 22, 2020

@wangli-td sorry for the late reply. Regarding the semantics of the Kafka timestamp header, you are right: often it is more or less the same as the offset time, since the default is the producer time (the producer sets it). But you can also use it to store the event time (when it happened in the real world), with the advantage that frameworks such as Kafka Streams will use it for time windows and joins. So it is not unusual for the timestamp to hold a time in the past that is not the same as the log append time (offset time). In that case I see a problem with your implementation. I have created a small test to show the behavior I find problematic.

I produced 5 messages, with the 2nd (offset 12) set 1 minute before its append time (the offset time would be 12:38:21 in that case):

presto:logistics> select _timestamp, _partition_offset from timestamp_test_topic;
       _timestamp        | _partition_offset 
-------------------------+-------------------
 2020-08-22 12:38:18.515 |                11 
 2020-08-22 12:37:21.018 |                12 
 2020-08-22 12:38:23.520 |                13 
 2020-08-22 12:38:26.023 |                14 
 2020-08-22 12:38:28.525 |                15 
(5 rows)

If I select between 12:38:00 and 12:38:28 I correctly get the 3 records (even though the offset time of offset 12 is within the range, it is filtered out by the Presto filter):

presto:logistics> select _timestamp, _partition_offset from timestamp_test_topic where _timestamp > timestamp '2020-08-22 12:38:00' and  _timestamp <  timestamp '2020-08-22 12:38:28';
       _timestamp        | _partition_offset 
-------------------------+-------------------
 2020-08-22 12:38:18.515 |                11 
 2020-08-22 12:38:23.520 |                13 
 2020-08-22 12:38:26.023 |                14 
(3 rows)

If I select between 12:37:00 and 12:37:30 I get no rows back, as the pushdown filter does not find any records created in that time range (the Presto filter has no effect):

select _timestamp, _partition_offset from timestamp_test_topic where _timestamp > timestamp '2020-08-22 12:37:00' and  _timestamp <  timestamp '2020-08-22 12:37:30';
               -> 
 _timestamp | _partition_offset 
------------+-------------------
(0 rows)

But if I remove the range and only select for greater than 12:37:00, then I get the record with offset 12 back:

presto:logistics> select _timestamp, _partition_offset from timestamp_test_topic where _timestamp > timestamp '2020-08-22 12:37:00';
       _timestamp        | _partition_offset 
-------------------------+-------------------
 2020-08-22 12:38:18.515 |                11 
 2020-08-22 12:37:21.018 |                12 
 2020-08-22 12:38:23.520 |                13 
 2020-08-22 12:38:26.023 |                14 
 2020-08-22 12:38:28.525 |                15 
(5 rows)

This is not really logical: a _timestamp of 12:37:21 is shown in one query and not in the other, even though it is within the range of the previous query.

The reason for that is that the value returned in the result set for _timestamp (record header timestamp) is not the same as the one used in the filter pushdown (offset time).

That's the reason why I suggest using two internal timestamps: one _timestamp matching the record timestamp, and another _offset_timestamp for the offset time or append time. On _timestamp only a Presto filter would be applied, and on _offset_timestamp only a filter pushdown would be applied (not sure if that is possible).
But in the meantime I also realized that there might be a problem with that as well. There would be no value for _offset_timestamp to show in the result set, as all you get back from the ConsumerRecord is the offset, and there is no way (as far as I know) to do the opposite of #offsetsForTimes(), i.e. convert an offset back into a timestamp. Not sure what that would mean in terms of implementing a connector, as I'm no expert here.

An alternative solution would be to have only the _timestamp internal field with just Presto filtering (no pushdown) and implement a UDF which wraps the #offsetsForTimes() functionality, then use it with a restriction on _partition_offset, i.e. WHERE _partition_offset > offsetForTime(timestamp '2020-08-22 12:37:00'). This would be cleaner in my view, but of course has the downside that the UDF needs to be a Kafka client (and know about a connection to the Kafka broker). Update: thinking about it a few hours later, I realized that this is far from easy/feasible, as each partition has its own timestamp-to-offset mapping, so such a function would have to be applied per partition, which makes it difficult to use.

@wangli-td
Contributor Author

wangli-td commented Aug 23, 2020

@gschmutz Thanks for your test. Yes, as you said, the case where the timestamp order is not aligned with the offset order reproduces your scenario. One way to resolve it is to extend the search range; another is to use a session property to decide whether to push down the timestamp or not. How about using a session property to stay compatible with that case?
Update: #offsetsForTimes(), which the pushdown uses, finds the first offset whose timestamp is at or later than the given timestamp, so your query "_timestamp > timestamp '2020-08-22 12:37:00' and _timestamp < timestamp '2020-08-22 12:37:30'" is pushed down as the offset range (0, 0), which returns an empty result.

Member

@aalbu aalbu left a comment


Thanks for your PR, these are very useful features.

}
if (kafkaFilter.getEndOffsetTs() != null) {
partitionEndOffsets = calcTopicPartitionEndOffsetMap(partitionEndOffsets,
(p) -> findOffsetsByTimestamp(kafkaConsumer, p, kafkaFilter.getEndOffsetTs()));
Member

KafkaConsumer#offsetsForTimes() will give you the earliest offset whose timestamp is greater than or equal to the argument you're passing. That's what you want for the lower bound of a range, but using it for the upper bound leads to the issue that @gschmutz pointed out. I think the right thing to do is to not try to limit the end offset using a _timestamp filter.
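For illustration only (not the PR's actual code; the method and variable names are made up for this sketch): how KafkaConsumer#offsetsForTimes() can translate a _timestamp lower bound into the first offset to read for a single partition.

    import java.util.Map;
    import java.util.Optional;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    // offsetsForTimes() returns, per partition, the earliest offset whose timestamp is greater
    // than or equal to the given value, or null if no such record exists - exactly the
    // semantics wanted for a lower bound.
    static Optional<Long> startOffsetForTimestamp(KafkaConsumer<?, ?> consumer, TopicPartition partition, long lowerBoundMillis)
    {
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(Map.of(partition, lowerBoundMillis));
        return Optional.ofNullable(offsets.get(partition)).map(OffsetAndTimestamp::offset);
    }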


@aalbu yes, I agree that only having a lower bound pushed down to Kafka would solve the issue I pointed out; I hadn't thought about that. The only downside is that you can no longer efficiently get a range of data (i.e. getting data from January 2020 in August 2020), but there is no other solution I can think of, except adding another _timestamp_offset field for the filter pushdown and leaving _timestamp just for filtering on the timestamp header (without pushdown to Kafka). But as I mentioned above, there is no meaningful value to return for that column in this scenario (we unfortunately can't translate an offset into a timestamp).

Member

The config property @wangli-td proposed achieves the functionality you are proposing, right? Just one column mapping, but the push-down behavior is controlled by a config/session property.

@wangli-td
Copy link
Contributor Author

@aalbu Thank you very much for your comments. I applied most of your code refinement suggestions. For the _timestamp pushdown, I think you are right: pushing down only the lower bound is reasonable in all cases, and pushing down the upper bound is not reasonable in gschmutz's case. However, in most of my cases (probably the normal cases) the timestamp order closely follows the offset order, so pushing down the upper bound is also needed when the partitions hold a lot of data.
So, to balance all cases, I added properties (the "kafka.timestamp-upper-bound-push-down-enabled" config property and the "kafka.timestamp_upper_bound_push_down_enabled" session property) to enable or disable pushing down the upper bound of the timestamp. Please kindly correct me. Thanks.

return timestampUpperBoundPushDownEnabled;
}

@Config("kafka.timestamp-upper-bound-push-down-enabled")
Member

I think what we concluded was that we can only push down upper bounds when the topic has message.timestamp.type=LogAppendTime. So we should name the property to reflect that, maybe something like kafka.timestamp-type with allowable values LogAppendTime and CreateTime (this should be the default, equivalent to false as it is written now, since it is the 'safe' setting - queries will return correct results, even though performance might suffer).

Ideally, Presto would obtain this value from Kafka, but I am not sure it is exposed.

@gschmutz gschmutz Aug 27, 2020

Yes, that could be a way to do it: if it is LogAppendTime, then it is guaranteed that the timestamp matches the append (offset) time. This is a setting on the topic which can be retrieved via the Kafka AdminClient. A small example project on how to retrieve message.timestamp.type from a topic can be found here: https://github.com/gschmutz/various-kafka-examples/tree/master/kafka-adminclient-test. Here is the relevant code:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.DescribeConfigsResult;
    import org.apache.kafka.clients.admin.KafkaAdminClient;
    import org.apache.kafka.common.config.ConfigResource;

    String topic = "test-topic";
    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "dataplatform:9092,dataplatform:9093,dataplatform:9094");
    AdminClient admin = KafkaAdminClient.create(config);

    // describeConfigs returns the effective topic configuration, including message.timestamp.type
    ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);

    System.out.println("Getting topic " + topic + " configuration");
    DescribeConfigsResult describeResult = admin.describeConfigs(Collections.singleton(topicResource));
    Map<ConfigResource, Config> topicConfig = describeResult.all().get();
    Config c = topicConfig.get(topicResource);
    System.out.println(c.get("message.timestamp.type").value());

Member

Thank you for providing the code sample. This would allow us to get rid of the property altogether and determine on a topic-by-topic basis whether it's safe to push down the upper bound of a _timestamp range.
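For illustration only (a sketch, not the merged implementation): the per-topic decision could look roughly like this, assuming the Config object was obtained via AdminClient#describeConfigs() as in the snippet above.

    import org.apache.kafka.clients.admin.Config;

    // Upper-bound pushdown is only safe when the broker assigns the timestamp
    // (message.timestamp.type = LogAppendTime); with CreateTime the producer controls it.
    static boolean isTimestampUpperBoundPushdownSafe(Config topicConfig)
    {
        return "LogAppendTime".equals(topicConfig.get("message.timestamp.type").value());
    }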

Contributor Author

@aalbu @gschmutz Thank you for your suggestions. I think deciding based on "message.timestamp.type" is a nice option, but the property is also needed, because even in the CreateTime case (the default), pushing down the upper bound of the timestamp is a normal operation. So I suggest keeping the property, but not pushing down by default unless it is configured.


The problem with CreateTime is that you are never really sure what the Kafka producer is doing (is it overwriting the timestamp or not, is the system time of the producer client exactly the same as the server time, ...), and if you allow the upper bound limit, then whether you run into the issue I mentioned depends entirely on the behavior of the producer client. So I'm not sure whether an upper bound for the CreateTime type is even worth it, and whether the property doesn't just make it more complex for a user to decide if it is really worth it.

Contributor Author

@wangli-td wangli-td Aug 28, 2020

@gschmutz hi gschmutz, in CreateTime mode the timestamp is by default generated by the Producer API, so by default the mode behaves much like LogAppendTime. The problem with not supporting upper-bound pushdown is that the timestamp filter becomes difficult to use for users with CreateTime mode (it may be very slow for millions of rows); they would have no option to push down the upper bound. Could we keep the property, but for CreateTime mode not push down the upper bound by default unless it is configured?


@wangli-td hi, I don't agree that by default CreateTime works the same as LogAppendTime: if the system clocks on the client and the Kafka cluster are not synchronized (which can easily happen; I'm not talking about time zones, just clock sync), then even if the client program does not set the timestamp when producing a record, the timestamp the producer client assigns might not be exactly the same as on the server (not even mentioning network lag). Of course the difference would not be seconds, but there will be a difference from the time used for the offset (for the index). But I also agree that a pushdown on the upper bound would be nice to have, so we can efficiently select a "time bucket" far in the past. So maybe have a property where you can force the pushdown of the upper bound when the topic is set to CreateTime, and default to not doing it? I think it is always better if a user has to explicitly opt in to a feature which might potentially harm the result.

@wangli-td wangli-td force-pushed the master branch 2 times, most recently from ddec93e to cdaec26 Compare August 29, 2020 05:36
@wangli-td
Contributor Author

@gschmutz @aalbu Hi gschmutz & aalbu, the new patch checks whether to push down the upper bound according to the topic's timestamp config.

Member

@aalbu aalbu left a comment

Do you think you could add some tests?

@wangli-td
Contributor Author

wangli-td commented Sep 3, 2020

@aalbu Hi aalbu, thanks for your review. Most of the items are resolved. I also added some test cases for the filters.

Member

@aalbu aalbu left a comment

Sorry, I have been out for a few days. Thanks for adding the test. I didn't communicate well: I was imagining an integration test that would prove pushdown works. For example, we could read all data from a topic first and compare the processed input data with that of the same query with an additional filter on _partition_offset.

See this for an example of obtaining the input data size: https://github.com/prestosql/presto/blob/19811d3d58e908d145639b1975ec111a0002e138/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java#L4126-L4129

@aalbu
Member

aalbu commented Sep 10, 2020

@losipiuk can you review?

@losipiuk losipiuk self-requested a review September 10, 2020 14:43
Member

@losipiuk losipiuk left a comment

Thanks! I left a few comments.
You also need to rebase, as I have just merged #4462 and there will be conflicts around the internal field definitions.

Member

@losipiuk losipiuk left a comment

Thanks! I did a partial review, but it looks like there are still some issues in the code.
If I have indeed spotted problems, can you please also add test coverage for those cases while fixing them?

@wangli-td
Contributor Author

@losipiuk Thanks a lot. Most of the items are resolved, and some test cases for the bound handling are also added.

Comment on lines 179 to 181
Optional<Long> offsetsByTimestamp = findOffsetsByTimestamp(kafkaConsumer, topicPartition, timestamp - 1);
return offsetsByTimestamp.map(aLong -> aLong + 1);
Member

I think it is wrong. Take a look at this example

timestamps:     10  20  20  20  21 
offsets:        1   2   3   4   5    

If I search for 21 you would like this code to return 4, but it will return 3: findOffsetsByTimestamp(..., 21 - 1) returns offset 2 (the first offset with timestamp >= 20), and we then increment it to 3.

I think the correct code would search for the first offset at the upper bound, and then move back one offset:

        Optional<Long> offsetsByTimestamp = findOffsetsByTimestamp(kafkaConsumer, topicPartition, timestamp);
        return offsetsByTimestamp.map(aLong -> aLong - 1);

Contributor Author

@wangli-td wangli-td Sep 26, 2020

@losipiuk Hi losipiuk, "timestamp - 1" means the upper-bound timestamp is exclusive, and aLong -> aLong + 1 means we make the offset result exclusive. Please correct me if I'm wrong. Thanks.

Member

See the example above, where I list message timestamps and the matching offsets.
If we have the same message timestamp for multiple offsets, the formula you propose does not seem sound.

If we call findOffsetsForTimestampLessThan(..., 21) for the example data above, we want to get offset 4 as a result.
With the current code, findOffsetsByTimestamp(..., 21 - 1) returns offset 2, which is then incremented.

So the final result is 3.

It is probably not the end of the world, as we filter the extra rows out in Presto anyway, but the formula I proposed seems to work fine unless I am missing something.

Contributor Author

@wangli-td wangli-td Sep 28, 2020

It seems to depend on whether the first or the last offset is treated as exclusive. I think your proposal is also fine; I'll change it.
By the way, with an exclusive last offset, the timestamp lookup works the same way for both the lower bound and the upper bound: timestamps [10, 21) -> offsets [1, 5).
Using findOffsetsForTimestampGreaterOrEqual for both is fine.
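To illustrate the half-open mapping described above (a sketch only; findOffsetsForTimestampGreaterOrEqual is the helper name used in this discussion, and the surrounding variables are assumed):

    // Sketch: with an exclusive upper bound, both ends of a timestamp range map to offsets
    // the same way, e.g. timestamps [10, 21) -> offsets [1, 5) in the example above.
    Optional<Long> beginOffset = findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, topicPartition, lowerBoundTs); // inclusive start
    Optional<Long> endOffset = findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, topicPartition, upperBoundTs);   // exclusive end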

@wangli-td
Contributor Author

Hi losipiuk, thanks for your review. I also added static test cases and comments for the points you raised.

Member

@losipiuk losipiuk left a comment

Thanks. Minor final comments. It looks good to be merged after those are addressed.

presto-docs/src/main/sphinx/connector/kafka.rst (outdated review comments, resolved)
@@ -72,6 +72,32 @@ private void createTopic(int partitions, int replication, String topic)
}
}

public void createTopicWithConfig(int partitions, int replication, String topic, boolean enableLogAppendTime)
Member

Can you make createTopic call out to createTopicWithConfig?
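For illustration, a minimal sketch of the suggested delegation (not code from the PR; passing false, i.e. keeping the default CreateTime behavior, is an assumption):

    public void createTopic(int partitions, int replication, String topic)
    {
        // Delegate to the configurable variant so topic creation logic lives in one place.
        createTopicWithConfig(partitions, replication, topic, false);
    }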

Contributor Author

I'm not sure. It seems other modules would have some test conflicts if the same createTopicWithConfig were used with the local ZooKeeper config specified, so I made a new one independently.

Add support for predicate pushdown for the following Kafka internal columns:
 * _timestamp
 * _partition_offset
 * _partition_id

If the predicate specifies a lower bound on the _timestamp column (_timestamp > XXXX), it is always pushed down.
The upper bound predicate is pushed down only for topics using ``LogAppendTime`` mode.
For topics using ``CreateTime`` mode, upper bound pushdown must be explicitly allowed via the ``kafka.timestamp-upper-bound-force-push-down-enabled`` config property or the ``timestamp_upper_bound_force_push_down_enabled`` session property.

Signed-off-by: Li Wang <[email protected]>
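For illustration only (not part of the commit message): a hedged sketch of enabling the forced upper-bound pushdown per session, assuming the catalog is named kafka and that the session property takes the usual catalog prefix; the catalog property kafka.timestamp-upper-bound-force-push-down-enabled=true would be the equivalent static setting.

    -- assumes a catalog named kafka; the property name is taken from the commit message above
    SET SESSION kafka.timestamp_upper_bound_force_push_down_enabled = true;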
@wangli-td
Contributor Author

@losipiuk Thanks for your review. The requested changes are resolved in the new patch.

@losipiuk losipiuk merged commit a2ec0e3 into trinodb:master Sep 29, 2020
@losipiuk losipiuk mentioned this pull request Sep 29, 2020
@losipiuk
Member

Merged. Thanks.

@martint martint added this to the 344 milestone Oct 10, 2020