diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicy.java b/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicy.java
index d966da17bdab..7b104a171cfb 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicy.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicy.java
@@ -17,30 +17,40 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.errorprone.annotations.Immutable;
+import java.util.Objects;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
@Immutable
public class MetadataTimestampExtractionPolicy implements TimestampExtractionPolicy {
+ private final TimestampExtractor timestampExtractor;
@JsonCreator
- public MetadataTimestampExtractionPolicy(){}
+ public MetadataTimestampExtractionPolicy() {
+ this(new FailOnInvalidTimestamp());
+ }
+
+ public MetadataTimestampExtractionPolicy(final TimestampExtractor timestampExtractor) {
+ this.timestampExtractor = timestampExtractor;
+ }
@Override
public TimestampExtractor create(final int columnIndex) {
- return new FailOnInvalidTimestamp();
+ return timestampExtractor;
}
@Override
public int hashCode() {
- return this.getClass().hashCode();
+ return Objects.hash(this.getClass(), timestampExtractor.getClass());
}
@Override
public boolean equals(final Object other) {
- if (this == other) {
- return true;
+ if (!(other instanceof MetadataTimestampExtractionPolicy)) {
+ return false;
}
- return other instanceof MetadataTimestampExtractionPolicy;
+
+ final MetadataTimestampExtractionPolicy that = (MetadataTimestampExtractionPolicy)other;
+ return timestampExtractor.getClass() == that.timestampExtractor.getClass();
}
}
diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactory.java b/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactory.java
index 907f52077744..3b8443244fd9 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactory.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactory.java
@@ -17,10 +17,15 @@
import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.schema.ksql.KsqlSchema;
+import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
+
import java.util.Optional;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.TimestampExtractor;
public final class TimestampExtractionPolicyFactory {
@@ -28,12 +33,13 @@ private TimestampExtractionPolicyFactory() {
}
public static TimestampExtractionPolicy create(
+ final KsqlConfig ksqlConfig,
final KsqlSchema schema,
final Optional<String> timestampColumnName,
final Optional<String> timestampFormat
) {
if (!timestampColumnName.isPresent()) {
- return new MetadataTimestampExtractionPolicy();
+ return new MetadataTimestampExtractionPolicy(getDefaultTimestampExtractor(ksqlConfig));
}
final String fieldName = timestampColumnName.get().toUpperCase();
@@ -71,4 +77,17 @@ public static TimestampExtractionPolicy create(
+ " specified");
}
+ private static TimestampExtractor getDefaultTimestampExtractor(final KsqlConfig ksqlConfig) {
+ try {
+ final Class> timestampExtractorClass = (Class>) ksqlConfig.getKsqlStreamConfigProps()
+ .getOrDefault(
+ StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+ FailOnInvalidTimestamp.class
+ );
+
+ return (TimestampExtractor) timestampExtractorClass.newInstance();
+ } catch (final Exception e) {
+ throw new KsqlException("Cannot override default timestamp extractor: " + e.getMessage(), e);
+ }
+ }
}
diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicyTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicyTest.java
index 9befcf39b569..4f5e8252edd1 100644
--- a/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicyTest.java
+++ b/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicyTest.java
@@ -16,6 +16,7 @@
package io.confluent.ksql.util.timestamp;
import com.google.common.testing.EqualsTester;
+import org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp;
import org.junit.Test;
public class MetadataTimestampExtractionPolicyTest {
@@ -25,6 +26,10 @@ public void shouldTestEqualityCorrectly() {
.addEqualityGroup(
new MetadataTimestampExtractionPolicy(),
new MetadataTimestampExtractionPolicy())
+ .addEqualityGroup(
+ new MetadataTimestampExtractionPolicy(new UsePreviousTimeOnInvalidTimestamp()),
+ new MetadataTimestampExtractionPolicy(new UsePreviousTimeOnInvalidTimestamp())
+ )
.testEquals();
}
}
diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactoryTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactoryTest.java
index 294f4824f846..860eaf864707 100644
--- a/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactoryTest.java
+++ b/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactoryTest.java
@@ -19,12 +19,22 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.schema.ksql.KsqlSchema;
+import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
+import java.util.Collections;
import java.util.Optional;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp;
+import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
public class TimestampExtractionPolicyFactoryTest {
@@ -32,14 +42,91 @@ public class TimestampExtractionPolicyFactoryTest {
private final SchemaBuilder schemaBuilder = SchemaBuilder.struct()
.field("id", Schema.OPTIONAL_INT64_SCHEMA);
+ private KsqlConfig ksqlConfig;
+
+ @Rule
+ public final ExpectedException expectedException = ExpectedException.none();
+
+ @Before
+ public void setup() {
+ ksqlConfig = new KsqlConfig(Collections.emptyMap());
+ }
+
@Test
public void shouldCreateMetadataPolicyWhenTimestampFieldNotProvided() {
// When:
final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory
- .create(KsqlSchema.of(schemaBuilder.build()), Optional.empty(), Optional.empty());
+ .create(
+ ksqlConfig,
+ KsqlSchema.of(schemaBuilder.build()),
+ Optional.empty(),
+ Optional.empty()
+ );
+
+ // Then:
+ assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class));
+ assertThat(result.create(0), instanceOf(FailOnInvalidTimestamp.class));
+ }
+
+ @Test
+ public void shouldThrowIfTimestampExtractorConfigIsInvalidClass() {
+ // Given:
+ final KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of(
+ StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+ this.getClass()
+ ));
+
+ // Then:
+ expectedException.expect(KsqlException.class);
+ expectedException.expectMessage(
+ "cannot be cast to org.apache.kafka.streams.processor.TimestampExtractor");
+
+ // When:
+ TimestampExtractionPolicyFactory
+ .create(
+ ksqlConfig,
+ KsqlSchema.of(schemaBuilder.build()),
+ Optional.empty(),
+ Optional.empty()
+ );
+ }
+
+ @Test
+ public void shouldCreateMetadataPolicyWithDefaultFailedOnInvalidTimestamp() {
+ // When:
+ final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory
+ .create(
+ ksqlConfig,
+ KsqlSchema.of(schemaBuilder.build()),
+ Optional.empty(),
+ Optional.empty()
+ );
+
+ // Then:
+ assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class));
+ assertThat(result.create(0), instanceOf(FailOnInvalidTimestamp.class));
+ }
+
+ @Test
+ public void shouldCreateMetadataPolicyWithConfiguredUsePreviousTimeOnInvalidTimestamp() {
+ // Given:
+ final KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of(
+ StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+ UsePreviousTimeOnInvalidTimestamp.class
+ ));
+
+ // When:
+ final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory
+ .create(
+ ksqlConfig,
+ KsqlSchema.of(schemaBuilder.build()),
+ Optional.empty(),
+ Optional.empty()
+ );
// Then:
assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class));
+ assertThat(result.create(0), instanceOf(UsePreviousTimeOnInvalidTimestamp.class));
}
@Test
@@ -52,17 +139,26 @@ public void shouldCreateLongTimestampPolicyWhenTimestampFieldIsOfTypeLong() {
// When:
final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory
- .create(KsqlSchema.of(schema), Optional.of(timestamp), Optional.empty());
+ .create(ksqlConfig, KsqlSchema.of(schema), Optional.of(timestamp), Optional.empty());
// Then:
assertThat(result, instanceOf(LongColumnTimestampExtractionPolicy.class));
assertThat(result.timestampField(), equalTo(timestamp.toUpperCase()));
}
- @Test(expected = KsqlException.class)
+ @Test
public void shouldFailIfCantFindTimestampField() {
+ // Then:
+ expectedException.expect(KsqlException.class);
+
+ // When:
TimestampExtractionPolicyFactory
- .create(KsqlSchema.of(schemaBuilder.build()), Optional.of("whateva"), Optional.empty());
+ .create(
+ ksqlConfig,
+ KsqlSchema.of(schemaBuilder.build()),
+ Optional.of("whateva"),
+ Optional.empty()
+ );
}
@Test
@@ -75,14 +171,14 @@ public void shouldCreateStringTimestampPolicyWhenTimestampFieldIsStringTypeAndFo
// When:
final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory
- .create(KsqlSchema.of(schema), Optional.of(field), Optional.of("yyyy-MM-DD"));
+ .create(ksqlConfig, KsqlSchema.of(schema), Optional.of(field), Optional.of("yyyy-MM-DD"));
// Then:
assertThat(result, instanceOf(StringTimestampExtractionPolicy.class));
assertThat(result.timestampField(), equalTo(field.toUpperCase()));
}
- @Test(expected = KsqlException.class)
+ @Test
public void shouldFailIfStringTimestampTypeAndFormatNotSupplied() {
// Given:
final String field = "my_string_field";
@@ -90,12 +186,15 @@ public void shouldFailIfStringTimestampTypeAndFormatNotSupplied() {
.field(field.toUpperCase(), Schema.OPTIONAL_STRING_SCHEMA)
.build();
+ // Then:
+ expectedException.expect(KsqlException.class);
+
// When:
TimestampExtractionPolicyFactory
- .create(KsqlSchema.of(schema), Optional.of(field), Optional.empty());
+ .create(ksqlConfig, KsqlSchema.of(schema), Optional.of(field), Optional.empty());
}
- @Test(expected = KsqlException.class)
+ @Test
public void shouldThorwIfLongTimestampTypeAndFormatIsSupplied() {
// Given:
final String timestamp = "timestamp";
@@ -103,12 +202,15 @@ public void shouldThorwIfLongTimestampTypeAndFormatIsSupplied() {
.field(timestamp.toUpperCase(), Schema.OPTIONAL_INT64_SCHEMA)
.build();
+ // Then:
+ expectedException.expect(KsqlException.class);
+
// When:
TimestampExtractionPolicyFactory
- .create(KsqlSchema.of(schema), Optional.of(timestamp), Optional.of("b"));
+ .create(ksqlConfig, KsqlSchema.of(schema), Optional.of(timestamp), Optional.of("b"));
}
- @Test(expected = KsqlException.class)
+ @Test
public void shouldThrowIfTimestampFieldTypeIsNotLongOrString() {
// Given:
final String field = "blah";
@@ -116,8 +218,11 @@ public void shouldThrowIfTimestampFieldTypeIsNotLongOrString() {
.field(field.toUpperCase(), Schema.OPTIONAL_FLOAT64_SCHEMA)
.build();
+ // Then:
+ expectedException.expect(KsqlException.class);
+
// When:
TimestampExtractionPolicyFactory
- .create(KsqlSchema.of(schema), Optional.of(field), Optional.empty());
+ .create(ksqlConfig, KsqlSchema.of(schema), Optional.of(field), Optional.empty());
}
}
\ No newline at end of file
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java
index c8672f752f79..0d8686d9bccb 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java
@@ -29,6 +29,7 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.HandlerMaps;
import io.confluent.ksql.util.HandlerMaps.ClassHandlerMapR2;
+import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.Map;
import java.util.Objects;
@@ -62,6 +63,7 @@ public CommandFactories(final ServiceContext serviceContext) {
public DdlCommand create(
final String sqlExpression,
final DdlStatement ddlStatement,
+ final KsqlConfig ksqlConfig,
final Map<String, Object> properties
) {
return FACTORIES
@@ -75,7 +77,7 @@ public DdlCommand create(
})
.handle(
this,
- new CallInfo(sqlExpression, properties),
+ new CallInfo(sqlExpression, ksqlConfig, properties),
ddlStatement);
}
@@ -90,6 +92,7 @@ private CreateStreamCommand handleCreateStream(
return new CreateStreamCommand(
callInfo.sqlExpression,
statement,
+ callInfo.ksqlConfig,
serviceContext.getTopicClient());
}
@@ -100,6 +103,7 @@ private CreateTableCommand handleCreateTable(
return new CreateTableCommand(
callInfo.sqlExpression,
statement,
+ callInfo.ksqlConfig,
serviceContext.getTopicClient());
}
@@ -140,14 +144,18 @@ private UnsetPropertyCommand handleUnsetProperty(
private static final class CallInfo {
final String sqlExpression;
+ final KsqlConfig ksqlConfig;
final Map<String, Object> properties;
private CallInfo(
final String sqlExpression,
+ final KsqlConfig ksqlConfig,
final Map<String, Object> properties
) {
- this.sqlExpression = sqlExpression;
- this.properties = properties;
+ this.sqlExpression = Objects.requireNonNull(sqlExpression, "sqlExpression");
+ this.properties = Objects.requireNonNull(properties, "properties");
+ this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig")
+ .cloneWithPropertyOverwrite(properties);
}
}
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java
index ccd28fd6bbc6..ea9e02fb92c3 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java
@@ -24,6 +24,7 @@
import io.confluent.ksql.schema.ksql.KsqlSchema;
import io.confluent.ksql.schema.ksql.LogicalSchemas;
import io.confluent.ksql.services.KafkaTopicClient;
+import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import io.confluent.ksql.util.StringUtil;
@@ -55,6 +56,7 @@ abstract class CreateSourceCommand implements DdlCommand {
CreateSourceCommand(
final String sqlExpression,
final CreateSource statement,
+ final KsqlConfig ksqlConfig,
final KafkaTopicClient kafkaTopicClient
) {
this.sqlExpression = sqlExpression;
@@ -92,7 +94,7 @@ abstract class CreateSourceCommand implements DdlCommand {
final Optional<String> timestampName = properties.getTimestampName();
final Optional<String> timestampFormat = properties.getTimestampFormat();
this.timestampExtractionPolicy = TimestampExtractionPolicyFactory
- .create(schema, timestampName, timestampFormat);
+ .create(ksqlConfig, schema, timestampName, timestampFormat);
this.keySerdeFactory = extractKeySerde(properties);
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateStreamCommand.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateStreamCommand.java
index 774c571ff03e..b7120c8005ea 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateStreamCommand.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateStreamCommand.java
@@ -19,6 +19,7 @@
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.services.KafkaTopicClient;
+import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
public class CreateStreamCommand extends CreateSourceCommand {
@@ -26,9 +27,10 @@ public class CreateStreamCommand extends CreateSourceCommand {
public CreateStreamCommand(
final String sqlExpression,
final CreateStream createStream,
+ final KsqlConfig ksqlConfig,
final KafkaTopicClient kafkaTopicClient
) {
- super(sqlExpression, createStream, kafkaTopicClient);
+ super(sqlExpression, createStream, ksqlConfig, kafkaTopicClient);
}
@Override
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateTableCommand.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateTableCommand.java
index fbd4968d4de7..32a1f7026130 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateTableCommand.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateTableCommand.java
@@ -19,6 +19,7 @@
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.services.KafkaTopicClient;
+import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
public class CreateTableCommand extends CreateSourceCommand {
@@ -26,9 +27,10 @@ public class CreateTableCommand extends CreateSourceCommand {
CreateTableCommand(
final String sqlExpression,
final CreateTable createTable,
+ final KsqlConfig ksqlConfig,
final KafkaTopicClient kafkaTopicClient
) {
- super(sqlExpression, createTable, kafkaTopicClient);
+ super(sqlExpression, createTable, ksqlConfig, kafkaTopicClient);
}
@Override
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandFactory.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandFactory.java
index 9f12bdc36580..1db22ccdcece 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandFactory.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandFactory.java
@@ -16,12 +16,15 @@
package io.confluent.ksql.ddl.commands;
import io.confluent.ksql.parser.tree.DdlStatement;
+import io.confluent.ksql.util.KsqlConfig;
+
import java.util.Map;
public interface DdlCommandFactory {
DdlCommand create(
String sqlExpression,
DdlStatement ddlStatement,
+ KsqlConfig ksqlConfig,
Map<String, Object> properties
);
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java
index cd0e596cbbdb..263fcf5afa2f 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java
@@ -29,6 +29,7 @@
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.services.SandboxedServiceContext;
import io.confluent.ksql.services.ServiceContext;
+import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
@@ -154,6 +155,7 @@ QueryEngine createQueryEngine(final ServiceContext serviceContext) {
String executeDdlStatement(
final String sqlExpression,
final ExecutableDdlStatement statement,
+ final KsqlConfig ksqlConfig,
final Map<String, Object> overriddenProperties
) {
KsqlEngineProps.throwOnImmutableOverride(overriddenProperties);
@@ -161,6 +163,7 @@ String executeDdlStatement(
final DdlCommand command = ddlCommandFactory.create(
sqlExpression,
statement,
+ ksqlConfig,
overriddenProperties
);
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java
index 9d690a54beb0..55dbcf167411 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java
@@ -89,6 +89,7 @@ ExecuteResult execute(final ConfiguredStatement<?> statement) {
final String msg = engineContext.executeDdlStatement(
statement.getStatementText(),
(ExecutableDdlStatement) statement.getStatement(),
+ ksqlConfig,
overriddenProperties
);
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java
index 3b1421475ee3..6705aaacc7da 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java
@@ -150,6 +150,6 @@ private static OutputNode buildQueryLogicalPlan(
final Analysis analysis = queryAnalyzer.analyze(sqlExpression, query, sink);
final AggregateAnalysisResult aggAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
- return new LogicalPlanner(analysis, aggAnalysis, metaStore).buildPlan();
+ return new LogicalPlanner(config, analysis, aggAnalysis, metaStore).buildPlan();
}
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
index 94070ad74b1f..192f4de1df99 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
@@ -36,6 +36,7 @@
import io.confluent.ksql.planner.plan.ProjectNode;
import io.confluent.ksql.schema.ksql.KsqlSchema;
import io.confluent.ksql.util.ExpressionTypeManager;
+import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
@@ -49,15 +50,18 @@
public class LogicalPlanner {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling
+ private final KsqlConfig ksqlConfig;
private final Analysis analysis;
private final AggregateAnalysisResult aggregateAnalysis;
private final FunctionRegistry functionRegistry;
public LogicalPlanner(
+ final KsqlConfig ksqlConfig,
final Analysis analysis,
final AggregateAnalysisResult aggregateAnalysis,
final FunctionRegistry functionRegistry
) {
+ this.ksqlConfig = ksqlConfig;
this.analysis = analysis;
this.aggregateAnalysis = aggregateAnalysis;
this.functionRegistry = functionRegistry;
@@ -125,11 +129,12 @@ private OutputNode buildOutputNode(final PlanNode sourcePlanNode) {
);
}
- private static TimestampExtractionPolicy getTimestampExtractionPolicy(
+ private TimestampExtractionPolicy getTimestampExtractionPolicy(
final KsqlSchema inputSchema,
final Analysis analysis
) {
return TimestampExtractionPolicyFactory.create(
+ ksqlConfig,
inputSchema,
analysis.getTimestampColumnName(),
analysis.getTimestampFormat());
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java
index deb03a05dd9e..5c524425c020 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java
@@ -21,6 +21,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.parser.tree.CreateStream;
@@ -38,6 +39,7 @@
import io.confluent.ksql.parser.tree.Type.SqlType;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
+import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.Collections;
import java.util.HashMap;
@@ -58,6 +60,8 @@ public class CommandFactoriesTest {
private final CommandFactories commandFactories = new CommandFactories(serviceContext);
private final HashMap<String, Expression> properties = new HashMap<>();
+ private KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of());
+
@Before
public void before() {
@@ -79,7 +83,7 @@ public void before() {
public void shouldCreateDDLCommandForRegisterTopic() {
final DdlCommand result = commandFactories.create(
sqlExpression, new RegisterTopic(QualifiedName.of("blah"),
- true, properties), NO_PROPS);
+ true, properties), ksqlConfig, NO_PROPS);
assertThat(result, instanceOf(RegisterTopicCommand.class));
}
@@ -88,8 +92,7 @@ sqlExpression, new RegisterTopic(QualifiedName.of("blah"),
public void shouldCreateCommandForCreateStream() {
final DdlCommand result = commandFactories.create(
sqlExpression, new CreateStream(QualifiedName.of("foo"),
- SOME_ELEMENTS, true, properties),
- NO_PROPS);
+ SOME_ELEMENTS, true, properties), ksqlConfig, NO_PROPS);
assertThat(result, instanceOf(CreateStreamCommand.class));
}
@@ -99,8 +102,7 @@ public void shouldCreateCommandForCreateTable() {
final HashMap<String, Expression> tableProperties = validTableProps();
final DdlCommand result = commandFactories
- .create(sqlExpression, createTable(tableProperties),
- NO_PROPS);
+ .create(sqlExpression, createTable(tableProperties), ksqlConfig, NO_PROPS);
assertThat(result, instanceOf(CreateTableCommand.class));
}
@@ -109,6 +111,7 @@ public void shouldCreateCommandForCreateTable() {
public void shouldCreateCommandForDropStream() {
final DdlCommand result = commandFactories.create(sqlExpression,
new DropStream(QualifiedName.of("foo"), true, true),
+ ksqlConfig,
NO_PROPS
);
assertThat(result, instanceOf(DropSourceCommand.class));
@@ -118,6 +121,7 @@ public void shouldCreateCommandForDropStream() {
public void shouldCreateCommandForDropTable() {
final DdlCommand result = commandFactories.create(sqlExpression,
new DropTable(QualifiedName.of("foo"), true, true),
+ ksqlConfig,
NO_PROPS
);
assertThat(result, instanceOf(DropSourceCommand.class));
@@ -127,6 +131,7 @@ public void shouldCreateCommandForDropTable() {
public void shouldCreateCommandForDropTopic() {
final DdlCommand result = commandFactories.create(sqlExpression,
new DropTopic(QualifiedName.of("foo"), true),
+ ksqlConfig,
NO_PROPS
);
assertThat(result, instanceOf(DropTopicCommand.class));
@@ -134,8 +139,7 @@ public void shouldCreateCommandForDropTopic() {
@Test(expected = KsqlException.class)
public void shouldThowKsqlExceptionIfCommandFactoryNotFound() {
- commandFactories.create(sqlExpression, new ExecutableDdlStatement() {},
- NO_PROPS);
+ commandFactories.create(sqlExpression, new ExecutableDdlStatement() {}, ksqlConfig, NO_PROPS);
}
private HashMap<String, Expression> validTableProps() {
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceCommandTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceCommandTest.java
index b673d554bc49..43f297c88410 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceCommandTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceCommandTest.java
@@ -35,6 +35,7 @@
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.Type.SqlType;
import io.confluent.ksql.services.KafkaTopicClient;
+import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.Collections;
import java.util.HashMap;
@@ -62,12 +63,16 @@ public class CreateSourceCommandTest {
@Mock
private KafkaTopicClient kafkaTopicClient;
+ private KsqlConfig ksqlConfig;
+
@Before
public void setUp() {
when(statement.getElements()).thenReturn(SOME_ELEMENTS);
when(statement.getName()).thenReturn(QualifiedName.of("bob"));
givenPropertiesWith(ImmutableMap.of());
when(kafkaTopicClient.isTopicExists(any())).thenReturn(true);
+
+ ksqlConfig = new KsqlConfig(ImmutableMap.of());
}
@Test
@@ -81,7 +86,7 @@ public void shouldThrowOnNoElements() {
"The statement does not define any columns.");
// When:
- new TestCmd("look mum, no columns", statement, kafkaTopicClient);
+ new TestCmd("look mum, no columns", statement, ksqlConfig, kafkaTopicClient);
}
@Test
@@ -90,7 +95,7 @@ public void shouldNotThrowWhenThereAreElements() {
when(statement.getElements()).thenReturn(SOME_ELEMENTS);
// When:
- new TestCmd("look mum, columns", statement, kafkaTopicClient);
+ new TestCmd("look mum, columns", statement, ksqlConfig, kafkaTopicClient);
// Then: not exception thrown
}
@@ -105,7 +110,7 @@ public void shouldThrowIfTopicDoesNotExist() {
expectedException.expectMessage("Kafka topic does not exist: " + TOPIC_NAME);
// When:
- new TestCmd("what, no value topic?", statement, kafkaTopicClient);
+ new TestCmd("what, no value topic?", statement, ksqlConfig, kafkaTopicClient);
}
@Test
@@ -114,7 +119,7 @@ public void shouldNotThrowIfTopicDoesExist() {
when(kafkaTopicClient.isTopicExists(TOPIC_NAME)).thenReturn(true);
// When:
- new TestCmd("what, no value topic?", statement, kafkaTopicClient);
+ new TestCmd("what, no value topic?", statement, ksqlConfig, kafkaTopicClient);
// Then:
verify(kafkaTopicClient).isTopicExists(TOPIC_NAME);
@@ -133,7 +138,7 @@ public void shouldThrowIfKeyFieldNotInSchema() {
+ "'WILL-NOT-FIND-ME'");
// When:
- new TestCmd("key not in schema!", statement, kafkaTopicClient);
+ new TestCmd("key not in schema!", statement, ksqlConfig, kafkaTopicClient);
}
@Test
@@ -150,7 +155,7 @@ public void shouldThrowIfTimestampColumnDoesNotExist() {
+ "'WILL-NOT-FIND-ME'");
// When:
- new TestCmd("key not in schema!", statement, kafkaTopicClient);
+ new TestCmd("key not in schema!", statement, ksqlConfig, kafkaTopicClient);
}
private static Map<String, Expression> minValidProps() {
@@ -178,9 +183,10 @@ private static final class TestCmd extends CreateSourceCommand {
private TestCmd(
final String sqlExpression,
final CreateSource statement,
+ final KsqlConfig ksqlConfig,
final KafkaTopicClient kafkaTopicClient
) {
- super(sqlExpression, statement, kafkaTopicClient);
+ super(sqlExpression, statement, ksqlConfig, kafkaTopicClient);
}
@Override
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateStreamCommandTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateStreamCommandTest.java
index 1a0d474ec527..d55850e306a7 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateStreamCommandTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateStreamCommandTest.java
@@ -37,6 +37,7 @@
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.Type.SqlType;
import io.confluent.ksql.services.KafkaTopicClient;
+import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.MetaStoreFixture;
import java.util.Collections;
@@ -67,6 +68,8 @@ public class CreateStreamCommandTest {
private KafkaTopicClient topicClient;
@Mock
private CreateStream createStreamStatement;
+ @Mock
+ private KsqlConfig ksqlConfig;
@Rule
public final ExpectedException expectedException = ExpectedException.none();
@@ -190,7 +193,11 @@ public void shouldAddSourceWithNoKeyField() {
}
private CreateStreamCommand createCmd() {
- return new CreateStreamCommand("some sql", createStreamStatement, topicClient);
+ return new CreateStreamCommand(
+ "some sql",
+ createStreamStatement,
+ ksqlConfig,
+ topicClient);
}
private void givenPropertiesWith(final Map props) {
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateTableCommandTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateTableCommandTest.java
index 52a248f3f33c..2364563f32c4 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateTableCommandTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateTableCommandTest.java
@@ -37,6 +37,7 @@
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.Type.SqlType;
import io.confluent.ksql.services.KafkaTopicClient;
+import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.MetaStoreFixture;
import java.util.Collections;
@@ -62,6 +63,8 @@ public class CreateTableCommandTest {
private KafkaTopicClient topicClient;
@Mock
private CreateTable createTableStatement;
+ @Mock
+ private KsqlConfig ksqlConfig;
@Rule
public final ExpectedException expectedException = ExpectedException.none();
@@ -188,7 +191,11 @@ public void shouldAddSourceWithNoKeyField() {
private CreateTableCommand createCmd() {
- return new CreateTableCommand("some sql", createTableStatement, topicClient);
+ return new CreateTableCommand(
+ "some sql",
+ createTableStatement,
+ ksqlConfig,
+ topicClient);
}
private void givenPropertiesWith(final Map props) {
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java
index b99de58ccdc8..cd311910069f 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java
@@ -228,7 +228,7 @@ private PhysicalPlanBuilder buildPhysicalPlanBuilder(
}
private QueryMetadata buildPhysicalPlan(final String query) {
- final OutputNode logical = AnalysisTestUtil.buildLogicalPlan(query, metaStore);;
+ final OutputNode logical = AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore);
return physicalPlanBuilder.buildPhysicalPlan(new LogicalPlanNode(query, Optional.of(logical)));
}
@@ -682,7 +682,7 @@ public void shouldConfigureProducerErrorHandlerLogger() {
final ProcessingLogger logger = mock(ProcessingLogger.class);
when(processingLogContext.getLoggerFactory()).thenReturn(loggerFactory);
final OutputNode spyNode = spy(
- AnalysisTestUtil.buildLogicalPlan(simpleSelectFilter, metaStore));
+ AnalysisTestUtil.buildLogicalPlan(ksqlConfig, simpleSelectFilter, metaStore));
doReturn(new QueryId("foo")).when(spyNode).getQueryId(any());
when(loggerFactory.getLogger("foo")).thenReturn(logger);
when(loggerFactory.getLogger(ArgumentMatchers.startsWith("foo.")))
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java
index c60b444fefb0..186a3c6f54b8 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java
@@ -34,7 +34,10 @@
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.ProjectNode;
import io.confluent.ksql.testutils.AnalysisTestUtil;
+import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.MetaStoreFixture;
+
+import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.connect.data.Schema;
import org.junit.Assert;
@@ -44,10 +47,12 @@
public class LogicalPlannerTest {
private MetaStore metaStore;
+ private KsqlConfig ksqlConfig;
@Before
public void init() {
metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry());
+ ksqlConfig = new KsqlConfig(Collections.emptyMap());
}
@Test
@@ -249,6 +254,6 @@ public void shouldUpdateKeyToReflectProjectionAlias() {
}
private PlanNode buildLogicalPlan(final String query) {
- return AnalysisTestUtil.buildLogicalPlan(query, metaStore);
+ return AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore);
}
}
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/PlanSourceExtractorVisitorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/PlanSourceExtractorVisitorTest.java
index bd2eac805de6..4f6a73fa75d0 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/planner/PlanSourceExtractorVisitorTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/PlanSourceExtractorVisitorTest.java
@@ -22,7 +22,10 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.testutils.AnalysisTestUtil;
+import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.MetaStoreFixture;
+
+import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.utils.Utils;
import org.junit.Before;
@@ -31,10 +34,12 @@
public class PlanSourceExtractorVisitorTest {
private MetaStore metaStore;
+ private KsqlConfig ksqlConfig;
@Before
public void init() {
metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry());
+ ksqlConfig = new KsqlConfig(Collections.emptyMap());
}
@Test
@@ -61,6 +66,6 @@ public void shouldExtractCorrectSourceForJoinQuery() {
}
private PlanNode buildLogicalPlan(final String query) {
- return AnalysisTestUtil.buildLogicalPlan(query, metaStore);
+ return AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore);
}
}
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java
index 3acb2e55e88a..5abc99d366b6 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java
@@ -395,7 +395,7 @@ private SchemaKStream buildQuery(final AggregateNode aggregateNode, final KsqlCo
private static AggregateNode buildAggregateNode(final String queryString) {
final MetaStore newMetaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry());
final KsqlBareOutputNode planNode = (KsqlBareOutputNode) AnalysisTestUtil
- .buildLogicalPlan(queryString, newMetaStore);
+ .buildLogicalPlan(KSQL_CONFIG, queryString, newMetaStore);
return (AggregateNode) planNode.getSource();
}
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java
index 718cb39ef090..f94be8207b43 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java
@@ -1069,7 +1069,7 @@ private void buildJoinNode(final String queryString) {
final MetaStore metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry());
final KsqlBareOutputNode planNode =
- (KsqlBareOutputNode) AnalysisTestUtil.buildLogicalPlan(queryString, metaStore);
+ (KsqlBareOutputNode) AnalysisTestUtil.buildLogicalPlan(ksqlConfig, queryString, metaStore);
joinNode = (JoinNode) ((ProjectNode) planNode.getSource()).getSource();
}
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java
index b236815ea4ef..e90bd902a78e 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java
@@ -68,6 +68,7 @@ public class KsqlBareOutputNodeTest {
private StreamsBuilder builder;
private final MetaStore metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry());
private final QueryId queryId = new QueryId("output-test");
+ private final KsqlConfig ksqlConfig = new KsqlConfig(Collections.emptyMap());
@Mock
private KsqlQueryBuilder ksqlStreamBuilder;
@@ -87,7 +88,7 @@ public void before() {
.push(inv.getArgument(0).toString()));
final KsqlBareOutputNode planNode = (KsqlBareOutputNode) AnalysisTestUtil
- .buildLogicalPlan(SIMPLE_SELECT_WITH_FILTER, metaStore);
+ .buildLogicalPlan(ksqlConfig, SIMPLE_SELECT_WITH_FILTER, metaStore);
stream = planNode.buildStream(ksqlStreamBuilder);
}
@@ -134,7 +135,7 @@ public void shouldComputeQueryIdCorrectly() {
// Given:
final KsqlBareOutputNode node
= (KsqlBareOutputNode) AnalysisTestUtil
- .buildLogicalPlan("select col0 from test1;", metaStore);
+ .buildLogicalPlan(ksqlConfig, "select col0 from test1;", metaStore);
final QueryIdGenerator queryIdGenerator = mock(QueryIdGenerator.class);
// When:
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java
index 03cfc6ffc509..1dd26bd9e5c3 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java
@@ -111,7 +111,7 @@ private SchemaKGroupedTable buildSchemaKGroupedTableFromQuery(
final String query,
final String...groupByColumns
) {
- final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(query, metaStore);
+ final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore);
final SchemaKTable> initialSchemaKTable = new SchemaKTable<>(
logicalPlan.getTheSourceNode().getSchema(),
kTable,
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java
index 32dadd2cb0e1..c9669d2e4991 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java
@@ -922,7 +922,11 @@ private static KsqlSchema getJoinSchema(
}
private PlanNode givenInitialKStreamOf(final String selectQuery) {
- final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(selectQuery, metaStore);
+ final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(
+ ksqlConfig,
+ selectQuery,
+ metaStore
+ );
initialSchemaKStream = new SchemaKStream(
logicalPlan.getTheSourceNode().getSchema(),
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java
index e6979d603ecf..e6d6780f8b1b 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java
@@ -633,7 +633,11 @@ private static KsqlSchema getJoinSchema(final KsqlSchema leftSchema,
}
private List givenInitialKTableOf(final String selectQuery) {
- final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(selectQuery, metaStore);
+ final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(
+ ksqlConfig,
+ selectQuery,
+ metaStore
+ );
initialSchemaKTable = new SchemaKTable<>(
logicalPlan.getTheSourceNode().getSchema(),
@@ -659,6 +663,6 @@ private List givenInitialKTableOf(final String selectQuery) {
}
private PlanNode buildLogicalPlan(final String query) {
- return AnalysisTestUtil.buildLogicalPlan(query, metaStore);
+ return AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore);
}
}
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java
index f68cd09e9007..7fa9ffeb2855 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java
@@ -137,7 +137,7 @@ public void shouldWriteProcessingLogOnError() {
}
private SelectValueMapper givenSelectMapperFor(final String query) {
- final PlanNode planNode = AnalysisTestUtil.buildLogicalPlan(query, metaStore);
+ final PlanNode planNode = AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore);
final ProjectNode projectNode = (ProjectNode) planNode.getSources().get(0);
final KsqlSchema schema = planNode.getTheSourceNode().getSchema();
final List<SelectExpression> selectExpressions = projectNode.getProjectSelectExpressions();
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SqlPredicateTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SqlPredicateTest.java
index ec1352fe060e..9f6c4776697b 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SqlPredicateTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SqlPredicateTest.java
@@ -136,7 +136,11 @@ public void shouldWriteProcessingLogOnError() {
}
private SqlPredicate givenSqlPredicateFor(final String statement) {
- final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(statement, metaStore);
+ final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(
+ ksqlConfig,
+ statement,
+ metaStore
+ );
final FilterNode filterNode = (FilterNode) logicalPlan.getSources().get(0).getSources().get(0);
return new SqlPredicate(
filterNode.getPredicate(),
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/testutils/AnalysisTestUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/testutils/AnalysisTestUtil.java
index 07c839b9ab57..2e4eff01b717 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/testutils/AnalysisTestUtil.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/testutils/AnalysisTestUtil.java
@@ -30,6 +30,8 @@
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.planner.LogicalPlanner;
import io.confluent.ksql.planner.plan.OutputNode;
+import io.confluent.ksql.util.KsqlConfig;
+
import java.util.List;
import java.util.Optional;
@@ -42,10 +44,15 @@ public static Analysis analyzeQuery(final String queryStr, final MetaStore metaS
return new Analyzer(queryStr, metaStore).analysis;
}
- public static OutputNode buildLogicalPlan(final String queryStr, final MetaStore metaStore) {
+ public static OutputNode buildLogicalPlan(
+ final KsqlConfig ksqlConfig,
+ final String queryStr,
+ final MetaStore metaStore
+ ) {
final Analyzer analyzer = new Analyzer(queryStr, metaStore);
final LogicalPlanner logicalPlanner = new LogicalPlanner(
+ ksqlConfig,
analyzer.analysis,
analyzer.aggregateAnalysis(),
metaStore);
diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/timestamp-extractor.json b/ksql-functional-tests/src/test/resources/query-validation-tests/timestamp-extractor.json
new file mode 100644
index 000000000000..020d4a33d748
--- /dev/null
+++ b/ksql-functional-tests/src/test/resources/query-validation-tests/timestamp-extractor.json
@@ -0,0 +1,44 @@
+{
+ "comments": [
+ "Tests to verify override of default.timestamp.extractor on streams"
+ ],
+ "tests": [
+ {
+ "name": "KSQL default timestamp extractor",
+ "statements": [
+ "CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='JSON');",
+ "CREATE STREAM TS AS SELECT id FROM test;"
+ ],
+ "inputs": [
+ {"topic": "test_topic", "value": {"ID": 1}, "timestamp": 1526075913000},
+ {"topic": "test_topic", "value": {"ID": 2}, "timestamp": 1557611913000},
+ {"topic": "test_topic", "value": {"ID": 3}, "timestamp": 1589234313000}
+ ],
+ "outputs": [
+ {"topic": "TS", "value": {"ID": 1}, "timestamp": 1526075913000},
+ {"topic": "TS", "value": {"ID": 2}, "timestamp": 1557611913000},
+ {"topic": "TS", "value": {"ID": 3}, "timestamp": 1589234313000}
+ ]
+ },
+ {
+ "name": "KSQL override timestamp extractor",
+ "statements": [
+ "CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='JSON');",
+ "CREATE STREAM TS AS SELECT id FROM test;"
+ ],
+ "properties": {
+ "ksql.streams.default.timestamp.extractor": "org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp"
+ },
+ "inputs": [
+ {"topic": "test_topic", "value": {"ID": 1}, "timestamp": 1526075913000},
+ {"topic": "test_topic", "value": {"ID": 2}, "timestamp": -1},
+ {"topic": "test_topic", "value": {"ID": 3}, "timestamp": 1589234313000}
+ ],
+ "outputs": [
+ {"topic": "TS", "value": {"ID": 1}, "timestamp": 1526075913000},
+ {"topic": "TS", "value": {"ID": 2}, "timestamp": 1526075913000},
+ {"topic": "TS", "value": {"ID": 3}, "timestamp": 1589234313000}
+ ]
+ }
+ ]
+}
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java
index dbf1a797c3d5..606f8ec2ec4c 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.stream.Collectors;
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonSubTypes({})
@@ -46,7 +47,7 @@ public KsqlRequest(
this.ksql = ksql == null ? "" : ksql;
this.streamsProperties = streamsProperties == null
? Collections.emptyMap()
- : Collections.unmodifiableMap(new HashMap<>(streamsProperties));
+ : Collections.unmodifiableMap(new HashMap<>(serializeClassValues(streamsProperties)));
this.commandSequenceNumber = Optional.ofNullable(commandSequenceNumber);
}
@@ -83,6 +84,24 @@ public int hashCode() {
return Objects.hash(ksql, streamsProperties, commandSequenceNumber);
}
+ /**
+ * Converts all Class reference values to their canonical String values.
+ *
+ * This conversion avoids the JsonMappingException error thrown by Jackson when attempting
+ * to serialize the class properties prior to sending this KsqlRequest object as part of the
+ * HTTP request. The error thrown by Jackson is "Class ... could not be found".
+ */
+ private Map<String, Object> serializeClassValues(final Map<String, Object> properties) {
+ return properties.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, kv -> {
+ if (kv.getValue() instanceof Class) {
+ return ((Class<?>) kv.getValue()).getCanonicalName();
+ }
+
+ return kv.getValue();
+ }));
+ }
+
private static Map coerceTypes(final Map streamsProperties) {
if (streamsProperties == null) {
return Collections.emptyMap();
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
index 8105f08a1d31..6c80edd09134 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
@@ -286,6 +286,7 @@ private void initialize() {
.put(DdlConfig.TOPIC_NAME_PROPERTY, new StringLiteral(COMMANDS_KSQL_TOPIC_NAME))
.build()
),
+ ksqlConfig,
serviceContext.getTopicClient()
));
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java
index 784bbcbb374b..d5a8a16d97d5 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java
@@ -31,6 +31,8 @@
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TimestampExtractor;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -42,23 +44,30 @@ public class KsqlRequestTest {
private static final String A_JSON_REQUEST = "{"
+ "\"ksql\":\"sql\","
+ "\"streamsProperties\":{"
- + "\"" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "\":\"earliest\""
+ + "\"" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "\":\"earliest\","
+ + "\"" + StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "\":\""
+ + TimestampExtractor.class.getCanonicalName() + "\""
+ "}}";
private static final String A_JSON_REQUEST_WITH_COMMAND_NUMBER = "{"
+ "\"ksql\":\"sql\","
+ "\"streamsProperties\":{"
- + "\"" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "\":\"earliest\""
+ + "\"" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "\":\"earliest\","
+ + "\"" + StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "\":\""
+ + TimestampExtractor.class.getCanonicalName() + "\""
+ "},"
+ "\"commandSequenceNumber\":2}";
private static final String A_JSON_REQUEST_WITH_NULL_COMMAND_NUMBER = "{"
+ "\"ksql\":\"sql\","
+ "\"streamsProperties\":{"
- + "\"" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "\":\"earliest\""
+ + "\"" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "\":\"earliest\","
+ + "\"" + StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "\":\""
+ + TimestampExtractor.class.getCanonicalName() + "\""
+ "},"
+ "\"commandSequenceNumber\":null}";
private static final ImmutableMap<String, Object> SOME_PROPS = ImmutableMap.of(
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
+ StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class
);
private static final long SOME_COMMAND_NUMBER = 2L;