From d4989d2cc4bddc2174b304a11a661f212077a61f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sergio=20Pe=C3=B1a?=
Date: Fri, 21 Feb 2020 13:58:04 -0600
Subject: [PATCH] fix: support string/long/any timestamp, add new QTT tests

---
 .../timestampformat.json                      |  97 ++++++--
 .../ksql/execution/streams/SinkBuilder.java   |  75 ++++----
 .../AbstractColumnTimestampExtractor.java     |  48 +++++
 .../timestamp/LongTimestampExtractor.java     |  31 +---
 .../timestamp/StringTimestampExtractor.java   |  20 +-
 .../execution/streams/SinkBuilderTest.java    | 112 ++++++++----
 .../streams/StreamSinkBuilderTest.java        | 151 ---------------
 .../streams/TableSinkBuilderTest.java         | 173 ------------------
 .../AbstractColumnTimestampExtractorTest.java |  51 ++++++
 .../StringTimestampExtractorTest.java         |  10 +-
 10 files changed, 313 insertions(+), 455 deletions(-)
 create mode 100644 ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/AbstractColumnTimestampExtractor.java
 delete mode 100644 ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSinkBuilderTest.java
 delete mode 100644 ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableSinkBuilderTest.java
 create mode 100644 ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/AbstractColumnTimestampExtractorTest.java

diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/timestampformat.json b/ksql-functional-tests/src/test/resources/query-validation-tests/timestampformat.json
index 15a9c195999b..d298ded6fd7e 100644
--- a/ksql-functional-tests/src/test/resources/query-validation-tests/timestampformat.json
+++ b/ksql-functional-tests/src/test/resources/query-validation-tests/timestampformat.json
@@ -39,36 +39,101 @@
       ]
     },
     {
-      "name": "KSQL override output timestamp for CSAS",
+      "name": "override output timestamp for CSAS",
       "statements": [
-        "CREATE STREAM TEST (ID bigint, EVENT_TS bigint) WITH (kafka_topic='test_topic', value_format='JSON', timestamp='EVENT_TS');",
-        "CREATE STREAM TS WITH (timestamp='sink_ts') AS SELECT id as sink_ts, id, event_ts FROM test;"
+        "CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='JSON');",
+        "CREATE STREAM TS WITH (timestamp='sink_ts') AS SELECT id as sink_ts, id FROM test;"
       ],
       "inputs": [
-        {"topic": "test_topic", "value": {"ID": 1, "EVENT_TS": 1526075913000}, "timestamp": 1526075913000},
-        {"topic": "test_topic", "value": {"ID": 2, "EVENT_TS": -1}, "timestamp": -1},
-        {"topic": "test_topic", "value": {"ID": 3, "EVENT_TS": 1589234313000}, "timestamp": 1589234313000}
+        {"topic": "test_topic", "value": {"ID": 1}, "timestamp": 1526075913000},
+        {"topic": "test_topic", "value": {"ID": 3}, "timestamp": 1589234313000}
       ],
       "outputs": [
-        {"topic": "TS", "value": {"SINK_TS":1, "ID": 1, "EVENT_TS": 1526075913000}, "timestamp": 1},
-        {"topic": "TS", "value": {"SINK_TS":3, "ID": 3, "EVENT_TS": 1589234313000}, "timestamp": 3}
+        {"topic": "TS", "value": {"SINK_TS":1, "ID": 1}, "timestamp": 1},
+        {"topic": "TS", "value": {"SINK_TS":3, "ID": 3}, "timestamp": 3}
       ]
     },
     {
-      "name": "KSQL override output timestamp for CTAS",
+      "name": "override output timestamp for CSAS using a string TIMESTAMP_FORMAT",
       "statements": [
-        "CREATE TABLE TEST (ID bigint, EVENT_TS bigint) WITH (kafka_topic='test_topic', value_format='JSON', timestamp='EVENT_TS');",
-        "CREATE TABLE TS WITH (timestamp='sink_ts') AS SELECT id as sink_ts, id, event_ts FROM test;"
+        "CREATE STREAM TEST (ID bigint, EVENT_TS varchar) WITH (kafka_topic='test_topic', value_format='JSON', timestamp='EVENT_TS', timestamp_format='yyyy-MM-dd''T''HH:mm:ssX');",
+        "CREATE STREAM TS WITH (timestamp='event_ts', timestamp_format='yyyy-MM-dd''T''HH:mm:ssX') AS SELECT id, event_ts FROM test;"
       ],
       "inputs": [
-        {"topic": "test_topic", "value": {"ID": 1, "EVENT_TS": 1526075913000}, "timestamp": 1526075913000},
-        {"topic": "test_topic", "value": {"ID": 2, "EVENT_TS": -1}, "timestamp": -1},
-        {"topic": "test_topic", "value": {"ID": 3, "EVENT_TS": 1589234313000}, "timestamp": 1589234313000}
+        {"topic": "test_topic", "value": {"ID": 1, "EVENT_TS": "2018-05-11T21:58:33Z"}, "timestamp": 10},
+        {"topic": "test_topic", "value": {"ID": 3, "EVENT_TS": "2019-05-11T21:58:33Z"}, "timestamp": 10}
       ],
       "outputs": [
-        {"topic": "TS", "value": {"SINK_TS":1, "ID": 1, "EVENT_TS": 1526075913000}, "timestamp": 1},
-        {"topic": "TS", "value": {"SINK_TS":3, "ID": 3, "EVENT_TS": 1589234313000}, "timestamp": 3}
+        {"topic": "TS", "value": {"ID": 1, "EVENT_TS": "2018-05-11T21:58:33Z"}, "timestamp": 1526075913000},
+        {"topic": "TS", "value": {"ID": 3, "EVENT_TS": "2019-05-11T21:58:33Z"}, "timestamp": 1557611913000}
       ]
+    },
+    {
+      "name": "override output timestamp for CTAS",
+      "statements": [
+        "CREATE TABLE TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='JSON');",
+        "CREATE TABLE TS WITH (timestamp='sink_ts') AS SELECT id as sink_ts, id FROM test;"
+      ],
+      "inputs": [
+        {"topic": "test_topic", "value": {"ID": 1}, "timestamp": 1526075913000},
+        {"topic": "test_topic", "value": {"ID": 3}, "timestamp": 1589234313000}
+      ],
+      "outputs": [
+        {"topic": "TS", "value": {"SINK_TS":1, "ID": 1}, "timestamp": 1},
+        {"topic": "TS", "value": {"SINK_TS":3, "ID": 3}, "timestamp": 3}
+      ]
+    },
+    {
+      "name": "override output timestamp for CTAS using a string TIMESTAMP_FORMAT",
+      "statements": [
+        "CREATE TABLE TEST (ID bigint, EVENT_TS varchar) WITH (kafka_topic='test_topic', value_format='JSON', timestamp='EVENT_TS', timestamp_format='yyyy-MM-dd''T''HH:mm:ssX');",
+        "CREATE TABLE TS WITH (timestamp='event_ts', timestamp_format='yyyy-MM-dd''T''HH:mm:ssX') AS SELECT id, event_ts FROM test;"
+      ],
+      "inputs": [
+        {"topic": "test_topic", "value": {"ID": 1, "EVENT_TS": "2018-05-11T21:58:33Z"}, "timestamp": 10},
+        {"topic": "test_topic", "value": {"ID": 3, "EVENT_TS": "2019-05-11T21:58:33Z"}, "timestamp": 10}
+      ],
+      "outputs": [
+        {"topic": "TS", "value": {"ID": 1, "EVENT_TS": "2018-05-11T21:58:33Z"}, "timestamp": 1526075913000},
+        {"topic": "TS", "value": {"ID": 3, "EVENT_TS": "2019-05-11T21:58:33Z"}, "timestamp": 1557611913000}
+      ]
+    },
+    {
+      "name": "timestamp column of source should not influence sink",
+      "statements": [
+        "CREATE STREAM INPUT (ID bigint, EVENT_TS bigint) WITH (kafka_topic='test_topic', value_format='JSON', timestamp='EVENT_TS');",
+        "CREATE STREAM OUTPUT AS SELECT id as EVENT_TS FROM INPUT;"
+      ],
+      "inputs": [
+        {"topic": "test_topic", "value": {"ID": 1, "EVENT_TS": 1526075913000}},
+        {"topic": "test_topic", "value": {"ID": 2, "EVENT_TS": 1589234313000}}
+      ],
+      "outputs": [
+        {"topic": "OUTPUT", "value": {"EVENT_TS":1}, "timestamp": 1526075913000},
+        {"topic": "OUTPUT", "value": {"EVENT_TS":2}, "timestamp": 1589234313000}
+      ]
+    },
+    {
+      "name": "timestamp with column that does not exist should throw exception",
+      "statements": [
+        "CREATE STREAM TEST (ID bigint, EVENT_TS bigint) WITH (kafka_topic='test_topic', value_format='JSON');",
+        "CREATE STREAM TS WITH (timestamp='invalid_ts') AS SELECT id, event_ts FROM test;"
+      ],
+      "expectedException": {
+        "type": "io.confluent.ksql.util.KsqlException",
+        "message": "The TIMESTAMP column set in the WITH clause does not exist in the schema: 'INVALID_TS'"
+      }
+    },
+    {
+      "name": "timestamp column with invalid data type should throw exception",
+      "statements": [
+        "CREATE STREAM TEST (ID bigint, EVENT_TS int) WITH (kafka_topic='test_topic', value_format='JSON');",
+        "CREATE STREAM TS WITH (timestamp='event_ts') AS SELECT id, event_ts FROM test;"
+      ],
+      "expectedException": {
+        "type": "io.confluent.ksql.util.KsqlException",
+        "message": "Timestamp column, `EVENT_TS`, should be LONG(INT64) or a String with a timestamp_format specified"
+      }
     }
   ]
 }
diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SinkBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SinkBuilder.java
index ae345841111e..e5bb9545e434 100644
--- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SinkBuilder.java
+++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SinkBuilder.java
@@ -21,12 +21,14 @@
 import io.confluent.ksql.execution.context.QueryContext;
 import io.confluent.ksql.execution.plan.Formats;
 import io.confluent.ksql.execution.plan.KeySerdeFactory;
+import io.confluent.ksql.execution.streams.timestamp.AbstractColumnTimestampExtractor;
+import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicy;
+import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory;
 import io.confluent.ksql.execution.timestamp.TimestampColumn;
 import io.confluent.ksql.schema.ksql.Column;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
 import io.confluent.ksql.schema.ksql.PhysicalSchema;
-
-import java.util.Objects;
+import io.confluent.ksql.util.KsqlConfig;
 import java.util.Optional;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
@@ -65,11 +67,11 @@ public static void build(
         queryContext
     );
 
-    final Optional<TransformTimestamp<K>> tsTransformer = timestampColumn
-        .map(TimestampColumn::getColumn)
-        .map(c -> schema.findValueColumn(c).orElseThrow(IllegalStateException::new))
-        .map(Column::index)
-        .map(TransformTimestamp::new);
+    final Optional<TransformTimestamp<K>> tsTransformer = timestampTransformer(
+        queryBuilder.getKsqlConfig(),
+        schema,
+        timestampColumn
+    );
 
     final KStream<K, GenericRow> transformed = tsTransformer
         .map(t -> stream.transform(t))
@@ -78,27 +80,36 @@ public static void build(
     transformed.to(topicName, Produced.with(keySerde, valueSerde));
   }
 
