fix: csas/ctas with timestamp column is used for output rowtime (#4489)
BREAKING-CHANGE:
This change breaks the previous CSAS/CTAS behavior: the ROWTIME of records written to the output/sink topic is now taken from the column named in the WITH (TIMESTAMP=...) property. Previously, the ROWTIME was carried over from the input/source topics.
spena authored Feb 26, 2020
1 parent a6e550b commit ddddf92
Showing 109 changed files with 7,752 additions and 785 deletions.
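To make the breaking change concrete: given a hypothetical query such as CREATE STREAM OUT WITH (TIMESTAMP='EVENT_TS') AS SELECT * FROM IN;, records written to the OUT topic now carry the EVENT_TS column value as their Kafka record timestamp instead of the source record's timestamp. A minimal consumer-side check of that behavior (the topic name, column, and broker address are illustrative, not taken from this commit):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class SinkTimestampCheck {
  public static void main(final String[] args) {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
    props.put("group.id", "sink-timestamp-check");
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("OUT")); // hypothetical sink topic
      for (final ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
        // Before this commit: timestamp() was inherited from the source record.
        // After this commit: timestamp() holds the EVENT_TS column value.
        System.out.printf("offset=%d timestamp=%d value=%s%n",
            record.offset(), record.timestamp(), record.value());
      }
    }
  }
}

Any downstream consumer that relied on sink records inheriting the source ROWTIME (for example, for windowing or retention) will observe different timestamps after this change.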
File: (name not shown)
@@ -119,7 +119,8 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
getKsqlTopic().getKafkaTopicName(),
getKsqlTopic().getValueFormat(),
serdeOptions,
contextStacker
contextStacker,
getTimestampColumn()
);
}
}
File: SchemaKStream.java
@@ -40,6 +40,7 @@
import io.confluent.ksql.execution.plan.StreamTableJoin;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.streams.StepSchemaResolver;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.name.ColumnName;
@@ -97,13 +98,15 @@ public SchemaKStream<K> into(
final String kafkaTopicName,
final ValueFormat valueFormat,
final Set<SerdeOption> options,
final QueryContext.Stacker contextStacker
final QueryContext.Stacker contextStacker,
final Optional<TimestampColumn> timestampColumn
) {
final StreamSink<K> step = ExecutionStepFactory.streamSink(
contextStacker,
Formats.of(keyFormat, valueFormat, options),
sourceStep,
kafkaTopicName
kafkaTopicName,
timestampColumn
);
return new SchemaKStream<>(
step,
File: SchemaKTable.java
@@ -30,6 +30,7 @@
import io.confluent.ksql.execution.plan.TableSink;
import io.confluent.ksql.execution.plan.TableTableJoin;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.name.ColumnName;
@@ -73,13 +74,15 @@ public SchemaKTable<K> into(
final String kafkaTopicName,
final ValueFormat valueFormat,
final Set<SerdeOption> options,
final QueryContext.Stacker contextStacker
final QueryContext.Stacker contextStacker,
final Optional<TimestampColumn> timestampColumn
) {
final TableSink<K> step = ExecutionStepFactory.tableSink(
contextStacker,
sourceTableStep,
Formats.of(keyFormat, valueFormat, options),
kafkaTopicName
kafkaTopicName,
timestampColumn
);
return new SchemaKTable<>(
step,
File: (integration test, name not shown)
@@ -184,9 +184,8 @@ public void shouldUseTimestampExtractedFromDDLStatement() throws Exception {
final List<ConsumerRecord<byte[], byte[]>> records =
TEST_HARNESS.verifyAvailableRecords(resultStream.toUpperCase(), 1);

final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
final long timestamp = records.get(0).timestamp();
assertThat(timestamp, equalTo(dateFormat.parse("2018-01-04").getTime()));
assertThat(timestamp, is(4L));
}

private void testTimestampColumnSelection(
File: (name not shown)
@@ -117,7 +117,7 @@ public void before() {
when(sourceNode.getNodeOutputType()).thenReturn(DataSourceType.KSTREAM);
when(sourceNode.buildStream(ksqlStreamBuilder)).thenReturn((SchemaKStream) sourceStream);

when(sourceStream.into(any(), any(), any(), any()))
when(sourceStream.into(any(), any(), any(), any(), any()))
.thenReturn((SchemaKStream) sinkStream);

when(ksqlStreamBuilder.buildNodeContext(any())).thenAnswer(inv ->
@@ -189,7 +189,7 @@ public void shouldBuildOutputNodeForInsertIntoAvroFromNonAvro() {
outputNode.buildStream(ksqlStreamBuilder);

// Then:
verify(sourceStream).into(any(), eq(valueFormat), any(), any());
verify(sourceStream).into(any(), eq(valueFormat), any(), any(), any());
}

@Test
@@ -202,7 +202,8 @@ public void shouldCallInto() {
eq(SINK_KAFKA_TOPIC_NAME),
eq(JSON_FORMAT),
eq(SerdeOption.none()),
stackerCaptor.capture()
stackerCaptor.capture(),
eq(outputNode.getTimestampColumn())
);
assertThat(
stackerCaptor.getValue().getQueryContext().getContext(),
File: StreamSink.java
@@ -16,26 +16,33 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.execution.timestamp.TimestampColumn;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

@Immutable
public class StreamSink<K> implements ExecutionStep<KStreamHolder<K>> {
private final ExecutionStepPropertiesV1 properties;
private final ExecutionStep<KStreamHolder<K>> source;
private final Formats formats;
private final String topicName;
private final Optional<TimestampColumn> timestampColumn;

public StreamSink(
@JsonProperty(value = "properties", required = true) final ExecutionStepPropertiesV1 props,
@JsonProperty(value = "source", required = true) final ExecutionStep<KStreamHolder<K>> source,
@JsonProperty(value = "formats", required = true) final Formats formats,
@JsonProperty(value = "topicName", required = true) final String topicName) {
@JsonProperty(value = "topicName", required = true) final String topicName,
@JsonProperty(value = "timestampColumn") final Optional<TimestampColumn> timestampColumn
) {
this.properties = Objects.requireNonNull(props, "props");
this.formats = Objects.requireNonNull(formats, "formats");
this.source = Objects.requireNonNull(source, "source");
this.topicName = Objects.requireNonNull(topicName, "topicName");
this.timestampColumn = Objects.requireNonNull(timestampColumn, "timestampColumn");
}

public String getTopicName() {
@@ -60,6 +67,10 @@ public ExecutionStep<KStreamHolder<K>> getSource() {
return source;
}

public Optional<TimestampColumn> getTimestampColumn() {
return timestampColumn;
}

@Override
public KStreamHolder<K> build(final PlanBuilder builder) {
return builder.visitStreamSink(this);
@@ -77,12 +88,13 @@ public boolean equals(final Object o) {
return Objects.equals(properties, that.properties)
&& Objects.equals(source, that.source)
&& Objects.equals(formats, that.formats)
&& Objects.equals(topicName, that.topicName);
&& Objects.equals(topicName, that.topicName)
&& Objects.equals(timestampColumn, that.timestampColumn);
}

@Override
public int hashCode() {

return Objects.hash(properties, source, formats, topicName);
return Objects.hash(properties, source, formats, topicName, timestampColumn);
}
}
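
One compatibility note on the constructor above: timestampColumn is the only property not marked required = true. Assuming the plan mapper supports JDK 8 Optional types (a Jackson Jdk8Module-style setup, which this diff does not show), execution plans serialized before this change, which have no timestampColumn field, should still deserialize, with the field defaulting to Optional.empty() and the sink keeping the old pass-through timestamp behavior.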
File: TableSink.java
@@ -17,27 +17,33 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.execution.timestamp.TimestampColumn;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

@Immutable
public class TableSink<K> implements ExecutionStep<KTableHolder<K>> {
private final ExecutionStepPropertiesV1 properties;
private final ExecutionStep<KTableHolder<K>> source;
private final Formats formats;
private final String topicName;
private final Optional<TimestampColumn> timestampColumn;

public TableSink(
@JsonProperty(value = "properties", required = true) final ExecutionStepPropertiesV1 props,
@JsonProperty(value = "source", required = true) final ExecutionStep<KTableHolder<K>> source,
@JsonProperty(value = "formats", required = true) final Formats formats,
@JsonProperty(value = "topicName", required = true) final String topicName
@JsonProperty(value = "topicName", required = true) final String topicName,
@JsonProperty(value = "timestampColumn") final Optional<TimestampColumn> timestampColumn
) {
this.properties = Objects.requireNonNull(props, "props");
this.source = Objects.requireNonNull(source, "source");
this.formats = Objects.requireNonNull(formats, "formats");
this.topicName = Objects.requireNonNull(topicName, "topicName");
this.timestampColumn = Objects.requireNonNull(timestampColumn, "timestampColumn");
}

@Override
@@ -63,6 +69,10 @@ public ExecutionStep<KTableHolder<K>> getSource() {
return source;
}

public Optional<TimestampColumn> getTimestampColumn() {
return timestampColumn;
}

@Override
public KTableHolder<K> build(final PlanBuilder builder) {
return builder.visitTableSink(this);
@@ -80,12 +90,13 @@ public boolean equals(final Object o) {
return Objects.equals(properties, tableSink.properties)
&& Objects.equals(source, tableSink.source)
&& Objects.equals(formats, tableSink.formats)
&& Objects.equals(topicName, tableSink.topicName);
&& Objects.equals(topicName, tableSink.topicName)
&& Objects.equals(timestampColumn, tableSink.timestampColumn);
}

@Override
public int hashCode() {

return Objects.hash(properties, source, formats, topicName);
return Objects.hash(properties, source, formats, topicName, timestampColumn);
}
}
File: StreamSinkTest.java
@@ -15,12 +15,16 @@
package io.confluent.ksql.execution.plan;

import com.google.common.testing.EqualsTester;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.name.ColumnName;
import org.apache.kafka.connect.data.Struct;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Optional;

@RunWith(MockitoJUnitRunner.class)
public class StreamSinkTest {
@Mock
@@ -40,11 +44,13 @@ public class StreamSinkTest {
public void shouldImplementEquals() {
new EqualsTester()
.addEqualityGroup(
new StreamSink<>(properties1, source1, formats1, "topic1"),
new StreamSink<>(properties1, source1, formats1, "topic1"))
.addEqualityGroup(new StreamSink<>(properties2, source1, formats1, "topic1"))
.addEqualityGroup(new StreamSink<>(properties1, source2, formats1, "topic1"))
.addEqualityGroup(new StreamSink<>(properties1, source1, formats2, "topic1"))
.addEqualityGroup(new StreamSink<>(properties1, source1, formats1, "topic2"));
new StreamSink<>(properties1, source1, formats1, "topic1", Optional.empty()),
new StreamSink<>(properties1, source1, formats1, "topic1", Optional.empty()))
.addEqualityGroup(new StreamSink<>(properties2, source1, formats1, "topic1", Optional.empty()))
.addEqualityGroup(new StreamSink<>(properties1, source2, formats1, "topic1", Optional.empty()))
.addEqualityGroup(new StreamSink<>(properties1, source1, formats2, "topic1", Optional.empty()))
.addEqualityGroup(new StreamSink<>(properties1, source1, formats1, "topic2", Optional.empty()))
.addEqualityGroup(new StreamSink<>(properties1, source1, formats1, "topic1",
Optional.of(new TimestampColumn(ColumnName.of("c1"), Optional.of("BIGINT")))));
}
}
File: TableSinkTest.java
@@ -15,12 +15,16 @@
package io.confluent.ksql.execution.plan;

import com.google.common.testing.EqualsTester;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.name.ColumnName;
import org.apache.kafka.connect.data.Struct;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Optional;

@RunWith(MockitoJUnitRunner.class)
public class TableSinkTest {
@Mock
@@ -40,11 +44,13 @@ public class TableSinkTest {
public void shouldImplementEquals() {
new EqualsTester()
.addEqualityGroup(
new TableSink<>(properties1, source1, formats1, "topic1"),
new TableSink<>(properties1, source1, formats1, "topic1"))
.addEqualityGroup(new TableSink<>(properties2, source1, formats1, "topic1"))
.addEqualityGroup(new TableSink<>(properties1, source2, formats1, "topic1"))
.addEqualityGroup(new TableSink<>(properties1, source1, formats2, "topic1"))
.addEqualityGroup(new TableSink<>(properties1, source1, formats1, "topic2"));
new TableSink<>(properties1, source1, formats1, "topic1", Optional.empty()),
new TableSink<>(properties1, source1, formats1, "topic1", Optional.empty()))
.addEqualityGroup(new TableSink<>(properties2, source1, formats1, "topic1", Optional.empty()))
.addEqualityGroup(new TableSink<>(properties1, source2, formats1, "topic1", Optional.empty()))
.addEqualityGroup(new TableSink<>(properties1, source1, formats2, "topic1", Optional.empty()))
.addEqualityGroup(new TableSink<>(properties1, source1, formats1, "topic2", Optional.empty()))
.addEqualityGroup(new TableSink<>(properties1, source1, formats1, "topic1",
Optional.of(new TimestampColumn(ColumnName.of("c1"), Optional.of("BIGINT")))));;
}
}
File: (expected topology file, name not shown)
@@ -79,8 +79,11 @@ Topologies:
--> Project
<-- Join-this-join, Join-other-join
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000012
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Join-merge
Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2)
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000012
<-- Project
Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2
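
The new ApplyTimestampTransform-* processor between Project and the sink re-stamps each record with the value of the sink's timestamp column. A minimal sketch of such a step, assuming the Kafka Streams Processor API; the class name, the List<Object> row type, and the column-index parameter are illustrative rather than ksqlDB's actual implementation:

import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;

// Re-stamps each record so the downstream sink writes the designated row
// column as the Kafka record timestamp (and hence the output ROWTIME).
public class TimestampColumnTransformer<K>
    implements Transformer<K, List<Object>, KeyValue<K, List<Object>>> {

  private final int timestampColumnIndex;
  private ProcessorContext context;

  public TimestampColumnTransformer(final int timestampColumnIndex) {
    this.timestampColumnIndex = timestampColumnIndex;
  }

  @Override
  public void init(final ProcessorContext context) {
    this.context = context;
  }

  @Override
  public KeyValue<K, List<Object>> transform(final K key, final List<Object> row) {
    final long newTimestamp = (Long) row.get(timestampColumnIndex);
    // Forward with the overridden timestamp instead of returning a value.
    context.forward(key, row, To.all().withTimestamp(newTimestamp));
    return null; // already forwarded above
  }

  @Override
  public void close() {
  }
}

Wired in with something like stream.transform(() -> new TimestampColumnTransformer<>(idx)).to(topic), this would produce the Project --> ApplyTimestampTransform --> Sink shape shown above.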

File: (expected topology file, name not shown)
@@ -79,8 +79,11 @@ Topologies:
--> Project
<-- Join-this-join, Join-other-join
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000012
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Join-merge
Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2)
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000012
<-- Project
Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2

File: (expected topology file, name not shown)
@@ -51,25 +51,28 @@ Topologies:
Processor: KSTREAM-TRANSFORMVALUES-0000000005 (stores: [])
--> PrependAliasLeft
<-- KSTREAM-SOURCE-0000000004
Source: KSTREAM-SOURCE-0000000000 (topics: [t1])
--> KTABLE-SOURCE-0000000001
Processor: PrependAliasLeft (stores: [])
--> Join
<-- KSTREAM-TRANSFORMVALUES-0000000005
Processor: Join (stores: [KafkaTopic_Right-Reduce])
--> Project
<-- PrependAliasLeft
Source: KSTREAM-SOURCE-0000000000 (topics: [t1])
--> KTABLE-SOURCE-0000000001
Processor: KTABLE-SOURCE-0000000001 (stores: [KafkaTopic_Right-Reduce])
--> KTABLE-TRANSFORMVALUES-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> ApplyTimestampTransform-S1_JOIN_T1
<-- Join
Processor: ApplyTimestampTransform-S1_JOIN_T1 (stores: [])
--> KSTREAM-SINK-0000000009
<-- Project
Processor: KTABLE-TRANSFORMVALUES-0000000002 (stores: [])
--> PrependAliasRight
<-- KTABLE-SOURCE-0000000001
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000009
<-- Join
Sink: KSTREAM-SINK-0000000009 (topic: S1_JOIN_T1)
<-- Project
<-- ApplyTimestampTransform-S1_JOIN_T1
Processor: PrependAliasRight (stores: [])
--> none
<-- KTABLE-TRANSFORMVALUES-0000000002
File: (expected topology file, name not shown)
@@ -80,8 +80,11 @@ Topologies:
--> KTABLE-TOSTREAM-0000000012
<-- KTABLE-MERGE-0000000008
Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
--> KSTREAM-SINK-0000000013
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Project
Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2)
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000013
<-- KTABLE-TOSTREAM-0000000012
Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2

File: (expected topology file, name not shown)
@@ -80,8 +80,11 @@ Topologies:
--> KTABLE-TOSTREAM-0000000012
<-- KTABLE-MERGE-0000000008
Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
--> KSTREAM-SINK-0000000013
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Project
Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2)
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000013
<-- KTABLE-TOSTREAM-0000000012
Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2

(Remaining changed files not shown.)