From c8607930697bcb2b98f4ac4151360536240c6b85 Mon Sep 17 00:00:00 2001 From: Rohan Date: Thu, 15 Aug 2019 12:29:19 -0700 Subject: [PATCH] feat: add an initial set of execution steps (#3214) This patch adds an initial set of execution step nodes. Subsequent patches will have schemakstream/table build the execution tree as it goes along, and then we'll move calls to streams into the implementations of ExecutionStep.build. All execution steps implement ExecutionStep, which along with supporting a few common properties (step id and schema), includes a method called build(), which will eventually get called to build the streams app (as described above). --- .../ksql/planner/plan/AggregateNode.java | 2 +- .../ksql/planner/plan/ProjectNode.java | 2 +- .../ksql/structured/QueuedSchemaKStream.java | 2 +- .../ksql/structured/SchemaKStream.java | 2 +- .../ksql/structured/SchemaKTable.java | 2 +- .../ksql/planner/plan/ProjectNodeTest.java | 2 +- .../ksql/structured/SchemaKStreamTest.java | 2 +- .../ksql/structured/SchemaKTableTest.java | 2 +- .../structured/SelectValueMapperTest.java | 2 +- .../plan/DefaultExecutionStepProperties.java | 64 ++++++++++++++ .../ksql/execution/plan/ExecutionStep.java | 26 ++++++ .../plan/ExecutionStepProperties.java | 23 +++++ .../ksql/execution/plan/Formats.java | 69 +++++++++++++++ .../ksql/execution/plan/JoinType.java | 21 +++++ .../execution/plan}/SelectExpression.java | 2 +- .../ksql/execution/plan/StreamAggregate.java | 85 +++++++++++++++++++ .../ksql/execution/plan/StreamFilter.java | 76 +++++++++++++++++ .../ksql/execution/plan/StreamGroupBy.java | 84 ++++++++++++++++++ .../ksql/execution/plan/StreamMapValues.java | 75 ++++++++++++++++ .../ksql/execution/plan/StreamSelectKey.java | 74 ++++++++++++++++ .../ksql/execution/plan/StreamSink.java | 82 ++++++++++++++++++ .../ksql/execution/plan/StreamSource.java | 84 ++++++++++++++++++ .../ksql/execution/plan/StreamStreamJoin.java | 83 ++++++++++++++++++ .../ksql/execution/plan/StreamTableJoin.java | 84 ++++++++++++++++++ .../ksql/execution/plan/StreamToTable.java | 76 +++++++++++++++++ .../ksql/execution/plan/TableAggregate.java | 85 +++++++++++++++++++ .../ksql/execution/plan/TableFilter.java | 76 +++++++++++++++++ .../ksql/execution/plan/TableGroupBy.java | 81 ++++++++++++++++++ .../ksql/execution/plan/TableMapValues.java | 75 ++++++++++++++++ .../ksql/execution/plan/TableSink.java | 83 ++++++++++++++++++ .../ksql/execution/plan/TableTableJoin.java | 82 ++++++++++++++++++ .../query-validation-tests/key-field.json | 2 +- 32 files changed, 1499 insertions(+), 11 deletions(-) create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/DefaultExecutionStepProperties.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStepProperties.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/Formats.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/JoinType.java rename {ksql-engine/src/main/java/io/confluent/ksql/util => ksql-execution/src/main/java/io/confluent/ksql/execution/plan}/SelectExpression.java (97%) create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamAggregate.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamFilter.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamGroupBy.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamMapValues.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelectKey.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSink.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSource.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamStreamJoin.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamTableJoin.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamToTable.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableAggregate.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableFilter.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableGroupBy.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableMapValues.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableSink.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableTableJoin.java diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java index d09adde9ca9c..f98c28c363d0 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java @@ -27,6 +27,7 @@ import io.confluent.ksql.execution.expression.tree.QualifiedName; import io.confluent.ksql.execution.expression.tree.QualifiedNameReference; import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor; +import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.function.AggregateFunctionArguments; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.KsqlAggregateFunction; @@ -52,7 +53,6 @@ import io.confluent.ksql.util.AggregateExpressionRewriter; import io.confluent.ksql.util.ExpressionTypeManager; import io.confluent.ksql.util.KsqlException; -import io.confluent.ksql.util.SelectExpression; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java index 185cf8ea684f..ac5e6ed93b1f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java @@ -19,13 +19,13 @@ import com.google.common.collect.ImmutableList; import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.metastore.model.KeyField; import io.confluent.ksql.physical.KsqlQueryBuilder; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.structured.SchemaKStream; import io.confluent.ksql.util.KsqlException; -import io.confluent.ksql.util.SelectExpression; import java.util.ArrayList; import java.util.List; import java.util.Optional; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java index f2388281f3dc..90c08f56a7a8 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java @@ -17,10 +17,10 @@ import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.metastore.model.KeyField; import io.confluent.ksql.schema.ksql.LogicalSchema; -import io.confluent.ksql.util.SelectExpression; import java.util.List; import java.util.Set; import org.apache.kafka.common.serialization.Serde; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index 22722ed3e3aa..4130a8d1f6cf 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -23,6 +23,7 @@ import io.confluent.ksql.execution.expression.tree.DereferenceExpression; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.QualifiedNameReference; +import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.logging.processing.ProcessingLogger; @@ -40,7 +41,6 @@ import io.confluent.ksql.util.ParserUtil; import io.confluent.ksql.util.QueryLoggerUtil; import io.confluent.ksql.util.SchemaUtil; -import io.confluent.ksql.util.SelectExpression; import java.util.ArrayList; import java.util.Collections; import java.util.List; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java index c64ffd4af190..9c7e26a04cab 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableList; import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.metastore.model.KeyField; @@ -30,7 +31,6 @@ import io.confluent.ksql.streams.StreamsUtil; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.QueryLoggerUtil; -import io.confluent.ksql.util.SelectExpression; import java.util.ArrayList; import java.util.Collections; import java.util.List; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java index 5a5b072e64a2..567963abb3b4 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java @@ -37,7 +37,7 @@ import io.confluent.ksql.structured.QueryContext.Stacker; import io.confluent.ksql.structured.SchemaKStream; import io.confluent.ksql.util.KsqlException; -import io.confluent.ksql.util.SelectExpression; +import io.confluent.ksql.execution.plan.SelectExpression; import java.util.Arrays; import java.util.Optional; import org.apache.kafka.connect.data.Schema; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java index ce4c5ee7b7ce..5ffe7ea2ab5c 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java @@ -73,7 +73,7 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.MetaStoreFixture; import io.confluent.ksql.util.SchemaUtil; -import io.confluent.ksql.util.SelectExpression; +import io.confluent.ksql.execution.plan.SelectExpression; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java index 0c4062b22142..2424a39fd0a6 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java @@ -71,7 +71,7 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.MetaStoreFixture; import io.confluent.ksql.util.SchemaUtil; -import io.confluent.ksql.util.SelectExpression; +import io.confluent.ksql.execution.plan.SelectExpression; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java index 07246d81c183..4d8d5f908f25 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java @@ -36,7 +36,7 @@ import io.confluent.ksql.util.ExpressionMetadata; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.MetaStoreFixture; -import io.confluent.ksql.util.SelectExpression; +import io.confluent.ksql.execution.plan.SelectExpression; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/DefaultExecutionStepProperties.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/DefaultExecutionStepProperties.java new file mode 100644 index 000000000000..dd0ec7e4349f --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/DefaultExecutionStepProperties.java @@ -0,0 +1,64 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import io.confluent.ksql.schema.ksql.LogicalSchema; +import java.util.Objects; +import jdk.nashorn.internal.ir.annotations.Immutable; + +@Immutable +public class DefaultExecutionStepProperties implements ExecutionStepProperties { + private final String id; + private final LogicalSchema schema; + + public DefaultExecutionStepProperties(final String id, final LogicalSchema schema) { + this.id = Objects.requireNonNull(id, "id"); + this.schema = Objects.requireNonNull(schema, "schema"); + } + + public LogicalSchema getSchema() { + return schema; + } + + public String getId() { + return id; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final DefaultExecutionStepProperties that = (DefaultExecutionStepProperties) o; + return Objects.equals(id, that.id) + && Objects.equals(schema, that.schema); + } + + @Override + public int hashCode() { + return Objects.hash(id, schema); + } + + @Override + public String toString() { + return "ExecutionStepProperties{" + + "id='" + id + '\'' + + ", schema=" + schema + + '}'; + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java new file mode 100644 index 000000000000..fccf59b9b344 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java @@ -0,0 +1,26 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import java.util.List; +import org.apache.kafka.streams.StreamsBuilder; + +public interface ExecutionStep { + ExecutionStepProperties getProperties(); + + List> getSources(); + + T build(StreamsBuilder builder); +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStepProperties.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStepProperties.java new file mode 100644 index 000000000000..d474fda64536 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStepProperties.java @@ -0,0 +1,23 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import io.confluent.ksql.schema.ksql.LogicalSchema; + +public interface ExecutionStepProperties { + LogicalSchema getSchema(); + + String getId(); +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/Formats.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/Formats.java new file mode 100644 index 000000000000..ce3cd0888ac5 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/Formats.java @@ -0,0 +1,69 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.serde.KeyFormat; +import io.confluent.ksql.serde.SerdeOption; +import io.confluent.ksql.serde.ValueFormat; +import java.util.Objects; +import java.util.Set; + +@Immutable +public final class Formats { + private final KeyFormat keyFormat; + private final ValueFormat valueFormat; + private final Set options; + + public Formats( + final KeyFormat keyFormat, + final ValueFormat valueFormat, + final Set options) { + this.keyFormat = Objects.requireNonNull(keyFormat, "keyFormat"); + this.valueFormat = Objects.requireNonNull(valueFormat, "valueFormat"); + this.options = Objects.requireNonNull(options, "options"); + } + + public KeyFormat getKeyFormat() { + return keyFormat; + } + + public ValueFormat getValueFormat() { + return valueFormat; + } + + public Set getOptions() { + return options; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Formats serdeInfo = (Formats) o; + return Objects.equals(keyFormat, serdeInfo.keyFormat) + && Objects.equals(valueFormat, serdeInfo.valueFormat) + && Objects.equals(options, serdeInfo.options); + } + + @Override + public int hashCode() { + return Objects.hash(keyFormat, valueFormat, options); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/JoinType.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/JoinType.java new file mode 100644 index 000000000000..2f0a89243330 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/JoinType.java @@ -0,0 +1,21 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +public enum JoinType { + INNER, + LEFT, + OUTER +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/SelectExpression.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/SelectExpression.java similarity index 97% rename from ksql-engine/src/main/java/io/confluent/ksql/util/SelectExpression.java rename to ksql-execution/src/main/java/io/confluent/ksql/execution/plan/SelectExpression.java index 2f8ec27cc222..59aa068619ea 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/SelectExpression.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/SelectExpression.java @@ -13,7 +13,7 @@ * specific language governing permissions and limitations under the License. */ -package io.confluent.ksql.util; +package io.confluent.ksql.execution.plan; import io.confluent.ksql.execution.expression.tree.Expression; import java.util.Objects; diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamAggregate.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamAggregate.java new file mode 100644 index 000000000000..63720c18932c --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamAggregate.java @@ -0,0 +1,85 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.function.KsqlAggregateFunction; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import jdk.nashorn.internal.ir.annotations.Immutable; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KGroupedStream; +import org.apache.kafka.streams.kstream.KTable; + +@Immutable +public class StreamAggregate implements ExecutionStep> { + private final ExecutionStepProperties properties; + private final ExecutionStep> source; + private final Formats formats; + private final Map aggValToFunctionMap; + private final Map aggValToValColumnMap; + + public StreamAggregate( + final ExecutionStepProperties properties, + final ExecutionStep> source, + final Formats formats, + final Map aggValToFunctionMap, + final Map aggValToValColumnMap) { + this.properties = Objects.requireNonNull(properties, "properties"); + this.source = Objects.requireNonNull(source, "source"); + this.formats = Objects.requireNonNull(formats, "formats"); + this.aggValToFunctionMap = Objects.requireNonNull(aggValToFunctionMap); + this.aggValToValColumnMap = Objects.requireNonNull(aggValToValColumnMap); + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + @Override + public List> getSources() { + return Collections.singletonList(source); + } + + @Override + public KTable build(final StreamsBuilder streamsBuilder) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StreamAggregate that = (StreamAggregate) o; + return Objects.equals(properties, that.properties) + && Objects.equals(source, that.source) + && Objects.equals(formats, that.formats) + && Objects.equals(aggValToFunctionMap, that.aggValToFunctionMap) + && Objects.equals(aggValToValColumnMap, that.aggValToValColumnMap); + } + + @Override + public int hashCode() { + + return Objects.hash(properties, source, formats, aggValToFunctionMap, aggValToValColumnMap); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamFilter.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamFilter.java new file mode 100644 index 000000000000..740dc76091e2 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamFilter.java @@ -0,0 +1,76 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.expression.tree.Expression; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import jdk.nashorn.internal.ir.annotations.Immutable; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; + +@Immutable +public class StreamFilter implements ExecutionStep> { + + private final ExecutionStepProperties properties; + private final ExecutionStep> source; + private final Expression filterExpression; + + public StreamFilter( + final ExecutionStepProperties properties, + final ExecutionStep> source, + final Expression filterExpression) { + this.properties = Objects.requireNonNull(properties, "properties"); + this.source = Objects.requireNonNull(source, "source"); + this.filterExpression = Objects.requireNonNull(filterExpression, "filterExpression"); + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + @Override + public List> getSources() { + return Collections.singletonList(source); + } + + @Override + public KStream build(final StreamsBuilder streamsBuilder) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StreamFilter that = (StreamFilter) o; + return Objects.equals(properties, that.properties) + && Objects.equals(source, that.source) + && Objects.equals(filterExpression, that.filterExpression); + } + + @Override + public int hashCode() { + + return Objects.hash(properties, source, filterExpression); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamGroupBy.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamGroupBy.java new file mode 100644 index 000000000000..b65f7a62206d --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamGroupBy.java @@ -0,0 +1,84 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.expression.tree.Expression; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import jdk.nashorn.internal.ir.annotations.Immutable; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KGroupedStream; +import org.apache.kafka.streams.kstream.KStream; + +@Immutable +public class StreamGroupBy implements ExecutionStep> { + private final ExecutionStepProperties properties; + private final ExecutionStep> source; + private final Formats formats; + private final List groupByExpressions; + + public StreamGroupBy( + final ExecutionStepProperties properties, + final ExecutionStep> source, + final Formats formats, + final List groupByExpressions) { + this.properties = Objects.requireNonNull(properties, "properties"); + this.formats = Objects.requireNonNull(formats, "formats"); + this.source = Objects.requireNonNull(source, "source"); + this.groupByExpressions = Objects.requireNonNull(groupByExpressions, "groupByExpressions"); + } + + public List getGroupByExpressions() { + return groupByExpressions; + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + @Override + public List> getSources() { + return Collections.singletonList(source); + } + + @Override + public KGroupedStream build(final StreamsBuilder streamsBuilder) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StreamGroupBy that = (StreamGroupBy) o; + return Objects.equals(properties, that.properties) + && Objects.equals(source, that.source) + && Objects.equals(formats, that.formats) + && Objects.equals(groupByExpressions, that.groupByExpressions); + } + + @Override + public int hashCode() { + + return Objects.hash(properties, source, formats, groupByExpressions); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamMapValues.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamMapValues.java new file mode 100644 index 000000000000..f8d3bfb84e53 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamMapValues.java @@ -0,0 +1,75 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import com.google.common.collect.ImmutableList; +import io.confluent.ksql.GenericRow; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import jdk.nashorn.internal.ir.annotations.Immutable; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; + +@Immutable +public class StreamMapValues implements ExecutionStep> { + private final ExecutionStepProperties properties; + private final ExecutionStep> source; + private final List selectExpressions; + + public StreamMapValues( + final ExecutionStepProperties properties, + final ExecutionStep> source, + final List selectExpressions) { + this.properties = Objects.requireNonNull(properties, "properties"); + this.source = Objects.requireNonNull(source, "source"); + this.selectExpressions = ImmutableList.copyOf(selectExpressions); + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + @Override + public List> getSources() { + return Collections.singletonList(source); + } + + @Override + public KStream build(final StreamsBuilder streamsBuilder) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StreamMapValues that = (StreamMapValues) o; + return Objects.equals(properties, that.properties) + && Objects.equals(source, that.source) + && Objects.equals(selectExpressions, that.selectExpressions); + } + + @Override + public int hashCode() { + + return Objects.hash(properties, source, selectExpressions); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelectKey.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelectKey.java new file mode 100644 index 000000000000..1fd737f3af48 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelectKey.java @@ -0,0 +1,74 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; + +@Immutable +public class StreamSelectKey implements ExecutionStep> { + private final ExecutionStepProperties properties; + private final ExecutionStep> source; + private final boolean updateRowKey; + + public StreamSelectKey( + final ExecutionStepProperties properties, + final ExecutionStep> source, + final boolean updateRowKey) { + this.properties = Objects.requireNonNull(properties, "properties"); + this.source = Objects.requireNonNull(source, "source"); + this.updateRowKey = updateRowKey; + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + @Override + public List> getSources() { + return Collections.singletonList(source); + } + + @Override + public KStream build(final StreamsBuilder streamsBuilder) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StreamSelectKey that = (StreamSelectKey) o; + return updateRowKey == that.updateRowKey + && Objects.equals(properties, that.properties) + && Objects.equals(source, that.source); + } + + @Override + public int hashCode() { + + return Objects.hash(properties, source, updateRowKey); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSink.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSink.java new file mode 100644 index 000000000000..317488d07796 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSink.java @@ -0,0 +1,82 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import io.confluent.ksql.GenericRow; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import jdk.nashorn.internal.ir.annotations.Immutable; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; + +@Immutable +public class StreamSink implements ExecutionStep> { + private final ExecutionStepProperties properties; + private final ExecutionStep> source; + private final Formats formats; + private final String topicName; + + public StreamSink( + final ExecutionStepProperties properties, + final ExecutionStep> source, + final Formats formats, + final String topicName) { + this.properties = Objects.requireNonNull(properties, "properties"); + this.formats = Objects.requireNonNull(formats, "formats"); + this.source = Objects.requireNonNull(source, "source"); + this.topicName = Objects.requireNonNull(topicName, "topicName"); + } + + public String getTopicName() { + return topicName; + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + @Override + public List> getSources() { + return Collections.singletonList(source); + } + + @Override + public KStream build(final StreamsBuilder streamsBuilder) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StreamSink that = (StreamSink) o; + return Objects.equals(properties, that.properties) + && Objects.equals(source, that.source) + && Objects.equals(formats, that.formats) + && Objects.equals(topicName, that.topicName); + } + + @Override + public int hashCode() { + + return Objects.hash(properties, source, formats, topicName); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSource.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSource.java new file mode 100644 index 000000000000..7860d75d4583 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSource.java @@ -0,0 +1,84 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.KStream; + +@Immutable +public class StreamSource implements ExecutionStep> { + private final ExecutionStepProperties properties; + private final String topicName; + private final Formats formats; + private final TimestampExtractionPolicy timestampPolicy; + private final Topology.AutoOffsetReset offsetReset; + + public StreamSource( + final ExecutionStepProperties properties, + final String topicName, + final Formats formats, + final TimestampExtractionPolicy timestampPolicy, + final Topology.AutoOffsetReset offsetReset) { + this.properties = Objects.requireNonNull(properties, "properties"); + this.topicName = Objects.requireNonNull(topicName, "topicName"); + this.formats = Objects.requireNonNull(formats, "formats"); + this.timestampPolicy = Objects.requireNonNull(timestampPolicy, "timestampPolicy"); + this.offsetReset = Objects.requireNonNull(offsetReset, "offsetReset"); + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + @Override + public List> getSources() { + return Collections.emptyList(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StreamSource that = (StreamSource) o; + return Objects.equals(properties, that.properties) + && Objects.equals(topicName, that.topicName) + && Objects.equals(formats, that.formats) + && Objects.equals(timestampPolicy, that.timestampPolicy) + && offsetReset == that.offsetReset; + } + + @Override + public int hashCode() { + + return Objects.hash(properties, topicName, formats, timestampPolicy, offsetReset); + } + + @Override + public KStream build(final StreamsBuilder builder) { + throw new UnsupportedOperationException(); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamStreamJoin.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamStreamJoin.java new file mode 100644 index 000000000000..3c602b9eb4fe --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamStreamJoin.java @@ -0,0 +1,83 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; +import java.util.List; +import java.util.Objects; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; + +@Immutable +public class StreamStreamJoin implements ExecutionStep> { + + private final ExecutionStepProperties properties; + private final JoinType joinType; + private final Formats formats; + private final ExecutionStep> left; + private final ExecutionStep> right; + + public StreamStreamJoin( + final ExecutionStepProperties properties, + final JoinType joinType, + final Formats formats, + final ExecutionStep> left, + final ExecutionStep> right) { + this.properties = Objects.requireNonNull(properties, "properties"); + this.formats = Objects.requireNonNull(formats, "formats"); + this.joinType = Objects.requireNonNull(joinType, "joinType"); + this.left = Objects.requireNonNull(left, "left"); + this.right = Objects.requireNonNull(right, "right"); + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + @Override + public List> getSources() { + return ImmutableList.of(left, right); + } + + @Override + public KStream build(final StreamsBuilder streamsBuilder) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StreamStreamJoin that = (StreamStreamJoin) o; + return Objects.equals(properties, that.properties) + && joinType == that.joinType + && Objects.equals(formats, that.formats) + && Objects.equals(left, that.left) + && Objects.equals(right, that.right); + } + + @Override + public int hashCode() { + + return Objects.hash(properties, joinType, formats, left, right); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamTableJoin.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamTableJoin.java new file mode 100644 index 000000000000..94207fc8079f --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamTableJoin.java @@ -0,0 +1,84 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; +import java.util.List; +import java.util.Objects; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; + +@Immutable +public class StreamTableJoin implements ExecutionStep> { + + private final ExecutionStepProperties properties; + private final JoinType joinType; + private final Formats formats; + private final ExecutionStep> left; + private final ExecutionStep> right; + + public StreamTableJoin( + final ExecutionStepProperties properties, + final JoinType joinType, + final Formats formats, + final ExecutionStep> left, + final ExecutionStep> right) { + this.properties = Objects.requireNonNull(properties, "properties"); + this.formats = Objects.requireNonNull(formats, "formats"); + this.joinType = Objects.requireNonNull(joinType, "joinType"); + this.left = Objects.requireNonNull(left, "left"); + this.right = Objects.requireNonNull(right, "right"); + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + @Override + public List> getSources() { + return ImmutableList.of(left, right); + } + + @Override + public KStream build(final StreamsBuilder streamsBuilder) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StreamTableJoin that = (StreamTableJoin) o; + return Objects.equals(properties, that.properties) + && joinType == that.joinType + && Objects.equals(formats, that.formats) + && Objects.equals(left, that.left) + && Objects.equals(right, that.right); + } + + @Override + public int hashCode() { + + return Objects.hash(properties, joinType, formats, left, right); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamToTable.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamToTable.java new file mode 100644 index 000000000000..5f28098bbfbc --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamToTable.java @@ -0,0 +1,76 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; +import java.util.List; +import java.util.Objects; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; + +@Immutable +public class StreamToTable implements ExecutionStep> { + private final ExecutionStep> source; + private final Formats formats; + private final ExecutionStepProperties properties; + + public StreamToTable( + final ExecutionStep> source, + final Formats formats, + final ExecutionStepProperties properties) { + this.source = Objects.requireNonNull(source, "source"); + this.formats = Objects.requireNonNull(formats, "formats"); + this.properties = Objects.requireNonNull(properties, "properties"); + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + + @Override + public List> getSources() { + return ImmutableList.of(source); + } + + @Override + public KTable build(final StreamsBuilder builder) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StreamToTable that = (StreamToTable) o; + return Objects.equals(source, that.source) + && Objects.equals(formats, that.formats) + && Objects.equals(properties, that.properties); + } + + @Override + public int hashCode() { + + return Objects.hash(source, formats, properties); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableAggregate.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableAggregate.java new file mode 100644 index 000000000000..4164d2572519 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableAggregate.java @@ -0,0 +1,85 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.function.KsqlAggregateFunction; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KGroupedTable; +import org.apache.kafka.streams.kstream.KTable; + +@Immutable +public class TableAggregate implements ExecutionStep> { + private final ExecutionStepProperties properties; + private final ExecutionStep> source; + private final Formats formats; + private final Map indexToFunctionMap; + private final Map indexToValColumnMap; + + public TableAggregate( + final ExecutionStepProperties properties, + final ExecutionStep> source, + final Formats formats, + final Map indexToFunctionMap, + final Map indexToValColumnMap) { + this.properties = Objects.requireNonNull(properties, "properties"); + this.source = Objects.requireNonNull(source, "source"); + this.formats = Objects.requireNonNull(formats, "formats"); + this.indexToFunctionMap = Objects.requireNonNull(indexToFunctionMap, "indexToFunctionMap"); + this.indexToValColumnMap = Objects.requireNonNull(indexToValColumnMap, "indexToValColumnMap"); + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + @Override + public List> getSources() { + return Collections.singletonList(source); + } + + @Override + public KTable build(final StreamsBuilder builder) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final TableAggregate that = (TableAggregate) o; + return Objects.equals(properties, that.properties) + && Objects.equals(source, that.source) + && Objects.equals(formats, that.formats) + && Objects.equals(indexToFunctionMap, that.indexToFunctionMap) + && Objects.equals(indexToValColumnMap, that.indexToValColumnMap); + } + + @Override + public int hashCode() { + + return Objects.hash(properties, source, formats, indexToFunctionMap, indexToValColumnMap); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableFilter.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableFilter.java new file mode 100644 index 000000000000..cd047915a495 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableFilter.java @@ -0,0 +1,76 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.expression.tree.Expression; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import jdk.nashorn.internal.ir.annotations.Immutable; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KTable; + +@Immutable +public class TableFilter implements ExecutionStep> { + private final ExecutionStepProperties properties; + private final ExecutionStep> source; + private final Expression filterExpression; + + public TableFilter( + final ExecutionStepProperties properties, + final ExecutionStep> source, + final Expression filterExpression + ) { + this.properties = Objects.requireNonNull(properties, "properties"); + this.source = Objects.requireNonNull(source, "source"); + this.filterExpression = Objects.requireNonNull(filterExpression, "filterExpression"); + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + @Override + public List> getSources() { + return Collections.singletonList(source); + } + + @Override + public KTable build(final StreamsBuilder builder) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final TableFilter that = (TableFilter) o; + return Objects.equals(properties, that.properties) + && Objects.equals(source, that.source) + && Objects.equals(filterExpression, that.filterExpression); + } + + @Override + public int hashCode() { + + return Objects.hash(properties, source, filterExpression); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableGroupBy.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableGroupBy.java new file mode 100644 index 000000000000..14e79a38ca43 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableGroupBy.java @@ -0,0 +1,81 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.expression.tree.Expression; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KGroupedTable; +import org.apache.kafka.streams.kstream.KTable; + +@Immutable +public class TableGroupBy implements ExecutionStep> { + private final ExecutionStepProperties properties; + private final ExecutionStep> source; + private final Formats formats; + private final List groupByExpressions; + + public TableGroupBy( + final ExecutionStepProperties properties, + final ExecutionStep> source, + final Formats formats, + final List groupByExpressions + ) { + this.properties = Objects.requireNonNull(properties, "properties"); + this.source = Objects.requireNonNull(source, "source"); + this.formats = Objects.requireNonNull(formats, "formats"); + this.groupByExpressions = Objects.requireNonNull(groupByExpressions, "groupByExpressions"); + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + @Override + public List> getSources() { + return Collections.singletonList(source); + } + + @Override + public KGroupedTable build(final StreamsBuilder builder) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final TableGroupBy that = (TableGroupBy) o; + return Objects.equals(properties, that.properties) + && Objects.equals(source, that.source) + && Objects.equals(formats, that.formats) + && Objects.equals(groupByExpressions, that.groupByExpressions); + } + + @Override + public int hashCode() { + + return Objects.hash(properties, source, formats, groupByExpressions); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableMapValues.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableMapValues.java new file mode 100644 index 000000000000..7e13e2433c6a --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableMapValues.java @@ -0,0 +1,75 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import io.confluent.ksql.GenericRow; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import jdk.nashorn.internal.ir.annotations.Immutable; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KTable; + +@Immutable +public class TableMapValues implements ExecutionStep> { + private final ExecutionStepProperties properties; + private final ExecutionStep> source; + private final List selectExpressions; + + public TableMapValues( + final ExecutionStepProperties properties, + final ExecutionStep> source, + final List selectExpressions + ) { + this.properties = Objects.requireNonNull(properties, "properties"); + this.source = Objects.requireNonNull(source, "source"); + this.selectExpressions = selectExpressions; + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + @Override + public List> getSources() { + return Collections.singletonList(source); + } + + @Override + public KTable build(final StreamsBuilder builder) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final TableMapValues that = (TableMapValues) o; + return Objects.equals(properties, that.properties) + && Objects.equals(source, that.source) + && Objects.equals(selectExpressions, that.selectExpressions); + } + + @Override + public int hashCode() { + + return Objects.hash(properties, source, selectExpressions); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableSink.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableSink.java new file mode 100644 index 000000000000..0984260f0013 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableSink.java @@ -0,0 +1,83 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KTable; + +@Immutable +public class TableSink implements ExecutionStep> { + private final ExecutionStepProperties properties; + private final ExecutionStep> source; + private final Formats formats; + private final String topicName; + + public TableSink( + final ExecutionStepProperties properties, + final ExecutionStep> source, + final Formats formats, + final String topicName + ) { + this.properties = Objects.requireNonNull(properties, "properties"); + this.source = Objects.requireNonNull(source, "source"); + this.formats = Objects.requireNonNull(formats, "formats"); + this.topicName = Objects.requireNonNull(topicName, "topicName"); + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + public String getTopicName() { + return topicName; + } + + @Override + public List> getSources() { + return Collections.singletonList(source); + } + + @Override + public KTable build(final StreamsBuilder builder) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final TableSink tableSink = (TableSink) o; + return Objects.equals(properties, tableSink.properties) + && Objects.equals(source, tableSink.source) + && Objects.equals(formats, tableSink.formats) + && Objects.equals(topicName, tableSink.topicName); + } + + @Override + public int hashCode() { + + return Objects.hash(properties, source, formats, topicName); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableTableJoin.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableTableJoin.java new file mode 100644 index 000000000000..c7852ef6fefa --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableTableJoin.java @@ -0,0 +1,82 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; +import java.util.List; +import java.util.Objects; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KTable; + +@Immutable +public class TableTableJoin implements ExecutionStep> { + private final ExecutionStepProperties properties; + private final JoinType joinType; + private final Formats formats; + private final ExecutionStep> left; + private final ExecutionStep> right; + + public TableTableJoin( + final ExecutionStepProperties properties, + final JoinType joinType, + final Formats formats, + final ExecutionStep> left, + final ExecutionStep> right) { + this.properties = Objects.requireNonNull(properties, "properties"); + this.joinType = Objects.requireNonNull(joinType, "joinType"); + this.formats = Objects.requireNonNull(formats, "formats"); + this.left = Objects.requireNonNull(left, "left"); + this.right = Objects.requireNonNull(right, "right"); + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + @Override + public List> getSources() { + return ImmutableList.of(left, right); + } + + @Override + public KTable build(final StreamsBuilder builder) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final TableTableJoin that = (TableTableJoin) o; + return Objects.equals(properties, that.properties) + && joinType == that.joinType + && Objects.equals(formats, that.formats) + && Objects.equals(left, that.left) + && Objects.equals(right, that.right); + } + + @Override + public int hashCode() { + + return Objects.hash(properties, joinType, formats, left, right); + } +} diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/key-field.json b/ksql-functional-tests/src/test/resources/query-validation-tests/key-field.json index 78c696faf259..138630c1d524 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/key-field.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/key-field.json @@ -1495,4 +1495,4 @@ } } ] -} \ No newline at end of file +}