-  static class TransformTimestamp<K>
-      implements TransformerSupplier<K, GenericRow, KeyValue<K, GenericRow>> {
-    private final int timestampColumnIndex;
-
-    TransformTimestamp(final int timestampColumnIndex) {
-      this.timestampColumnIndex = requireNonNull(timestampColumnIndex, "timestampColumnIndex");
+  private static <K> Optional<TransformTimestamp<K>> timestampTransformer(
+      final KsqlConfig ksqlConfig,
+      final LogicalSchema sourceSchema,
+      final Optional<TimestampColumn> timestampColumn
+  ) {
+    if (!timestampColumn.isPresent()) {
+      return Optional.empty();
     }
 
-    @Override
-    public boolean equals(final Object o) {
-      if (o == null || !(o instanceof TransformTimestamp)) {
-        return false;
-      }
+    final TimestampExtractionPolicy timestampPolicy = TimestampExtractionPolicyFactory.create(
+        ksqlConfig,
+        sourceSchema,
+        timestampColumn
+    );
 
-      final TransformTimestamp that = (TransformTimestamp)o;
-      return timestampColumnIndex == that.timestampColumnIndex;
-    }
+    return timestampColumn
+        .map(TimestampColumn::getColumn)
+        .map(c -> sourceSchema.findValueColumn(c).orElseThrow(IllegalStateException::new))
+        .map(Column::index)
+        .map(timestampPolicy::create)
+        .filter(te -> te instanceof AbstractColumnTimestampExtractor)
+        .map(te -> new TransformTimestamp<>((AbstractColumnTimestampExtractor) te));
+  }
 
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(timestampColumnIndex);
+  static class TransformTimestamp<K>
+      implements TransformerSupplier<K, GenericRow, KeyValue<K, GenericRow>> {
+    final AbstractColumnTimestampExtractor timestampExtractor;
+
+    TransformTimestamp(final AbstractColumnTimestampExtractor timestampExtractor) {
+      this.timestampExtractor = requireNonNull(timestampExtractor, "timestampExtractor");
     }
 
     @Override
@@ -113,17 +124,13 @@ public void init(final ProcessorContext processorContext) {
 
     @Override
     public KeyValue<K, GenericRow> transform(final K key, final GenericRow row) {
-      if (row.get(timestampColumnIndex) instanceof Long) {
-        processorContext.forward(
-            key,
-            row,
-            To.all().withTimestamp((long) row.get(timestampColumnIndex))
-        );
-
-        return null;
-      }
+      processorContext.forward(
+          key,
+          row,
+          To.all().withTimestamp(timestampExtractor.extract(row))
+      );
 
-      return KeyValue.pair(key, row);
+      return null;
     }
 
     @Override
diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/AbstractColumnTimestampExtractor.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/AbstractColumnTimestampExtractor.java
new file mode 100644
index 000000000000..35890837806b
--- /dev/null
+++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/AbstractColumnTimestampExtractor.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.execution.streams.timestamp;
+
+import com.google.common.base.Preconditions;
+import io.confluent.ksql.GenericRow;
+import io.confluent.ksql.util.KsqlException;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+public abstract class AbstractColumnTimestampExtractor implements TimestampExtractor {
+  protected final int timestampColumnIndex;
+
+  AbstractColumnTimestampExtractor(final int timestampColumnIndex) {
+    Preconditions.checkArgument(timestampColumnIndex >= 0, "timestampColumnIndex must be >= 0");
+    this.timestampColumnIndex = timestampColumnIndex;
+  }
+
+  @Override
+  public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+    if (record.value() instanceof GenericRow) {
+      try {
+        return extract((GenericRow) record.value());
+      } catch (final Exception e) {
+        throw new KsqlException("Unable to extract timestamp from record."
+            + " record=" + record,
+            e);
+      }
+    }
+
+    return 0;
+  }
+
+  public abstract long extract(GenericRow row);
+}
diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/LongTimestampExtractor.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/LongTimestampExtractor.java
index c9fb644eb7c6..5f55a519e1f5 100644
--- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/LongTimestampExtractor.java
+++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/LongTimestampExtractor.java
@@ -16,37 +16,14 @@
 package io.confluent.ksql.execution.streams.timestamp;
 
 import io.confluent.ksql.GenericRow;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class LongTimestampExtractor implements TimestampExtractor {
-
-  private static final Logger log = LoggerFactory.getLogger(LongTimestampExtractor.class);
-  private final int timestampColumnindex;
 
+public class LongTimestampExtractor extends AbstractColumnTimestampExtractor {
   LongTimestampExtractor(final int timestampColumnindex) {
-    this.timestampColumnindex = timestampColumnindex;
+    super(timestampColumnindex);
   }
 
   @Override
-  public long extract(final ConsumerRecord<Object, Object> consumerRecord, final long l) {
-    if (timestampColumnindex < 0) {
-      return 0;
-    } else {
-      try {
-        if (consumerRecord.value() instanceof GenericRow) {
-          final GenericRow genericRow = (GenericRow) consumerRecord.value();
-          if (genericRow.get(timestampColumnindex) instanceof Long) {
-            return (long) genericRow.get(timestampColumnindex);
-          }
-        }
-      } catch (final Exception e) {
-        log.error("Exception in extracting timestamp for row: " + consumerRecord.value(), e);
-      }
-    }
-    return 0;
+  public long extract(final GenericRow row) {
+    return (long) row.get(timestampColumnIndex);
   }
 }
diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/StringTimestampExtractor.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/StringTimestampExtractor.java
index 3e7f66fa8481..ec2ce8a542c0 100644
--- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/StringTimestampExtractor.java
+++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/StringTimestampExtractor.java
@@ -15,36 +15,30 @@
 
 package io.confluent.ksql.execution.streams.timestamp;
 
-import com.google.common.base.Preconditions;
 import io.confluent.ksql.GenericRow;
 import io.confluent.ksql.util.KsqlException;
 import io.confluent.ksql.util.timestamp.StringToTimestampParser;
 import java.util.Objects;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.TimestampExtractor;
 
-public class StringTimestampExtractor implements TimestampExtractor {
+public class StringTimestampExtractor extends AbstractColumnTimestampExtractor {
   private final StringToTimestampParser timestampParser;
-  private final int timestampColumn;
   private final String format;
 
   StringTimestampExtractor(final String format, final int timestampColumn) {
+    super(timestampColumn);
+
     this.format = Objects.requireNonNull(format, "format can't be null");
-    Preconditions.checkArgument(timestampColumn >= 0, "timestampColumn must be >= 0");
-    this.timestampColumn = timestampColumn;
     this.timestampParser = new StringToTimestampParser(format);
   }
 
   @Override
-  public long extract(final ConsumerRecord<Object, Object> consumerRecord,
-                      final long previousTimestamp) {
-    final GenericRow row = (GenericRow) consumerRecord.value();
-    final String value = (String)row.get(timestampColumn);
+  public long extract(final GenericRow row) {
+    final String value = (String) row.get(timestampColumnIndex);
+
     try {
       return timestampParser.parse(value);
     } catch (final KsqlException e) {
-      throw new KsqlException("Unable to parse string timestamp from record."
-          + " record=" + consumerRecord
+      throw new KsqlException("Unable to parse string timestamp."
           + " timestamp=" + value
           + " timestamp_format=" + format,
           e);
diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/SinkBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/SinkBuilderTest.java
index 51c31f81a41d..b521ec312faf 100644
--- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/SinkBuilderTest.java
+++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/SinkBuilderTest.java
@@ -19,9 +19,11 @@
 import io.confluent.ksql.execution.context.QueryContext;
 import io.confluent.ksql.execution.plan.Formats;
 import io.confluent.ksql.execution.plan.KeySerdeFactory;
+import io.confluent.ksql.execution.streams.timestamp.AbstractColumnTimestampExtractor;
 import io.confluent.ksql.execution.timestamp.TimestampColumn;
 import io.confluent.ksql.name.ColumnName;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
+import io.confluent.ksql.schema.ksql.PhysicalSchema;
 import io.confluent.ksql.schema.ksql.types.SqlTypes;
 import io.confluent.ksql.serde.FormatFactory;
 import io.confluent.ksql.serde.FormatInfo;
@@ -30,6 +32,7 @@
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
@@ -45,19 +48,14 @@
 
 import java.util.Optional;
 
-import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -71,6 +69,8 @@ public class SinkBuilderTest {
 
   private static final FormatInfo KEY_FORMAT = FormatInfo.of(FormatFactory.KAFKA.name());
   private static final FormatInfo VALUE_FORMAT = FormatInfo.of(FormatFactory.JSON.name());
+  private static final PhysicalSchema PHYSICAL_SCHEMA =
+      PhysicalSchema.from(SCHEMA.withoutMetaAndKeyColsInValue(), SerdeOption.none());
 
   @Mock
   private KsqlQueryBuilder queryBuilder;
@@ -96,13 +96,53 @@ public void setup() {
     doReturn(kStream).when(kStream).transform(any());
   }
 
+  @Test
+  public void shouldBuildKeySerdeCorrectly() {
+    // Given/When
+    buildDefaultSinkBuilder();
+
+    // Then:
+    verify(keySerdeFactory).buildKeySerde(KEY_FORMAT, PHYSICAL_SCHEMA, queryContext);
+  }
+
+  @Test
+  public void shouldBuildValueSerdeCorrectly() {
+    // Given/When
+    buildDefaultSinkBuilder();
+
+    // Then:
+    verify(queryBuilder).buildValueSerde(
+        VALUE_FORMAT,
+        PHYSICAL_SCHEMA,
+        queryContext
+    );
+  }
+
+  @Test
+  public void shouldWriteOutStreamWithCorrectSerdes() {
+    // Given/When
+    buildDefaultSinkBuilder();
+
+    // Then
+    verify(kStream).to(anyString(), eq(Produced.with(keySerde, valSerde)));
+  }
+
+  @Test
+  public void shouldWriteOutStreamToCorrectTopic() {
+    // Given/When
+    buildDefaultSinkBuilder();
+
+    // Then
+    verify(kStream).to(eq(TOPIC), any());
+  }
+
   @Test
   public void shouldBuildStreamUsingTransformTimestampWhenTimestampIsSpecified() {
     // Given/When
     SinkBuilder.build(
         SCHEMA,
         Formats.of(KEY_FORMAT, VALUE_FORMAT, SerdeOption.none()),
-        Optional.of(new TimestampColumn(ColumnName.of("GREEN"), Optional.empty())),
+        Optional.of(new TimestampColumn(ColumnName.of("BLUE"), Optional.empty())),
         TOPIC,
         kStream,
         keySerdeFactory,
@@ -112,7 +152,7 @@ public void shouldBuildStreamUsingTransformTimestampWhenTimestampIsSpecified() {
 
     // Then
     final InOrder inOrder = Mockito.inOrder(kStream);
-    inOrder.verify(kStream).transform(new SinkBuilder.TransformTimestamp<>(1));
+    inOrder.verify(kStream).transform(any());
     inOrder.verify(kStream).to(anyString(), any());
     inOrder.verifyNoMoreInteractions();
   }
@@ -120,16 +160,7 @@ public void shouldBuildStreamUsingTransformTimestampWhenTimestampIsSpecified() {
   @Test
   public void shouldBuildStreamWithoutTransformTimestampWhenNoTimestampIsSpecified() {
     // Given/When
-    SinkBuilder.build(
-        SCHEMA,
-        Formats.of(KEY_FORMAT, VALUE_FORMAT, SerdeOption.none()),
-        Optional.empty(),
-        TOPIC,
-        kStream,
-        keySerdeFactory,
-        queryContext,
-        queryBuilder
-    );
+    buildDefaultSinkBuilder();
 
     // Then
     final InOrder inOrder = Mockito.inOrder(kStream);
@@ -141,38 +172,53 @@ public void shouldBuildStreamWithoutTransformTimestampWhenNoTimestampIsSpecified
   public void shouldTransformTimestampRow() {
     // Given
     final long timestampColumnValue = 10001;
-    final int timestampColumnIndex = 2;
     final ProcessorContext context = mock(ProcessorContext.class);
-    when(row.get(timestampColumnIndex)).thenReturn(timestampColumnValue);
+    final AbstractColumnTimestampExtractor timestampExtractor
+        = mock(AbstractColumnTimestampExtractor.class);
+    when(timestampExtractor.extract(any())).thenReturn(timestampColumnValue);
 
     // When
     final Transformer<String, GenericRow, KeyValue<String, GenericRow>> transformer =
-        new SinkBuilder.TransformTimestamp<String>(timestampColumnIndex).get();
+        new SinkBuilder.TransformTimestamp<String>(timestampExtractor).get();
     transformer.init(context);
     final KeyValue<String, GenericRow> kv = transformer.transform("key", row);
 
     // Then
     assertNull(kv);
+    verify(timestampExtractor).extract(row);
     verify(context, Mockito.times(1))
         .forward(eq("key"), eq(row), toCaptor.capture());
     assertTrue(toCaptor.getValue().equals(To.all().withTimestamp(timestampColumnValue)));
   }
 
-  @Test
-  public void shouldNotTransformTimestampIfTimestampIndexNotProvided() {
+  @Test(expected = NullPointerException.class)
+  public void shouldThrowOnNullProcessorContext() {
     // Given
-    final int timestampColumnIndex = -1;
-    final ProcessorContext context = mock(ProcessorContext.class);
-    final GenericRow row = mock(GenericRow.class);
+    final AbstractColumnTimestampExtractor timestampExtractor
+        = mock(AbstractColumnTimestampExtractor.class);
 
-    // When
-    final Transformer<String, GenericRow, KeyValue<String, GenericRow>> transformer =
-        new SinkBuilder.TransformTimestamp<String>(timestampColumnIndex).get();
-    transformer.init(context);
-    final KeyValue<String, GenericRow> kv = transformer.transform("key", row);
+    // When/Then
+    new SinkBuilder.TransformTimestamp<String>(timestampExtractor)
+        .get()
+        .init(null);
+  }
 
-    // Then
-    assertThat(kv, is(KeyValue.pair("key", row)));
-    verifyZeroInteractions(context);
+  @Test(expected = NullPointerException.class)
+  public void shouldThrowOnNullTimestampExtractor() {
+    // When/Then
+    new SinkBuilder.TransformTimestamp<String>(null);
+  }
+
+  private void buildDefaultSinkBuilder() {
+    SinkBuilder.build(
+        SCHEMA,
+        Formats.of(KEY_FORMAT, VALUE_FORMAT, SerdeOption.none()),
+        Optional.empty(),
+        TOPIC,
+        kStream,
+        keySerdeFactory,
+        queryContext,
+        queryBuilder
+    );
+  }
 }
diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSinkBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSinkBuilderTest.java
deleted file mode 100644
index 321ba3bd4325..000000000000
--- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSinkBuilderTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Copyright 2019 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.ksql.execution.streams;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import io.confluent.ksql.GenericRow;
-import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
-import io.confluent.ksql.execution.context.QueryContext;
-import io.confluent.ksql.execution.plan.ExecutionStep;
-import io.confluent.ksql.execution.plan.ExecutionStepPropertiesV1;
-import io.confluent.ksql.execution.plan.KStreamHolder;
-import io.confluent.ksql.execution.plan.KeySerdeFactory;
-import io.confluent.ksql.execution.plan.PlanBuilder;
-import io.confluent.ksql.execution.plan.StreamSink;
-import io.confluent.ksql.name.ColumnName;
-import io.confluent.ksql.schema.ksql.LogicalSchema;
-import io.confluent.ksql.schema.ksql.PhysicalSchema;
-import io.confluent.ksql.schema.ksql.types.SqlTypes;
-import io.confluent.ksql.serde.FormatFactory;
-import io.confluent.ksql.serde.FormatInfo;
-import io.confluent.ksql.serde.SerdeOption;
-import java.util.Optional;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class StreamSinkBuilderTest {
-
-  private static final String TOPIC = "TOPIC";
-  private static final LogicalSchema SCHEMA = LogicalSchema.builder()
-      .valueColumn(ColumnName.of("BLUE"), SqlTypes.BIGINT)
-      .valueColumn(ColumnName.of("GREEN"), SqlTypes.STRING)
-      .build();
-
-  private static final PhysicalSchema PHYSICAL_SCHEMA =
-      PhysicalSchema.from(SCHEMA.withoutMetaAndKeyColsInValue(), SerdeOption.none());
-  private static final FormatInfo KEY_FORMAT = FormatInfo.of(FormatFactory.KAFKA.name());
-  private static final FormatInfo VALUE_FORMAT = FormatInfo.of(FormatFactory.JSON.name());
-
-  @Mock
-  private KsqlQueryBuilder queryBuilder;
-  @Mock
-  private KeySerdeFactory keySerdeFactory;
-  @Mock
-  private KStream<Struct, GenericRow> kStream;
-  @Mock
-  private ExecutionStep<KStreamHolder<Struct>> source;
-  @Mock
-  private Serde<Struct> keySerde;
-  @Mock
-  private Serde<GenericRow> valSerde;
-  @Captor
-  private ArgumentCaptor<ValueMapper<GenericRow, GenericRow>> mapperCaptor;
-  @Mock
-  private QueryContext queryContext;
-
-  private PlanBuilder planBuilder;
-  private StreamSink<Struct> sink;
-
-  @Before
-  public void setup() {
-    when(keySerdeFactory.buildKeySerde(any(), any(), any())).thenReturn(keySerde);
-    when(queryBuilder.buildValueSerde(any(), any(), any())).thenReturn(valSerde);
-    when(source.build(any())).thenReturn(new KStreamHolder<>(kStream, SCHEMA, keySerdeFactory));
-    doReturn(kStream).when(kStream).transform(any());
-
-    sink = new StreamSink<>(
-        new ExecutionStepPropertiesV1(queryContext),
-        source,
-        io.confluent.ksql.execution.plan.Formats.of(KEY_FORMAT, VALUE_FORMAT, SerdeOption.none()),
-        TOPIC,
-        Optional.empty()
-    );
-    planBuilder = new KSPlanBuilder(
-        queryBuilder,
-        mock(SqlPredicateFactory.class),
-        mock(AggregateParamsFactory.class),
-        mock(StreamsFactories.class)
-    );
-  }
-
-  @Test
-  public void shouldWriteOutStreamToCorrectTopic() {
-    // When:
-    sink.build(planBuilder);
-
-    // Then:
-    verify(kStream).to(eq(TOPIC), any());
-  }
-
-  @Test
-  public void shouldBuildKeySerdeCorrectly() {
-    // When:
-    sink.build(planBuilder);
-
-    // Then:
-    verify(keySerdeFactory).buildKeySerde(KEY_FORMAT, PHYSICAL_SCHEMA, queryContext);
-  }
-
-  @Test
-  public void shouldBuildValueSerdeCorrectly() {
-    // When:
-    sink.build(planBuilder);
-
-    // Then:
-    verify(queryBuilder).buildValueSerde(
-        VALUE_FORMAT,
-        PHYSICAL_SCHEMA,
-        queryContext
-    );
-  }
-
-  @Test
-  public void shouldWriteOutStreamWithCorrectSerdes() {
-    // When:
-    sink.build(planBuilder);
-
-    // Then:
-    verify(kStream).to(anyString(), eq(Produced.with(keySerde, valSerde)));
-  }
-}
diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableSinkBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableSinkBuilderTest.java
deleted file mode 100644
index 4f8fc8844650..000000000000
--- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableSinkBuilderTest.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright 2019 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.ksql.execution.streams;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import io.confluent.ksql.GenericRow;
-import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
-import io.confluent.ksql.execution.context.QueryContext;
-import io.confluent.ksql.execution.plan.ExecutionStep;
-import io.confluent.ksql.execution.plan.ExecutionStepPropertiesV1;
-import io.confluent.ksql.execution.plan.KTableHolder;
-import io.confluent.ksql.execution.plan.KeySerdeFactory;
-import io.confluent.ksql.execution.plan.PlanBuilder;
-import io.confluent.ksql.execution.plan.TableSink;
-import io.confluent.ksql.name.ColumnName;
-import io.confluent.ksql.schema.ksql.LogicalSchema;
-import io.confluent.ksql.schema.ksql.PhysicalSchema;
-import io.confluent.ksql.schema.ksql.types.SqlTypes;
-import io.confluent.ksql.serde.FormatFactory;
-import io.confluent.ksql.serde.FormatInfo;
-import io.confluent.ksql.serde.SerdeOption;
-import java.util.Optional;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.InOrder;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class TableSinkBuilderTest {
-
-  private static final String TOPIC = "TOPIC";
-  private static final LogicalSchema SCHEMA = LogicalSchema.builder()
-      .valueColumn(ColumnName.of("BLUE"), SqlTypes.BIGINT)
-      .valueColumn(ColumnName.of("GREEN"), SqlTypes.STRING)
-      .build();
-
-  private static final PhysicalSchema PHYSICAL_SCHEMA =
-      PhysicalSchema.from(SCHEMA.withoutMetaAndKeyColsInValue(), SerdeOption.none());
-  private static final FormatInfo KEY_FORMAT = FormatInfo.of(FormatFactory.KAFKA.name());
-  private static final FormatInfo VALUE_FORMAT = FormatInfo.of(FormatFactory.JSON.name());
-
-  @Mock
-  private KsqlQueryBuilder queryBuilder;
-  @Mock
-  private KeySerdeFactory keySerdeFactory;
-  @Mock
-  private KTable<Struct, GenericRow> kTable;
-  @Mock
-  private KStream<Struct, GenericRow> kStream;
-  @Mock
-  private ExecutionStep<KTableHolder<Struct>> source;
-  @Mock
-  private Serde<Struct> keySerde;
-  @Mock
-  private Serde<GenericRow> valSerde;
-  @Captor
-  private ArgumentCaptor<ValueMapper<GenericRow, GenericRow>> mapperCaptor;
-
-  private final QueryContext queryContext =
-      new QueryContext.Stacker().push("sink").getQueryContext();
-
-  private PlanBuilder planBuilder;
-  private TableSink<Struct> sink;
-
-  @Before
-  public void setup() {
-    when(keySerdeFactory.buildKeySerde(any(), any(), any())).thenReturn(keySerde);
-    when(queryBuilder.buildValueSerde(any(), any(), any())).thenReturn(valSerde);
-    when(kTable.toStream()).thenReturn(kStream);
-    when(source.build(any())).thenReturn(
-        KTableHolder.unmaterialized(kTable, SCHEMA, keySerdeFactory));
-    doReturn(kStream).when(kStream).transform(any());
-
-    sink = new TableSink<>(
-        new ExecutionStepPropertiesV1(queryContext),
-        source,
-        io.confluent.ksql.execution.plan.Formats.of(KEY_FORMAT, VALUE_FORMAT, SerdeOption.none()),
-        TOPIC,
-        Optional.empty()
-    );
-    planBuilder = new KSPlanBuilder(
-        queryBuilder,
-        mock(SqlPredicateFactory.class),
-        mock(AggregateParamsFactory.class),
-        mock(StreamsFactories.class)
-    );
-  }
-
-  @Test
-  public void shouldWriteOutTable() {
-    // When:
-    sink.build(planBuilder);
-
-    // Then:
-    final InOrder inOrder = Mockito.inOrder(kTable, kStream);
-    inOrder.verify(kTable).toStream();
-    inOrder.verify(kStream).transform(any());
-    inOrder.verify(kStream).to(anyString(), any());
-    verifyNoMoreInteractions(kStream);
-  }
-
-  @Test
-  public void shouldWriteOutTableToCorrectTopic() {
-    // When:
-    sink.build(planBuilder);
-
-    // Then:
-    verify(kStream).to(eq(TOPIC), any());
-  }
-
-  @Test
-  public void shouldBuildKeySerdeCorrectly() {
-    // When:
-    sink.build(planBuilder);
-
-    // Then:
-    verify(keySerdeFactory).buildKeySerde(KEY_FORMAT, PHYSICAL_SCHEMA, queryContext);
-  }
-
-  @Test
-  public void shouldBuildValueSerdeCorrectly() {
-    // When:
-    sink.build(planBuilder);
-
-    // Then:
-    verify(queryBuilder).buildValueSerde(
-        VALUE_FORMAT,
-        PHYSICAL_SCHEMA,
-        queryContext
-    );
-  }
-
-  @Test
-  public void shouldWriteOutTableWithCorrectSerdes() {
-    // When:
-    sink.build(planBuilder);
-
-    // Then:
-    verify(kStream).to(anyString(), eq(Produced.with(keySerde, valSerde)));
-  }
-}
diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/AbstractColumnTimestampExtractorTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/AbstractColumnTimestampExtractorTest.java
new file mode 100644
index 000000000000..1a2de019039a
--- /dev/null
+++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/AbstractColumnTimestampExtractorTest.java
@@ -0,0 +1,51 @@
+package io.confluent.ksql.execution.streams.timestamp;
+
+import io.confluent.ksql.GenericRow;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Test;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+
+import static io.confluent.ksql.GenericRow.genericRow;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class AbstractColumnTimestampExtractorTest {
+  @Test(expected = IllegalArgumentException.class)
+  public void shouldThrowIfColumnIndexIsNegative() {
+    // When/Then
+    new AbstractColumnTimestampExtractor(-1) {
+      @Override
+      public long extract(final GenericRow row) {
+        return 0;
+      }
+    };
+  }
+
+  @Test
+  public void shouldCallExtractTimestampFromGenericRow() {
+    // Given
+    final long timestamp = 1526075913000L;
+
+    final AbstractColumnTimestampExtractor timestampExtractor =
+        new AbstractColumnTimestampExtractor(2) {
+          @Override
+          public long extract(final GenericRow row) {
+            return timestamp;
+          }
+        };
+
+    // When
+    final long actualTime = timestampExtractor.extract(new ConsumerRecord<>("topic",
+        1,
+        1,
+        null,
+        genericRow(0)),
+        1
+    );
+
+    // Then
+    assertThat(actualTime, equalTo(timestamp));
+  }
+}
diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/StringTimestampExtractorTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/StringTimestampExtractorTest.java
index 168e57a6a5a6..40ce12554b97 100644
--- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/StringTimestampExtractorTest.java
+++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/StringTimestampExtractorTest.java
@@ -34,13 +34,7 @@ public void shouldExtractTimestampFromStringWithFormat() throws ParseException {
     final String stringTime = "2010-Jan-11";
     final long expectedTime = new SimpleDateFormat(format).parse(stringTime).getTime();
 
-    final long actualTime = timestampExtractor.extract(new ConsumerRecord<>("topic",
-        1,
-        1,
-        null,
-        genericRow(stringTime)),
-        1
-    );
+    final long actualTime = timestampExtractor.extract(genericRow(stringTime));
 
     assertThat(actualTime, equalTo(expectedTime));
   }
@@ -52,6 +46,6 @@ public void shouldThrowIfColumnIndexIsNegative() {
 
   @Test(expected = NullPointerException.class)
   public void shouldThrowOnNullFormat() {
-    new StringTimestampExtractor(null, -1);
+    new StringTimestampExtractor(null, 0);
   }
 }
\ No newline at end of file