fix: support string/long/any timestamp, add new QTT tests
spena committed Feb 24, 2020
1 parent f4b6466 commit d4989d2
Showing 10 changed files with 313 additions and 455 deletions.
File: query validation tests (JSON)
@@ -39,36 +39,101 @@
     ]
   },
   {
-    "name": "KSQL override output timestamp for CSAS",
+    "name": "override output timestamp for CSAS",
     "statements": [
-      "CREATE STREAM TEST (ID bigint, EVENT_TS bigint) WITH (kafka_topic='test_topic', value_format='JSON', timestamp='EVENT_TS');",
-      "CREATE STREAM TS WITH (timestamp='sink_ts') AS SELECT id as sink_ts, id, event_ts FROM test;"
+      "CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='JSON');",
+      "CREATE STREAM TS WITH (timestamp='sink_ts') AS SELECT id as sink_ts, id FROM test;"
     ],
     "inputs": [
-      {"topic": "test_topic", "value": {"ID": 1, "EVENT_TS": 1526075913000}, "timestamp": 1526075913000},
-      {"topic": "test_topic", "value": {"ID": 2, "EVENT_TS": -1}, "timestamp": -1},
-      {"topic": "test_topic", "value": {"ID": 3, "EVENT_TS": 1589234313000}, "timestamp": 1589234313000}
+      {"topic": "test_topic", "value": {"ID": 1}, "timestamp": 1526075913000},
+      {"topic": "test_topic", "value": {"ID": 3}, "timestamp": 1589234313000}
     ],
     "outputs": [
-      {"topic": "TS", "value": {"SINK_TS":1, "ID": 1, "EVENT_TS": 1526075913000}, "timestamp": 1},
-      {"topic": "TS", "value": {"SINK_TS":3, "ID": 3, "EVENT_TS": 1589234313000}, "timestamp": 3}
+      {"topic": "TS", "value": {"SINK_TS":1, "ID": 1}, "timestamp": 1},
+      {"topic": "TS", "value": {"SINK_TS":3, "ID": 3}, "timestamp": 3}
     ]
   },
   {
-    "name": "KSQL override output timestamp for CTAS",
+    "name": "override output timestamp for CSAS using a string TIMESTAMP_FORMAT",
     "statements": [
-      "CREATE TABLE TEST (ID bigint, EVENT_TS bigint) WITH (kafka_topic='test_topic', value_format='JSON', timestamp='EVENT_TS');",
-      "CREATE TABLE TS WITH (timestamp='sink_ts') AS SELECT id as sink_ts, id, event_ts FROM test;"
+      "CREATE STREAM TEST (ID bigint, EVENT_TS varchar) WITH (kafka_topic='test_topic', value_format='JSON', timestamp='EVENT_TS', timestamp_format='yyyy-MM-dd''T''HH:mm:ssX');",
+      "CREATE STREAM TS WITH (timestamp='event_ts', timestamp_format='yyyy-MM-dd''T''HH:mm:ssX') AS SELECT id, event_ts FROM test;"
     ],
     "inputs": [
-      {"topic": "test_topic", "value": {"ID": 1, "EVENT_TS": 1526075913000}, "timestamp": 1526075913000},
-      {"topic": "test_topic", "value": {"ID": 2, "EVENT_TS": -1}, "timestamp": -1},
-      {"topic": "test_topic", "value": {"ID": 3, "EVENT_TS": 1589234313000}, "timestamp": 1589234313000}
+      {"topic": "test_topic", "value": {"ID": 1, "EVENT_TS": "2018-05-11T21:58:33Z"}, "timestamp": 10},
+      {"topic": "test_topic", "value": {"ID": 3, "EVENT_TS": "2019-05-11T21:58:33Z"}, "timestamp": 10}
    ],
     "outputs": [
-      {"topic": "TS", "value": {"SINK_TS":1, "ID": 1, "EVENT_TS": 1526075913000}, "timestamp": 1},
-      {"topic": "TS", "value": {"SINK_TS":3, "ID": 3, "EVENT_TS": 1589234313000}, "timestamp": 3}
+      {"topic": "TS", "value": {"ID": 1, "EVENT_TS": "2018-05-11T21:58:33Z"}, "timestamp": 1526075913000},
+      {"topic": "TS", "value": {"ID": 3, "EVENT_TS": "2019-05-11T21:58:33Z"}, "timestamp": 1557611913000}
     ]
   },
+  {
+    "name": "override output timestamp for CTAS",
+    "statements": [
+      "CREATE TABLE TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='JSON');",
+      "CREATE TABLE TS WITH (timestamp='sink_ts') AS SELECT id as sink_ts, id FROM test;"
+    ],
+    "inputs": [
+      {"topic": "test_topic", "value": {"ID": 1}, "timestamp": 1526075913000},
+      {"topic": "test_topic", "value": {"ID": 3}, "timestamp": 1589234313000}
+    ],
+    "outputs": [
+      {"topic": "TS", "value": {"SINK_TS":1, "ID": 1}, "timestamp": 1},
+      {"topic": "TS", "value": {"SINK_TS":3, "ID": 3}, "timestamp": 3}
+    ]
+  },
+  {
+    "name": "override output timestamp for CTAS using a string TIMESTAMP_FORMAT",
+    "statements": [
+      "CREATE TABLE TEST (ID bigint, EVENT_TS varchar) WITH (kafka_topic='test_topic', value_format='JSON', timestamp='EVENT_TS', timestamp_format='yyyy-MM-dd''T''HH:mm:ssX');",
+      "CREATE TABLE TS WITH (timestamp='event_ts', timestamp_format='yyyy-MM-dd''T''HH:mm:ssX') AS SELECT id, event_ts FROM test;"
+    ],
+    "inputs": [
+      {"topic": "test_topic", "value": {"ID": 1, "EVENT_TS": "2018-05-11T21:58:33Z"}, "timestamp": 10},
+      {"topic": "test_topic", "value": {"ID": 3, "EVENT_TS": "2019-05-11T21:58:33Z"}, "timestamp": 10}
+    ],
+    "outputs": [
+      {"topic": "TS", "value": {"ID": 1, "EVENT_TS": "2018-05-11T21:58:33Z"}, "timestamp": 1526075913000},
+      {"topic": "TS", "value": {"ID": 3, "EVENT_TS": "2019-05-11T21:58:33Z"}, "timestamp": 1557611913000}
+    ]
+  },
+  {
+    "name": "timestamp column of source should not influence sink",
+    "statements": [
+      "CREATE STREAM INPUT (ID bigint, EVENT_TS bigint) WITH (kafka_topic='test_topic', value_format='JSON', timestamp='EVENT_TS');",
+      "CREATE STREAM OUTPUT AS SELECT id as EVENT_TS FROM INPUT;"
+    ],
+    "inputs": [
+      {"topic": "test_topic", "value": {"ID": 1, "EVENT_TS": 1526075913000}},
+      {"topic": "test_topic", "value": {"ID": 2, "EVENT_TS": 1589234313000}}
+    ],
+    "outputs": [
+      {"topic": "OUTPUT", "value": {"EVENT_TS":1}, "timestamp": 1526075913000},
+      {"topic": "OUTPUT", "value": {"EVENT_TS":2}, "timestamp": 1589234313000}
+    ]
+  },
+  {
+    "name": "timestamp with column that does not exist should throw exception",
+    "statements": [
+      "CREATE STREAM TEST (ID bigint, EVENT_TS bigint) WITH (kafka_topic='test_topic', value_format='JSON');",
+      "CREATE STREAM TS WITH (timestamp='invalid_ts') AS SELECT id, event_ts FROM test;"
+    ],
+    "expectedException": {
+      "type": "io.confluent.ksql.util.KsqlException",
+      "message": "The TIMESTAMP column set in the WITH clause does not exist in the schema: 'INVALID_TS'"
+    }
+  },
+  {
+    "name": "timestamp column with invalid data type should throw exception",
+    "statements": [
+      "CREATE STREAM TEST (ID bigint, EVENT_TS int) WITH (kafka_topic='test_topic', value_format='JSON');",
+      "CREATE STREAM TS WITH (timestamp='event_ts') AS SELECT id, event_ts FROM test;"
+    ],
+    "expectedException": {
+      "type": "io.confluent.ksql.util.KsqlException",
+      "message": "Timestamp column, `EVENT_TS`, should be LONG(INT64) or a String with a timestamp_format specified"
+    }
+  }
   ]
 }
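The two TIMESTAMP_FORMAT tests assert that "2018-05-11T21:58:33Z" and "2019-05-11T21:58:33Z" land on the sink as epoch-millis 1526075913000 and 1557611913000. A quick standalone java.time check of that arithmetic (QTT itself parses through ksql's StringToTimestampParser; TimestampFormatCheck below is just an illustrative sketch, not part of the commit):

    import java.time.OffsetDateTime;
    import java.time.format.DateTimeFormatter;

    public final class TimestampFormatCheck {
      public static void main(final String[] args) {
        // The raw pattern behind the doubled quotes in the SQL string literal.
        final DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssX");

        // Prints 1526075913000 and 1557611913000, matching the expected outputs above.
        System.out.println(OffsetDateTime.parse("2018-05-11T21:58:33Z", fmt).toInstant().toEpochMilli());
        System.out.println(OffsetDateTime.parse("2019-05-11T21:58:33Z", fmt).toInstant().toEpochMilli());
      }
    }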
File: SinkBuilder.java
@@ -21,12 +21,14 @@
 import io.confluent.ksql.execution.context.QueryContext;
 import io.confluent.ksql.execution.plan.Formats;
 import io.confluent.ksql.execution.plan.KeySerdeFactory;
+import io.confluent.ksql.execution.streams.timestamp.AbstractColumnTimestampExtractor;
+import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicy;
+import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory;
 import io.confluent.ksql.execution.timestamp.TimestampColumn;
 import io.confluent.ksql.schema.ksql.Column;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
 import io.confluent.ksql.schema.ksql.PhysicalSchema;
-
-import java.util.Objects;
+import io.confluent.ksql.util.KsqlConfig;
 import java.util.Optional;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
@@ -65,11 +67,11 @@ public static <K> void build(
         queryContext
     );

-    final Optional<TransformTimestamp<K>> tsTransformer = timestampColumn
-        .map(TimestampColumn::getColumn)
-        .map(c -> schema.findValueColumn(c).orElseThrow(IllegalStateException::new))
-        .map(Column::index)
-        .map(TransformTimestamp::new);
+    final Optional<TransformTimestamp<K>> tsTransformer = timestampTransformer(
+        queryBuilder.getKsqlConfig(),
+        schema,
+        timestampColumn
+    );

     final KStream<K, GenericRow> transformed = tsTransformer
         .map(t -> stream.transform(t))
@@ -78,27 +80,36 @@ public static <K> void build(
     transformed.to(topicName, Produced.with(keySerde, valueSerde));
   }

-  static class TransformTimestamp<K>
-      implements TransformerSupplier<K, GenericRow, KeyValue<K, GenericRow>> {
-    private final int timestampColumnIndex;
-
-    TransformTimestamp(final int timestampColumnIndex) {
-      this.timestampColumnIndex = requireNonNull(timestampColumnIndex, "timestampColumnIndex");
-    }
-
-    @Override
-    public boolean equals(final Object o) {
-      if (o == null || !(o instanceof TransformTimestamp)) {
-        return false;
-      }
-
-      final TransformTimestamp that = (TransformTimestamp)o;
-      return timestampColumnIndex == that.timestampColumnIndex;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(timestampColumnIndex);
-    }
+  private static <K> Optional<TransformTimestamp<K>> timestampTransformer(
+      final KsqlConfig ksqlConfig,
+      final LogicalSchema sourceSchema,
+      final Optional<TimestampColumn> timestampColumn
+  ) {
+    if (!timestampColumn.isPresent()) {
+      return Optional.empty();
+    }
+
+    final TimestampExtractionPolicy timestampPolicy = TimestampExtractionPolicyFactory.create(
+        ksqlConfig,
+        sourceSchema,
+        timestampColumn
+    );
+
+    return timestampColumn
+        .map(TimestampColumn::getColumn)
+        .map(c -> sourceSchema.findValueColumn(c).orElseThrow(IllegalStateException::new))
+        .map(Column::index)
+        .map(timestampPolicy::create)
+        .filter(te -> te instanceof AbstractColumnTimestampExtractor)
+        .map(te -> new TransformTimestamp<>((AbstractColumnTimestampExtractor) te));
+  }
+
+  static class TransformTimestamp<K>
+      implements TransformerSupplier<K, GenericRow, KeyValue<K, GenericRow>> {
+    final AbstractColumnTimestampExtractor timestampExtractor;
+
+    TransformTimestamp(final AbstractColumnTimestampExtractor timestampExtractor) {
+      this.timestampExtractor = requireNonNull(timestampExtractor, "timestampExtractor");
+    }

     @Override
@@ -113,17 +124,13 @@ public void init(final ProcessorContext processorContext)

     @Override
     public KeyValue<K, GenericRow> transform(final K key, final GenericRow row) {
-      if (row.get(timestampColumnIndex) instanceof Long) {
-        processorContext.forward(
-            key,
-            row,
-            To.all().withTimestamp((long) row.get(timestampColumnIndex))
-        );
-
-        return null;
-      }
-
-      return KeyValue.pair(key, row);
+      processorContext.forward(
+          key,
+          row,
+          To.all().withTimestamp(timestampExtractor.extract(row))
+      );
+
+      return null;
     }
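The reworked transform() uses the standard Kafka Streams trick for overriding a record's timestamp: forward the row explicitly with To.all().withTimestamp(...) and return null so the framework does not emit the pair a second time. A minimal self-contained sketch of the pattern (DemoTransformer is hypothetical, not part of the commit):

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.To;

    // Hypothetical transformer: stamps each record with a timestamp taken from its value.
    class DemoTransformer implements Transformer<String, Long, KeyValue<String, Long>> {
      private ProcessorContext context;

      @Override
      public void init(final ProcessorContext context) {
        this.context = context;
      }

      @Override
      public KeyValue<String, Long> transform(final String key, final Long value) {
        // Forward with an explicit timestamp instead of inheriting the input record's.
        context.forward(key, value, To.all().withTimestamp(value));
        // Returning null tells Streams not to forward the record a second time.
        return null;
      }

      @Override
      public void close() {
      }
    }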
New file: AbstractColumnTimestampExtractor.java
@@ -0,0 +1,48 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.streams.timestamp;

import com.google.common.base.Preconditions;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.util.KsqlException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public abstract class AbstractColumnTimestampExtractor implements TimestampExtractor {
  protected final int timestampColumnIndex;

  AbstractColumnTimestampExtractor(final int timestampColumnIndex) {
    Preconditions.checkArgument(timestampColumnIndex >= 0, "timestampColumnIndex must be >= 0");
    this.timestampColumnIndex = timestampColumnIndex;
  }

  @Override
  public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
    if (record.value() instanceof GenericRow) {
      try {
        return extract((GenericRow) record.value());
      } catch (final Exception e) {
        throw new KsqlException(
            "Unable to extract timestamp from record. record=" + record,
            e);
      }
    }

    return 0;
  }

  public abstract long extract(GenericRow row);
}
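Concrete subclasses implement only extract(GenericRow); the base class adapts them to the Kafka Streams TimestampExtractor contract, so an instance can be handed to a source in the usual way. A sketch of that wiring (the topic name and serdes are illustrative assumptions, not from the commit):

    import io.confluent.ksql.GenericRow;
    import io.confluent.ksql.execution.streams.timestamp.AbstractColumnTimestampExtractor;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;

    final class ExtractorWiring {
      // Hypothetical helper: consume a topic with a column-based timestamp extractor,
      // so downstream windowing sees the column value as event time.
      static KStream<String, GenericRow> stream(
          final StreamsBuilder builder,
          final Serde<String> keySerde,
          final Serde<GenericRow> valueSerde,
          final AbstractColumnTimestampExtractor extractor
      ) {
        return builder.stream(
            "example-topic",
            Consumed.with(keySerde, valueSerde).withTimestampExtractor(extractor)
        );
      }
    }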
File: LongTimestampExtractor.java
@@ -16,37 +16,14 @@
 package io.confluent.ksql.execution.streams.timestamp;

 import io.confluent.ksql.GenericRow;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

-
-public class LongTimestampExtractor implements TimestampExtractor {
-
-  private static final Logger log = LoggerFactory.getLogger(LongTimestampExtractor.class);
-  private final int timestampColumnindex;
-
+public class LongTimestampExtractor extends AbstractColumnTimestampExtractor {
   LongTimestampExtractor(final int timestampColumnindex) {
-    this.timestampColumnindex = timestampColumnindex;
+    super(timestampColumnindex);
   }

   @Override
-  public long extract(final ConsumerRecord<Object, Object> consumerRecord, final long l) {
-    if (timestampColumnindex < 0) {
-      return 0;
-    } else {
-      try {
-        if (consumerRecord.value() instanceof GenericRow) {
-          final GenericRow genericRow = (GenericRow) consumerRecord.value();
-          if (genericRow.get(timestampColumnindex) instanceof Long) {
-            return (long) genericRow.get(timestampColumnindex);
-          }
-        }
-      } catch (final Exception e) {
-        log.error("Exception in extracting timestamp for row: " + consumerRecord.value(), e);
-      }
-    }
-    return 0;
+  public long extract(final GenericRow row) {
+    return (long) row.get(timestampColumnIndex);
   }
 }
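One behavioral change worth noting: a non-Long column value now escapes the cast as a ClassCastException and is wrapped in a KsqlException by the base class, instead of being logged and swallowed as 0 the way the old code did, while a record whose value is not a GenericRow still falls back to 0. A test sketch pinning the fallback path (hypothetical test, placed in the same package for access to the package-private constructor):

    package io.confluent.ksql.execution.streams.timestamp;

    import static org.junit.Assert.assertEquals;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.junit.Test;

    public class LongTimestampExtractorTest {
      @Test
      public void shouldReturnZeroForNonRowValues() {
        final LongTimestampExtractor extractor = new LongTimestampExtractor(0);

        // A value that is not a GenericRow takes the base-class fallback path.
        final ConsumerRecord<Object, Object> record =
            new ConsumerRecord<>("t", 0, 0L, null, "not-a-row");

        assertEquals(0L, extractor.extract(record, -1L));
      }
    }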
File: StringTimestampExtractor.java
@@ -15,36 +15,30 @@

 package io.confluent.ksql.execution.streams.timestamp;

-import com.google.common.base.Preconditions;
 import io.confluent.ksql.GenericRow;
 import io.confluent.ksql.util.KsqlException;
 import io.confluent.ksql.util.timestamp.StringToTimestampParser;
 import java.util.Objects;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.TimestampExtractor;

-public class StringTimestampExtractor implements TimestampExtractor {
+public class StringTimestampExtractor extends AbstractColumnTimestampExtractor {
   private final StringToTimestampParser timestampParser;
-  private final int timestampColumn;
   private final String format;

   StringTimestampExtractor(final String format, final int timestampColumn) {
+    super(timestampColumn);
+
     this.format = Objects.requireNonNull(format, "format can't be null");
-    Preconditions.checkArgument(timestampColumn >= 0, "timestampColumn must be >= 0");
-    this.timestampColumn = timestampColumn;
     this.timestampParser = new StringToTimestampParser(format);
   }

   @Override
-  public long extract(final ConsumerRecord<Object, Object> consumerRecord,
-      final long previousTimestamp) {
-    final GenericRow row = (GenericRow) consumerRecord.value();
-    final String value = (String) row.get(timestampColumn);
+  public long extract(final GenericRow row) {
+    final String value = (String) row.get(timestampColumnIndex);

     try {
       return timestampParser.parse(value);
     } catch (final KsqlException e) {
-      throw new KsqlException("Unable to parse string timestamp from record."
-          + " record=" + consumerRecord
+      throw new KsqlException("Unable to parse string timestamp."
+          + " timestamp=" + value
           + " timestamp_format=" + format,
           e);
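The tightened error message reports only the offending value and format rather than the whole ConsumerRecord. For reference, the parser is driven by the same pattern string as the WITH clause; a sketch using only the constructor and parse call visible in the diff (ParserDemo itself is hypothetical):

    import io.confluent.ksql.util.timestamp.StringToTimestampParser;

    public final class ParserDemo {
      public static void main(final String[] args) {
        // Same pattern as the QTT tests; the doubled quotes in the SQL literal
        // collapse to yyyy-MM-dd'T'HH:mm:ssX at the Java level.
        final StringToTimestampParser parser = new StringToTimestampParser("yyyy-MM-dd'T'HH:mm:ssX");

        // Prints 1526075913000, the epoch-millis value asserted in the test outputs.
        System.out.println(parser.parse("2018-05-11T21:58:33Z"));
      }
    }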