diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_stream_stream_inner_join_with_ts b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_stream_stream_inner_join_with_ts index caf4ef619ae..948f628efbe 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_stream_stream_inner_join_with_ts +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_stream_stream_inner_join_with_ts @@ -79,11 +79,11 @@ Topologies: --> Project <-- Join-this-join, Join-other-join Processor: Project (stores: []) - --> KSTREAM-TRANSFORM-0000000012 + --> ApplyTimestampTransform-S1_JOIN_S2 <-- Join-merge - Processor: KSTREAM-TRANSFORM-0000000012 (stores: []) - --> KSTREAM-SINK-0000000013 + Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: []) + --> KSTREAM-SINK-0000000012 <-- Project - Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2) - <-- KSTREAM-TRANSFORM-0000000012 + Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2) + <-- ApplyTimestampTransform-S1_JOIN_S2 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_stream_stream_inner_join_with_ts_extractor_both_sides b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_stream_stream_inner_join_with_ts_extractor_both_sides index dea29f2d9e9..bc803854f43 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_stream_stream_inner_join_with_ts_extractor_both_sides +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_stream_stream_inner_join_with_ts_extractor_both_sides @@ -79,11 +79,11 @@ Topologies: --> Project <-- Join-this-join, Join-other-join Processor: Project (stores: []) - --> KSTREAM-TRANSFORM-0000000012 + --> ApplyTimestampTransform-S1_JOIN_S2 <-- Join-merge - Processor: KSTREAM-TRANSFORM-0000000012 (stores: []) - --> KSTREAM-SINK-0000000013 + Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: []) + --> KSTREAM-SINK-0000000012 <-- Project - Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2) - <-- KSTREAM-TRANSFORM-0000000012 + Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2) + <-- ApplyTimestampTransform-S1_JOIN_S2 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_stream_table_join_with_ts_extractor_both_sides b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_stream_table_join_with_ts_extractor_both_sides index 628aabb9309..ccf7b3a2458 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_stream_table_join_with_ts_extractor_both_sides +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_stream_table_join_with_ts_extractor_both_sides @@ -63,16 +63,16 @@ Topologies: --> KTABLE-TRANSFORMVALUES-0000000002 <-- KSTREAM-SOURCE-0000000000 Processor: Project (stores: []) - --> KSTREAM-TRANSFORM-0000000009 + --> ApplyTimestampTransform-S1_JOIN_T1 <-- Join - Processor: KSTREAM-TRANSFORM-0000000009 (stores: []) - --> KSTREAM-SINK-0000000010 + Processor: ApplyTimestampTransform-S1_JOIN_T1 (stores: []) + --> KSTREAM-SINK-0000000009 <-- Project Processor: KTABLE-TRANSFORMVALUES-0000000002 (stores: []) --> PrependAliasRight <-- KTABLE-SOURCE-0000000001 - Sink: KSTREAM-SINK-0000000010 (topic: S1_JOIN_T1) - <-- KSTREAM-TRANSFORM-0000000009 + Sink: KSTREAM-SINK-0000000009 (topic: S1_JOIN_T1) + <-- ApplyTimestampTransform-S1_JOIN_T1 Processor: PrependAliasRight (stores: []) --> none <-- KTABLE-TRANSFORMVALUES-0000000002 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_table_table_inner_join_with_ts b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_table_table_inner_join_with_ts index 31aeaaa24db..f52d57f88a0 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_table_table_inner_join_with_ts +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_table_table_inner_join_with_ts @@ -80,11 +80,11 @@ Topologies: --> KTABLE-TOSTREAM-0000000012 <-- KTABLE-MERGE-0000000008 Processor: KTABLE-TOSTREAM-0000000012 (stores: []) - --> KSTREAM-TRANSFORM-0000000013 + --> ApplyTimestampTransform-S1_JOIN_S2 <-- Project - Processor: KSTREAM-TRANSFORM-0000000013 (stores: []) - --> KSTREAM-SINK-0000000014 + Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: []) + --> KSTREAM-SINK-0000000013 <-- KTABLE-TOSTREAM-0000000012 - Sink: KSTREAM-SINK-0000000014 (topic: S1_JOIN_S2) - <-- KSTREAM-TRANSFORM-0000000013 + Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2) + <-- ApplyTimestampTransform-S1_JOIN_S2 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_table_table_inner_join_with_ts_extractor_both_sides b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_table_table_inner_join_with_ts_extractor_both_sides index a78c961b224..0e860d0a08a 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_table_table_inner_join_with_ts_extractor_both_sides +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/join-with-custom-timestamp_-_table_table_inner_join_with_ts_extractor_both_sides @@ -80,11 +80,11 @@ Topologies: --> KTABLE-TOSTREAM-0000000012 <-- KTABLE-MERGE-0000000008 Processor: KTABLE-TOSTREAM-0000000012 (stores: []) - --> KSTREAM-TRANSFORM-0000000013 + --> ApplyTimestampTransform-S1_JOIN_S2 <-- Project - Processor: KSTREAM-TRANSFORM-0000000013 (stores: []) - --> KSTREAM-SINK-0000000014 + Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: []) + --> KSTREAM-SINK-0000000013 <-- KTABLE-TOSTREAM-0000000012 - Sink: KSTREAM-SINK-0000000014 (topic: S1_JOIN_S2) - <-- KSTREAM-TRANSFORM-0000000013 + Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2) + <-- ApplyTimestampTransform-S1_JOIN_S2 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_stream_stream_inner_join_with_ts b/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_stream_stream_inner_join_with_ts index e05f2ce99d9..8bb791449d6 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_stream_stream_inner_join_with_ts +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_stream_stream_inner_join_with_ts @@ -83,11 +83,11 @@ Topologies: --> Project <-- Join-this-join, Join-other-join Processor: Project (stores: []) - --> KSTREAM-TRANSFORM-0000000012 + --> ApplyTimestampTransform-S1_JOIN_S2 <-- Join-merge - Processor: KSTREAM-TRANSFORM-0000000012 (stores: []) - --> KSTREAM-SINK-0000000013 + Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: []) + --> KSTREAM-SINK-0000000012 <-- Project - Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2) - <-- KSTREAM-TRANSFORM-0000000012 + Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2) + <-- ApplyTimestampTransform-S1_JOIN_S2 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_stream_stream_inner_join_with_ts_extractor_both_sides b/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_stream_stream_inner_join_with_ts_extractor_both_sides index 75e7b8af4bf..c4e5ad38a93 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_stream_stream_inner_join_with_ts_extractor_both_sides +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_stream_stream_inner_join_with_ts_extractor_both_sides @@ -83,11 +83,11 @@ Topologies: --> Project <-- Join-this-join, Join-other-join Processor: Project (stores: []) - --> KSTREAM-TRANSFORM-0000000012 + --> ApplyTimestampTransform-S1_JOIN_S2 <-- Join-merge - Processor: KSTREAM-TRANSFORM-0000000012 (stores: []) - --> KSTREAM-SINK-0000000013 + Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: []) + --> KSTREAM-SINK-0000000012 <-- Project - Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2) - <-- KSTREAM-TRANSFORM-0000000012 + Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2) + <-- ApplyTimestampTransform-S1_JOIN_S2 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_stream_table_join_with_ts_extractor_both_sides b/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_stream_table_join_with_ts_extractor_both_sides index cbae1c585be..d6c8e84e839 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_stream_table_join_with_ts_extractor_both_sides +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_stream_table_join_with_ts_extractor_both_sides @@ -67,16 +67,16 @@ Topologies: --> KTABLE-TRANSFORMVALUES-0000000002 <-- KSTREAM-SOURCE-0000000000 Processor: Project (stores: []) - --> KSTREAM-TRANSFORM-0000000009 + --> ApplyTimestampTransform-S1_JOIN_T1 <-- Join - Processor: KSTREAM-TRANSFORM-0000000009 (stores: []) - --> KSTREAM-SINK-0000000010 + Processor: ApplyTimestampTransform-S1_JOIN_T1 (stores: []) + --> KSTREAM-SINK-0000000009 <-- Project Processor: KTABLE-TRANSFORMVALUES-0000000002 (stores: []) --> PrependAliasRight <-- KTABLE-SOURCE-0000000001 - Sink: KSTREAM-SINK-0000000010 (topic: S1_JOIN_T1) - <-- KSTREAM-TRANSFORM-0000000009 + Sink: KSTREAM-SINK-0000000009 (topic: S1_JOIN_T1) + <-- ApplyTimestampTransform-S1_JOIN_T1 Processor: PrependAliasRight (stores: []) --> none <-- KTABLE-TRANSFORMVALUES-0000000002 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_table_table_inner_join_with_ts b/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_table_table_inner_join_with_ts index 0d3af8b3730..299df691760 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_table_table_inner_join_with_ts +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_table_table_inner_join_with_ts @@ -84,11 +84,11 @@ Topologies: --> KTABLE-TOSTREAM-0000000012 <-- KTABLE-MERGE-0000000008 Processor: KTABLE-TOSTREAM-0000000012 (stores: []) - --> KSTREAM-TRANSFORM-0000000013 + --> ApplyTimestampTransform-S1_JOIN_S2 <-- Project - Processor: KSTREAM-TRANSFORM-0000000013 (stores: []) - --> KSTREAM-SINK-0000000014 + Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: []) + --> KSTREAM-SINK-0000000013 <-- KTABLE-TOSTREAM-0000000012 - Sink: KSTREAM-SINK-0000000014 (topic: S1_JOIN_S2) - <-- KSTREAM-TRANSFORM-0000000013 + Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2) + <-- ApplyTimestampTransform-S1_JOIN_S2 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_table_table_inner_join_with_ts_extractor_both_sides b/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_table_table_inner_join_with_ts_extractor_both_sides index 78b407d47f5..55c22f8cf65 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_table_table_inner_join_with_ts_extractor_both_sides +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_7_0/join-with-custom-timestamp_-_table_table_inner_join_with_ts_extractor_both_sides @@ -84,11 +84,11 @@ Topologies: --> KTABLE-TOSTREAM-0000000012 <-- KTABLE-MERGE-0000000008 Processor: KTABLE-TOSTREAM-0000000012 (stores: []) - --> KSTREAM-TRANSFORM-0000000013 + --> ApplyTimestampTransform-S1_JOIN_S2 <-- Project - Processor: KSTREAM-TRANSFORM-0000000013 (stores: []) - --> KSTREAM-SINK-0000000014 + Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: []) + --> KSTREAM-SINK-0000000013 <-- KTABLE-TOSTREAM-0000000012 - Sink: KSTREAM-SINK-0000000014 (topic: S1_JOIN_S2) - <-- KSTREAM-TRANSFORM-0000000013 + Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2) + <-- ApplyTimestampTransform-S1_JOIN_S2 diff --git a/ksql-rest-app/src/test/resources/ksql-plan-schema/schema.json b/ksql-rest-app/src/test/resources/ksql-plan-schema/schema.json index 6410d227534..3fde5a93379 100644 --- a/ksql-rest-app/src/test/resources/ksql-plan-schema/schema.json +++ b/ksql-rest-app/src/test/resources/ksql-plan-schema/schema.json @@ -453,7 +453,7 @@ } }, "title" : "streamSinkV1", - "required" : [ "@type", "properties", "source", "formats", "topicName", "timestampColumn" ] + "required" : [ "@type", "properties", "source", "formats", "topicName" ] }, "StreamSource" : { "type" : "object", @@ -812,7 +812,7 @@ } }, "title" : "tableSinkV1", - "required" : [ "@type", "properties", "source", "formats", "topicName", "timestampColumn" ] + "required" : [ "@type", "properties", "source", "formats", "topicName" ] }, "TableTableJoin" : { "type" : "object", diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SinkBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SinkBuilder.java index b7983a18920..63f96dcb923 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SinkBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SinkBuilder.java @@ -22,7 +22,7 @@ import io.confluent.ksql.execution.context.QueryLoggerUtil; import io.confluent.ksql.execution.plan.Formats; import io.confluent.ksql.execution.plan.KeySerdeFactory; -import io.confluent.ksql.execution.streams.timestamp.AbstractColumnTimestampExtractor; +import io.confluent.ksql.execution.streams.timestamp.KsqlTimestampExtractor; import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicy; import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory; import io.confluent.ksql.execution.timestamp.TimestampColumn; @@ -43,6 +43,8 @@ import org.apache.kafka.streams.processor.To; public final class SinkBuilder { + private static final String TIMESTAMP_TRANSFORM_NAME = "ApplyTimestampTransform-"; + private SinkBuilder() { } @@ -78,7 +80,8 @@ public static void build( ); final KStream transformed = tsTransformer - .map(t -> stream.transform(t, Named.as(StreamsUtil.buildOpName(queryContext)))) + .map(t -> stream.transform(t, Named.as(TIMESTAMP_TRANSFORM_NAME + + StreamsUtil.buildOpName(queryContext)))) .orElse(stream); transformed.to(topicName, Produced.with(keySerde, valueSerde)); @@ -105,9 +108,8 @@ private static Optional> timestampTransformer( .map(c -> sourceSchema.findValueColumn(c).orElseThrow(IllegalStateException::new)) .map(Column::index) .map(timestampPolicy::create) - .filter(te -> te instanceof AbstractColumnTimestampExtractor) .map(te -> new TransformTimestamp<>( - (AbstractColumnTimestampExtractor)te, + te, queryBuilder .getProcessingLogContext() .getLoggerFactory() @@ -123,11 +125,11 @@ private static Optional> timestampTransformer( static class TransformTimestamp implements TransformerSupplier> { - private final AbstractColumnTimestampExtractor timestampExtractor; + private final KsqlTimestampExtractor timestampExtractor; private final ProcessingLogger processingLogger; TransformTimestamp( - final AbstractColumnTimestampExtractor timestampExtractor, + final KsqlTimestampExtractor timestampExtractor, final ProcessingLogger processingLogger ) { this.timestampExtractor = requireNonNull(timestampExtractor, "timestampExtractor"); diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/AbstractColumnTimestampExtractor.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/AbstractColumnTimestampExtractor.java deleted file mode 100644 index 35890837806..00000000000 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/AbstractColumnTimestampExtractor.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2020 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.execution.streams.timestamp; - -import com.google.common.base.Preconditions; -import io.confluent.ksql.GenericRow; -import io.confluent.ksql.util.KsqlException; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.streams.processor.TimestampExtractor; - -public abstract class AbstractColumnTimestampExtractor implements TimestampExtractor { - protected final int timetampColumnIndex; - - AbstractColumnTimestampExtractor(final int timestampColumnindex) { - Preconditions.checkArgument(timestampColumnindex >= 0, "timestampColumnindex must be >= 0"); - this.timetampColumnIndex = timestampColumnindex; - } - - @Override - public long extract(final ConsumerRecord record, final long previousTimestamp) { - if (record.value() instanceof GenericRow) { - try { - return extract((GenericRow) record.value()); - } catch (final Exception e) { - throw new KsqlException("Unable to extract timestamp from record." - + " record=" + record, - e); - } - } - - return 0; - } - - public abstract long extract(GenericRow row); -} diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/KsqlTimestampExtractor.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/KsqlTimestampExtractor.java new file mode 100644 index 00000000000..f2a64968070 --- /dev/null +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/KsqlTimestampExtractor.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams.timestamp; + +import io.confluent.ksql.GenericRow; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.processor.TimestampExtractor; + +public interface KsqlTimestampExtractor extends TimestampExtractor { + default long extract(final ConsumerRecord record, final long previousTimestamp) { + return extract((GenericRow) record.value()); + } + + long extract(GenericRow row); +} diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/LongColumnTimestampExtractionPolicy.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/LongColumnTimestampExtractionPolicy.java index 9f66baa1274..6101512d585 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/LongColumnTimestampExtractionPolicy.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/LongColumnTimestampExtractionPolicy.java @@ -18,7 +18,6 @@ import com.google.errorprone.annotations.Immutable; import io.confluent.ksql.name.ColumnName; import java.util.Objects; -import org.apache.kafka.streams.processor.TimestampExtractor; @Immutable public class LongColumnTimestampExtractionPolicy implements TimestampExtractionPolicy { @@ -31,7 +30,7 @@ public LongColumnTimestampExtractionPolicy(final ColumnName timestampField) { } @Override - public TimestampExtractor create(final int columnIndex) { + public KsqlTimestampExtractor create(final int columnIndex) { return new LongTimestampExtractor(columnIndex); } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/LongTimestampExtractor.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/LongTimestampExtractor.java index 5f55a519e1f..92831643be9 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/LongTimestampExtractor.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/LongTimestampExtractor.java @@ -17,13 +17,15 @@ import io.confluent.ksql.GenericRow; -public class LongTimestampExtractor extends AbstractColumnTimestampExtractor { +public class LongTimestampExtractor implements KsqlTimestampExtractor { + private final int timestampColumnindex; + LongTimestampExtractor(final int timestampColumnindex) { - super(timestampColumnindex); + this.timestampColumnindex = timestampColumnindex; } @Override public long extract(final GenericRow row) { - return (long)row.get(timetampColumnIndex); + return (long)row.get(timestampColumnindex); } } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/MetadataTimestampExtractionPolicy.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/MetadataTimestampExtractionPolicy.java index cef9a1d553c..4dd97ea58ea 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/MetadataTimestampExtractionPolicy.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/MetadataTimestampExtractionPolicy.java @@ -33,8 +33,8 @@ public MetadataTimestampExtractionPolicy(final TimestampExtractor timestampExtra } @Override - public TimestampExtractor create(final int columnIndex) { - return timestampExtractor; + public KsqlTimestampExtractor create(final int columnIndex) { + return new MetadataTimestampExtractor(timestampExtractor); } @Override diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/MetadataTimestampExtractor.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/MetadataTimestampExtractor.java new file mode 100644 index 00000000000..bcc993f2bb6 --- /dev/null +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/MetadataTimestampExtractor.java @@ -0,0 +1,48 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams.timestamp; + +import static java.util.Objects.requireNonNull; + +import io.confluent.ksql.GenericRow; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.processor.TimestampExtractor; + +public class MetadataTimestampExtractor implements KsqlTimestampExtractor { + private final TimestampExtractor timestampExtractor; + + public MetadataTimestampExtractor(final TimestampExtractor timestampExtractor) { + this.timestampExtractor = requireNonNull(timestampExtractor, "timestampExtractor"); + } + + public TimestampExtractor getTimestampExtractor() { + return timestampExtractor; + } + + @Override + public long extract( + final ConsumerRecord record, + final long previousTimestamp + ) { + return timestampExtractor.extract(record, previousTimestamp); + } + + @Override + public long extract(final GenericRow row) { + throw new UnsupportedOperationException("Operation not supported for class: " + + timestampExtractor.getClass()); + } +} diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/StringTimestampExtractionPolicy.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/StringTimestampExtractionPolicy.java index 947dba5d357..e20681749ac 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/StringTimestampExtractionPolicy.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/StringTimestampExtractionPolicy.java @@ -19,7 +19,6 @@ import com.google.errorprone.annotations.Immutable; import io.confluent.ksql.name.ColumnName; import java.util.Objects; -import org.apache.kafka.streams.processor.TimestampExtractor; @Immutable public class StringTimestampExtractionPolicy implements TimestampExtractionPolicy { @@ -38,7 +37,7 @@ public StringTimestampExtractionPolicy( } @Override - public TimestampExtractor create(final int timestampColumnIndex) { + public KsqlTimestampExtractor create(final int timestampColumnIndex) { return new StringTimestampExtractor(format, timestampColumnIndex); } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/StringTimestampExtractor.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/StringTimestampExtractor.java index ec2ce8a542c..1eff1f3667d 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/StringTimestampExtractor.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/StringTimestampExtractor.java @@ -15,25 +15,27 @@ package io.confluent.ksql.execution.streams.timestamp; +import com.google.common.base.Preconditions; import io.confluent.ksql.GenericRow; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.timestamp.StringToTimestampParser; import java.util.Objects; -public class StringTimestampExtractor extends AbstractColumnTimestampExtractor { +public class StringTimestampExtractor implements KsqlTimestampExtractor { private final StringToTimestampParser timestampParser; + private final int timestampColumn; private final String format; StringTimestampExtractor(final String format, final int timestampColumn) { - super(timestampColumn); - this.format = Objects.requireNonNull(format, "format can't be null"); + Preconditions.checkArgument(timestampColumn >= 0, "timestampColumn must be >= 0"); + this.timestampColumn = timestampColumn; this.timestampParser = new StringToTimestampParser(format); } @Override public long extract(final GenericRow row) { - final String value = (String)row.get(timetampColumnIndex); + final String value = (String)row.get(timestampColumn); try { return timestampParser.parse(value); diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/TimestampExtractionPolicy.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/TimestampExtractionPolicy.java index 3c9e4527751..4bafb69a692 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/TimestampExtractionPolicy.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/TimestampExtractionPolicy.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo.As; import com.google.errorprone.annotations.Immutable; import io.confluent.ksql.name.ColumnName; -import org.apache.kafka.streams.processor.TimestampExtractor; @Immutable @JsonTypeInfo( @@ -35,7 +34,7 @@ }) public interface TimestampExtractionPolicy { - TimestampExtractor create(int columnIndex); + KsqlTimestampExtractor create(int columnIndex); default ColumnName getTimestampField() { return null; diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/SinkBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/SinkBuilderTest.java index 6538775e096..b0d59c59d3c 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/SinkBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/SinkBuilderTest.java @@ -19,7 +19,7 @@ import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.plan.Formats; import io.confluent.ksql.execution.plan.KeySerdeFactory; -import io.confluent.ksql.execution.streams.timestamp.AbstractColumnTimestampExtractor; +import io.confluent.ksql.execution.streams.timestamp.KsqlTimestampExtractor; import io.confluent.ksql.execution.timestamp.TimestampColumn; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.logging.processing.ProcessingLogger; @@ -194,8 +194,8 @@ public void shouldBuildStreamWithoutTransformTimestampWhenNoTimestampIsSpecified public void shouldTransformTimestampRow() { // Given final long timestampColumnValue = 10001; - final AbstractColumnTimestampExtractor timestampExtractor - = mock(AbstractColumnTimestampExtractor.class); + final KsqlTimestampExtractor timestampExtractor + = mock(KsqlTimestampExtractor.class); when(timestampExtractor.extract(any())).thenReturn(timestampColumnValue); // When @@ -217,8 +217,8 @@ public void shouldTransformTimestampRow() { public void shouldCallProcessingLoggerOnForwardError() { // Given final long timestampColumnValue = 10001; - final AbstractColumnTimestampExtractor timestampExtractor - = mock(AbstractColumnTimestampExtractor.class); + final KsqlTimestampExtractor timestampExtractor + = mock(KsqlTimestampExtractor.class); when(timestampExtractor.extract(any())).thenReturn(timestampColumnValue); doThrow(KsqlException.class) .when(processorContext) @@ -241,8 +241,8 @@ public void shouldCallProcessingLoggerOnForwardError() { @Test public void shouldCallProcessingLoggerOnTimestampExtractorError() { // Given - final AbstractColumnTimestampExtractor timestampExtractor - = mock(AbstractColumnTimestampExtractor.class); + final KsqlTimestampExtractor timestampExtractor + = mock(KsqlTimestampExtractor.class); doThrow(KsqlException.class).when(timestampExtractor).extract(row); // When @@ -261,8 +261,8 @@ public void shouldCallProcessingLoggerOnTimestampExtractorError() { @Test(expected = NullPointerException.class) public void shouldThrowOnNullProcessorContext() { // Given - final AbstractColumnTimestampExtractor timestampExtractor - = mock(AbstractColumnTimestampExtractor.class); + final KsqlTimestampExtractor timestampExtractor + = mock(KsqlTimestampExtractor.class); // When/Then getTransformer(timestampExtractor, processingLogger).init(null); @@ -277,8 +277,8 @@ public void shouldThrowOnNullTimestampExtractor() { @Test(expected = NullPointerException.class) public void shouldThrowOnNullProcessingLogger() { // Given - final AbstractColumnTimestampExtractor timestampExtractor - = mock(AbstractColumnTimestampExtractor.class); + final KsqlTimestampExtractor timestampExtractor + = mock(KsqlTimestampExtractor.class); // When/Then getTransformer(timestampExtractor, null); @@ -298,7 +298,7 @@ private void buildDefaultSinkBuilder() { } private Transformer> getTransformer( - final AbstractColumnTimestampExtractor timestampExtractor, + final KsqlTimestampExtractor timestampExtractor, final ProcessingLogger processingLogger ) { return new SinkBuilder.TransformTimestamp(timestampExtractor, processingLogger).get(); diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/AbstractColumnTimestampExtractorTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/KsqlTimestampExtractorTest.java similarity index 50% rename from ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/AbstractColumnTimestampExtractorTest.java rename to ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/KsqlTimestampExtractorTest.java index 1a2de019039..4146e5a2809 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/AbstractColumnTimestampExtractorTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/KsqlTimestampExtractorTest.java @@ -1,40 +1,18 @@ package io.confluent.ksql.execution.streams.timestamp; -import io.confluent.ksql.GenericRow; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Test; -import java.text.ParseException; -import java.text.SimpleDateFormat; - import static io.confluent.ksql.GenericRow.genericRow; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -public class AbstractColumnTimestampExtractorTest { - @Test(expected = IllegalArgumentException.class) - public void shouldThrowIfColumnIndexIsNegative() { - // When/Then - new AbstractColumnTimestampExtractor(-1) { - @Override - public long extract(final GenericRow row) { - return 0; - } - }; - } - +public class KsqlTimestampExtractorTest { @Test public void shouldCallExtractTimestampFromGenericRow() { // Given final long timestamp = 1526075913000L; - - final AbstractColumnTimestampExtractor timestampExtractor = - new AbstractColumnTimestampExtractor(2) { - @Override - public long extract(final GenericRow row) { - return timestamp; - } - }; + final KsqlTimestampExtractor timestampExtractor = (row -> timestamp); // When final long actualTime = timestampExtractor.extract(new ConsumerRecord<>("topic", diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/MetadataTimestampExtractorTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/MetadataTimestampExtractorTest.java new file mode 100644 index 00000000000..171a2d092e3 --- /dev/null +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/MetadataTimestampExtractorTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams.timestamp; + +import io.confluent.ksql.GenericRow; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.processor.TimestampExtractor; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.mockito.Mockito.mock; + +@RunWith(MockitoJUnitRunner.class) +public class MetadataTimestampExtractorTest { + @Mock + private TimestampExtractor timestampExtractor; + @Mock + private ConsumerRecord record; + + @Test + public void shouldCallInternalTimestampExtractorOnExtract() { + // Given + final MetadataTimestampExtractor metadataTimestampExtractor = + new MetadataTimestampExtractor(timestampExtractor); + + // When + metadataTimestampExtractor.extract(record, 1); + + // Then + Mockito.verify(timestampExtractor, Mockito.times(1)) + .extract(record, 1); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowOnNullTimestampExtractor() { + // When/Then + new MetadataTimestampExtractor(null); + } + + @Test(expected = UnsupportedOperationException.class) + public void shouldThrowUnsupportedExceptionOnExtractGenericRow() { + // when/Then + new MetadataTimestampExtractor(timestampExtractor).extract(mock(GenericRow.class)); + } +} diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/TimestampExtractionPolicyFactoryTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/TimestampExtractionPolicyFactoryTest.java index 1a04cea6ec6..656e7c090f0 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/TimestampExtractionPolicyFactoryTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/TimestampExtractionPolicyFactoryTest.java @@ -63,7 +63,8 @@ public void shouldCreateMetadataPolicyWhenTimestampFieldNotProvided() { // Then: assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class)); - assertThat(result.create(0), instanceOf(FailOnInvalidTimestamp.class)); + assertThat(((MetadataTimestampExtractor)result.create(0)).getTimestampExtractor(), + instanceOf(FailOnInvalidTimestamp.class)); } @Test @@ -100,7 +101,8 @@ public void shouldCreateMetadataPolicyWithDefaultFailedOnInvalidTimestamp() { // Then: assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class)); - assertThat(result.create(0), instanceOf(FailOnInvalidTimestamp.class)); + assertThat(((MetadataTimestampExtractor)result.create(0)).getTimestampExtractor(), + instanceOf(FailOnInvalidTimestamp.class)); } @Test @@ -121,7 +123,8 @@ public void shouldCreateMetadataPolicyWithConfiguredUsePartitionTimeOnInvalidTim // Then: assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class)); - assertThat(result.create(0), instanceOf(UsePartitionTimeOnInvalidTimestamp.class)); + assertThat(((MetadataTimestampExtractor)result.create(0)).getTimestampExtractor(), + instanceOf(UsePartitionTimeOnInvalidTimestamp.class)); } @Test