diff --git a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java index 2c0101fad621c..b94e80b9311e9 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java +++ b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java @@ -13,6 +13,7 @@ */ package io.trino.metadata; +import com.google.common.util.concurrent.ListenableFuture; import io.airlift.slice.Slice; import io.trino.Session; import io.trino.connector.CatalogName; @@ -290,6 +291,16 @@ public interface Metadata */ Optional finishInsert(Session session, InsertTableHandle tableHandle, Collection fragments, Collection computedStatistics); + /** + * Returns true if materialized view refresh should be delegated to connector + */ + boolean delegateMaterializedViewRefreshToConnector(Session session, QualifiedObjectName viewName); + + /** + * Refresh materialized view + */ + ListenableFuture refreshMaterializedView(Session session, QualifiedObjectName viewName); + /** * Begin refresh materialized view query */ diff --git a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java index 320e998439e6d..bcf5592a986a3 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; import com.google.common.collect.Streams; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.slice.Slice; import io.trino.Session; @@ -156,6 +157,7 @@ import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.primitives.Primitives.wrap; +import static io.airlift.concurrent.MoreFutures.toListenableFuture; import static io.trino.metadata.FunctionKind.AGGREGATE; import static io.trino.metadata.QualifiedObjectName.convertFromSchemaTableName; import static io.trino.metadata.RedirectionAwareTableHandle.noRedirection; @@ -937,6 +939,22 @@ public Optional finishInsert(Session session, InsertTab return metadata.finishInsert(session.toConnectorSession(catalogName), tableHandle.getConnectorHandle(), fragments, computedStatistics); } + @Override + public boolean delegateMaterializedViewRefreshToConnector(Session session, QualifiedObjectName viewName) + { + CatalogName catalogName = new CatalogName(viewName.getCatalogName()); + ConnectorMetadata metadata = getMetadata(session, catalogName); + return metadata.delegateMaterializedViewRefreshToConnector(session.toConnectorSession(catalogName), viewName.asSchemaTableName()); + } + + @Override + public ListenableFuture refreshMaterializedView(Session session, QualifiedObjectName viewName) + { + CatalogName catalogName = new CatalogName(viewName.getCatalogName()); + ConnectorMetadata metadata = getMetadata(session, catalogName); + return toListenableFuture(metadata.refreshMaterializedView(session.toConnectorSession(catalogName), viewName.asSchemaTableName())); + } + @Override public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List sourceTableHandles) { diff --git a/core/trino-main/src/main/java/io/trino/operator/RefreshMaterializedViewOperator.java b/core/trino-main/src/main/java/io/trino/operator/RefreshMaterializedViewOperator.java new file mode 100644 index 0000000000000..b4c78b64ed2f9 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/operator/RefreshMaterializedViewOperator.java @@ -0,0 +1,151 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator; + +import com.google.common.util.concurrent.ListenableFuture; +import io.trino.metadata.Metadata; +import io.trino.metadata.QualifiedObjectName; +import io.trino.spi.Page; +import io.trino.sql.planner.plan.PlanNodeId; + +import javax.annotation.Nullable; + +import static com.google.common.base.Preconditions.checkState; +import static io.airlift.concurrent.MoreFutures.getDone; +import static java.util.Objects.requireNonNull; + +public class RefreshMaterializedViewOperator + implements Operator +{ + public static class RefreshMaterializedViewOperatorFactory + implements OperatorFactory + { + private final int operatorId; + private final PlanNodeId planNodeId; + private final Metadata metadata; + private final QualifiedObjectName viewName; + private boolean closed; + + public RefreshMaterializedViewOperatorFactory(int operatorId, PlanNodeId planNodeId, Metadata metadata, QualifiedObjectName viewName) + { + this.operatorId = operatorId; + this.planNodeId = requireNonNull(planNodeId, "planNodeId is null"); + this.metadata = requireNonNull(metadata, "metadata is null"); + this.viewName = requireNonNull(viewName, "viewName is null"); + } + + @Override + public Operator createOperator(DriverContext driverContext) + { + checkState(!closed, "Factory is already closed"); + OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, ValuesOperator.class.getSimpleName()); + return new RefreshMaterializedViewOperator(operatorContext, metadata, viewName); + } + + @Override + public void noMoreOperators() + { + closed = true; + } + + @Override + public OperatorFactory duplicate() + { + return new RefreshMaterializedViewOperatorFactory(operatorId, planNodeId, metadata, viewName); + } + } + + private final OperatorContext operatorContext; + private final Metadata metadata; + private final QualifiedObjectName viewName; + @Nullable + private ListenableFuture refreshFuture; + private boolean closed; + + public RefreshMaterializedViewOperator(OperatorContext operatorContext, Metadata metadata, QualifiedObjectName viewName) + { + this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); + this.metadata = requireNonNull(metadata, "metadata is null"); + this.viewName = requireNonNull(viewName, "viewName is null"); + } + + @Override + public OperatorContext getOperatorContext() + { + return operatorContext; + } + + @Override + public ListenableFuture isBlocked() + { + if (refreshFuture == null) { + return NOT_BLOCKED; + } + + return refreshFuture; + } + + @Override + public boolean needsInput() + { + return false; + } + + @Override + public void addInput(Page page) + { + throw new UnsupportedOperationException(); + } + + @Override + public Page getOutput() + { + return null; + } + + @Override + public void finish() + { + } + + @Override + public void close() + { + if (refreshFuture != null) { + refreshFuture.cancel(false); + } + + closed = true; + } + + @Override + public boolean isFinished() + { + if (closed) { + return true; + } + + if (refreshFuture == null) { + // perform refresh in isFinished method. getOutput method won't be called by Driver + // since RefreshMaterializedViewOperator is last operator in pipeline + refreshFuture = metadata.refreshMaterializedView(operatorContext.getSession(), viewName); + } + + if (refreshFuture.isDone()) { + getDone(refreshFuture); + } + + return refreshFuture.isDone(); + } +} diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java index e04970b6bda2a..6d4b3511294fe 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java @@ -193,6 +193,7 @@ public class Analysis private Optional create = Optional.empty(); private Optional insert = Optional.empty(); private Optional refreshMaterializedView = Optional.empty(); + private Optional delegatedRefreshMaterializedView = Optional.empty(); private Optional analyzeTarget = Optional.empty(); private Optional> updatedColumns = Optional.empty(); @@ -723,6 +724,16 @@ public Optional getRefreshMaterializedView() return refreshMaterializedView; } + public void setDelegatedRefreshMaterializedView(QualifiedObjectName viewName) + { + this.delegatedRefreshMaterializedView = Optional.of(viewName); + } + + public Optional getDelegatedRefreshMaterializedView() + { + return delegatedRefreshMaterializedView; + } + public Query getNamedQuery(Table table) { return namedQueries.get(NodeRef.of(table)); diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java index b9b20b5c557a0..ad06b3771704a 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java @@ -531,6 +531,16 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView refreshMate accessControl.checkCanRefreshMaterializedView(session.toSecurityContext(), name); + if (metadata.delegateMaterializedViewRefreshToConnector(session, name)) { + analysis.setDelegatedRefreshMaterializedView(name); + analysis.setUpdateType( + "REFRESH MATERIALIZED VIEW", + name, + Optional.empty(), + Optional.empty()); + return createAndAssignScope(refreshMaterializedView, scope); + } + Optional storageName = getMaterializedViewStorageTableName(optionalView.get()); if (storageName.isEmpty()) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/DistributedExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/DistributedExecutionPlanner.java index 7eb7803293b2f..820cd71edf3f2 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/DistributedExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/DistributedExecutionPlanner.java @@ -47,6 +47,7 @@ import io.trino.sql.planner.plan.PlanNodeId; import io.trino.sql.planner.plan.PlanVisitor; import io.trino.sql.planner.plan.ProjectNode; +import io.trino.sql.planner.plan.RefreshMaterializedViewNode; import io.trino.sql.planner.plan.RemoteSourceNode; import io.trino.sql.planner.plan.RowNumberNode; import io.trino.sql.planner.plan.SampleNode; @@ -393,6 +394,13 @@ public Map visitSort(SortNode node, Void context) return node.getSource().accept(this, context); } + @Override + public Map visitRefreshMaterializedView(RefreshMaterializedViewNode node, Void context) + { + // RefreshMaterializedViewNode does not have splits + return ImmutableMap.of(); + } + @Override public Map visitTableWriter(TableWriterNode node, Void context) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index 5bde0a895dfd3..d48f8924cfabf 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -70,6 +70,7 @@ import io.trino.operator.PartitionFunction; import io.trino.operator.PartitionedOutputOperator.PartitionedOutputFactory; import io.trino.operator.PipelineExecutionStrategy; +import io.trino.operator.RefreshMaterializedViewOperator.RefreshMaterializedViewOperatorFactory; import io.trino.operator.RowNumberOperator; import io.trino.operator.ScanFilterAndProjectOperator.ScanFilterAndProjectOperatorFactory; import io.trino.operator.SetBuilderOperator.SetBuilderOperatorFactory; @@ -184,6 +185,7 @@ import io.trino.sql.planner.plan.PlanNodeId; import io.trino.sql.planner.plan.PlanVisitor; import io.trino.sql.planner.plan.ProjectNode; +import io.trino.sql.planner.plan.RefreshMaterializedViewNode; import io.trino.sql.planner.plan.RemoteSourceNode; import io.trino.sql.planner.plan.RowNumberNode; import io.trino.sql.planner.plan.SampleNode; @@ -2795,6 +2797,14 @@ private Set getCoordinatorDynamicFilters(Set d return ImmutableSet.of(); } + @Override + public PhysicalOperation visitRefreshMaterializedView(RefreshMaterializedViewNode node, LocalExecutionPlanContext context) + { + context.setDriverInstanceCount(1); + OperatorFactory operatorFactory = new RefreshMaterializedViewOperatorFactory(context.getNextOperatorId(), node.getId(), metadata, node.getViewName()); + return new PhysicalOperation(operatorFactory, makeLayout(node), context, UNGROUPED_EXECUTION); + } + @Override public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPlanContext context) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java index 30e76c6fb1e28..10505b38fd19d 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java @@ -55,6 +55,7 @@ import io.trino.sql.planner.plan.OutputNode; import io.trino.sql.planner.plan.PlanNode; import io.trino.sql.planner.plan.ProjectNode; +import io.trino.sql.planner.plan.RefreshMaterializedViewNode; import io.trino.sql.planner.plan.StatisticAggregations; import io.trino.sql.planner.plan.StatisticsWriterNode; import io.trino.sql.planner.plan.TableFinishNode; @@ -265,7 +266,6 @@ private RelationPlan planStatementWithoutOutput(Analysis analysis, Statement sta return createInsertPlan(analysis, (Insert) statement); } if (statement instanceof RefreshMaterializedView) { - checkState(analysis.getRefreshMaterializedView().isPresent(), "RefreshMaterializedViewAnalysis handle is missing"); return createRefreshMaterializedViewPlan(analysis); } if (statement instanceof Delete) { @@ -481,6 +481,16 @@ private RelationPlan createInsertPlan(Analysis analysis, Insert insertStatement) private RelationPlan createRefreshMaterializedViewPlan(Analysis analysis) { + Optional delegatedRefreshMaterializedView = analysis.getDelegatedRefreshMaterializedView(); + if (delegatedRefreshMaterializedView.isPresent()) { + return new RelationPlan( + new RefreshMaterializedViewNode(idAllocator.getNextId(), delegatedRefreshMaterializedView.get()), + analysis.getRootScope(), + ImmutableList.of(), + Optional.empty()); + } + + checkState(analysis.getRefreshMaterializedView().isPresent(), "RefreshMaterializedViewAnalysis handle is missing"); Analysis.RefreshMaterializedViewAnalysis viewAnalysis = analysis.getRefreshMaterializedView().get(); TableHandle tableHandle = viewAnalysis.getTarget(); Query query = viewAnalysis.getQuery(); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/PlanFragmenter.java b/core/trino-main/src/main/java/io/trino/sql/planner/PlanFragmenter.java index 5c8b6389f9334..b31212ff66a5f 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/PlanFragmenter.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/PlanFragmenter.java @@ -39,6 +39,7 @@ import io.trino.sql.planner.plan.PlanNode; import io.trino.sql.planner.plan.PlanNodeId; import io.trino.sql.planner.plan.PlanVisitor; +import io.trino.sql.planner.plan.RefreshMaterializedViewNode; import io.trino.sql.planner.plan.RemoteSourceNode; import io.trino.sql.planner.plan.RowNumberNode; import io.trino.sql.planner.plan.SimplePlanRewriter; @@ -315,6 +316,13 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext context) + { + context.get().setCoordinatorOnlyDistribution(); + return context.defaultRewrite(node, context.get()); + } + @Override public PlanNode visitTableWriter(TableWriterNode node, RewriteContext context) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java index ac8073ba3e642..67a83ec5c4f56 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java @@ -59,6 +59,7 @@ import io.trino.sql.planner.plan.PlanNode; import io.trino.sql.planner.plan.PlanVisitor; import io.trino.sql.planner.plan.ProjectNode; +import io.trino.sql.planner.plan.RefreshMaterializedViewNode; import io.trino.sql.planner.plan.RowNumberNode; import io.trino.sql.planner.plan.SemiJoinNode; import io.trino.sql.planner.plan.SortNode; @@ -584,6 +585,12 @@ public PlanWithProperties visitTableScan(TableScanNode node, PreferredProperties return new PlanWithProperties(node, deriveProperties(node, ImmutableList.of())); } + @Override + public PlanWithProperties visitRefreshMaterializedView(RefreshMaterializedViewNode node, PreferredProperties preferredProperties) + { + return new PlanWithProperties(node, deriveProperties(node, ImmutableList.of())); + } + @Override public PlanWithProperties visitTableWriter(TableWriterNode node, PreferredProperties preferredProperties) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java index bd311828a5800..61b453fa0a109 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java @@ -60,6 +60,7 @@ import io.trino.sql.planner.plan.PlanNode; import io.trino.sql.planner.plan.PlanVisitor; import io.trino.sql.planner.plan.ProjectNode; +import io.trino.sql.planner.plan.RefreshMaterializedViewNode; import io.trino.sql.planner.plan.RowNumberNode; import io.trino.sql.planner.plan.SampleNode; import io.trino.sql.planner.plan.SemiJoinNode; @@ -741,6 +742,14 @@ else if (!(value instanceof Expression)) { .build(); } + @Override + public ActualProperties visitRefreshMaterializedView(RefreshMaterializedViewNode node, List inputProperties) + { + return ActualProperties.builder() + .global(coordinatorSingleStreamPartition()) + .build(); + } + @Override public ActualProperties visitTableWriter(TableWriterNode node, List inputProperties) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java index f9e2cffacd43d..630a4f2216c29 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java @@ -50,6 +50,7 @@ import io.trino.sql.planner.plan.PlanNode; import io.trino.sql.planner.plan.PlanVisitor; import io.trino.sql.planner.plan.ProjectNode; +import io.trino.sql.planner.plan.RefreshMaterializedViewNode; import io.trino.sql.planner.plan.RowNumberNode; import io.trino.sql.planner.plan.SampleNode; import io.trino.sql.planner.plan.SemiJoinNode; @@ -445,6 +446,12 @@ public StreamProperties visitUpdate(UpdateNode node, List inpu return properties.withUnspecifiedPartitioning(); } + @Override + public StreamProperties visitRefreshMaterializedView(RefreshMaterializedViewNode node, List inputProperties) + { + return StreamProperties.singleStream(); + } + @Override public StreamProperties visitTableWriter(TableWriterNode node, List inputProperties) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/UnaliasSymbolReferences.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/UnaliasSymbolReferences.java index b3b86177e6135..90c5ea8edb6fd 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/UnaliasSymbolReferences.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/UnaliasSymbolReferences.java @@ -58,6 +58,7 @@ import io.trino.sql.planner.plan.PlanNode; import io.trino.sql.planner.plan.PlanVisitor; import io.trino.sql.planner.plan.ProjectNode; +import io.trino.sql.planner.plan.RefreshMaterializedViewNode; import io.trino.sql.planner.plan.RemoteSourceNode; import io.trino.sql.planner.plan.RowNumberNode; import io.trino.sql.planner.plan.SampleNode; @@ -619,6 +620,12 @@ public PlanAndMappings visitStatisticsWriterNode(StatisticsWriterNode node, Unal return new PlanAndMappings(rewrittenStatisticsWriter, mapping); } + @Override + public PlanAndMappings visitRefreshMaterializedView(RefreshMaterializedViewNode node, UnaliasContext context) + { + return new PlanAndMappings(node, ImmutableMap.of()); + } + @Override public PlanAndMappings visitTableWriter(TableWriterNode node, UnaliasContext context) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanNode.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanNode.java index f8c344ef3668b..cc9fb46fd44fa 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanNode.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanNode.java @@ -47,6 +47,7 @@ @JsonSubTypes.Type(value = SpatialJoinNode.class, name = "spatialjoin"), @JsonSubTypes.Type(value = IndexJoinNode.class, name = "indexjoin"), @JsonSubTypes.Type(value = IndexSourceNode.class, name = "indexsource"), + @JsonSubTypes.Type(value = RefreshMaterializedViewNode.class, name = "refreshmaterializedview"), @JsonSubTypes.Type(value = TableWriterNode.class, name = "tablewriter"), @JsonSubTypes.Type(value = DeleteNode.class, name = "delete"), @JsonSubTypes.Type(value = UpdateNode.class, name = "update"), diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanVisitor.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanVisitor.java index 11fe1074dae42..277cdc5d9ce6a 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanVisitor.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanVisitor.java @@ -119,6 +119,11 @@ public R visitWindow(WindowNode node, C context) return visitPlan(node, context); } + public R visitRefreshMaterializedView(RefreshMaterializedViewNode node, C context) + { + return visitPlan(node, context); + } + public R visitTableWriter(TableWriterNode node, C context) { return visitPlan(node, context); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/RefreshMaterializedViewNode.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/RefreshMaterializedViewNode.java new file mode 100644 index 0000000000000..2ff6049ebdba9 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/RefreshMaterializedViewNode.java @@ -0,0 +1,74 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.sql.planner.plan; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import io.trino.metadata.QualifiedObjectName; +import io.trino.sql.planner.Symbol; + +import javax.annotation.concurrent.Immutable; + +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +@Immutable +public class RefreshMaterializedViewNode + extends PlanNode +{ + private final QualifiedObjectName viewName; + + @JsonCreator + public RefreshMaterializedViewNode( + @JsonProperty("id") PlanNodeId id, + @JsonProperty("viewName") QualifiedObjectName viewName) + { + super(id); + this.viewName = requireNonNull(viewName, "viewName is null"); + } + + @JsonProperty + public QualifiedObjectName getViewName() + { + return viewName; + } + + @Override + public List getSources() + { + return ImmutableList.of(); + } + + @Override + public List getOutputSymbols() + { + return ImmutableList.of(); + } + + @Override + public PlanNode replaceChildren(List newChildren) + { + checkArgument(newChildren.isEmpty(), "newChildren is not empty"); + return this; + } + + @Override + public R accept(PlanVisitor visitor, C context) + { + return visitor.visitRefreshMaterializedView(this, context); + } +} diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java index 9833d31f331b5..d39c11a0b2877 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java @@ -80,6 +80,7 @@ import io.trino.sql.planner.plan.PlanNodeId; import io.trino.sql.planner.plan.PlanVisitor; import io.trino.sql.planner.plan.ProjectNode; +import io.trino.sql.planner.plan.RefreshMaterializedViewNode; import io.trino.sql.planner.plan.RemoteSourceNode; import io.trino.sql.planner.plan.RowNumberNode; import io.trino.sql.planner.plan.SampleNode; @@ -1190,6 +1191,14 @@ public Void visitExcept(ExceptNode node, Void context) return processChildren(node, context); } + @Override + public Void visitRefreshMaterializedView(RefreshMaterializedViewNode node, Void context) + { + addNode(node, "RefreshMaterializedView", format("[%s]", node.getViewName())); + + return null; + } + @Override public Void visitTableWriter(TableWriterNode node, Void context) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateDependenciesChecker.java b/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateDependenciesChecker.java index ccf5dd566f343..abf6479cc170d 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateDependenciesChecker.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateDependenciesChecker.java @@ -50,6 +50,7 @@ import io.trino.sql.planner.plan.PlanNode; import io.trino.sql.planner.plan.PlanVisitor; import io.trino.sql.planner.plan.ProjectNode; +import io.trino.sql.planner.plan.RefreshMaterializedViewNode; import io.trino.sql.planner.plan.RemoteSourceNode; import io.trino.sql.planner.plan.RowNumberNode; import io.trino.sql.planner.plan.SampleNode; @@ -601,6 +602,12 @@ public Void visitExchange(ExchangeNode node, Set boundSymbols) return null; } + @Override + public Void visitRefreshMaterializedView(RefreshMaterializedViewNode node, Set boundSymbols) + { + return null; + } + @Override public Void visitTableWriter(TableWriterNode node, Set boundSymbols) { diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java index 353c56bfd3eae..f78a63d44ec47 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java @@ -94,6 +94,8 @@ public class MockConnector private final Optional>> streamTableColumns; private final BiFunction> getViews; private final BiFunction> getMaterializedViews; + private final BiFunction delegateMaterializedViewRefreshToConnector; + private final BiFunction> refreshMaterializedView; private final BiFunction getTableHandle; private final Function> getColumns; private final MockConnectorFactory.ApplyProjection applyProjection; @@ -116,6 +118,8 @@ public class MockConnector Optional>> streamTableColumns, BiFunction> getViews, BiFunction> getMaterializedViews, + BiFunction delegateMaterializedViewRefreshToConnector, + BiFunction> refreshMaterializedView, BiFunction getTableHandle, Function> getColumns, MockConnectorFactory.ApplyProjection applyProjection, @@ -137,6 +141,8 @@ public class MockConnector this.streamTableColumns = requireNonNull(streamTableColumns, "streamTableColumns is null"); this.getViews = requireNonNull(getViews, "getViews is null"); this.getMaterializedViews = requireNonNull(getMaterializedViews, "getMaterializedViews is null"); + this.delegateMaterializedViewRefreshToConnector = requireNonNull(delegateMaterializedViewRefreshToConnector, "delegateMaterializedViewRefreshToConnector is null"); + this.refreshMaterializedView = requireNonNull(refreshMaterializedView, "refreshMaterializedView is null"); this.getTableHandle = requireNonNull(getTableHandle, "getTableHandle is null"); this.getColumns = requireNonNull(getColumns, "getColumns is null"); this.applyProjection = requireNonNull(applyProjection, "applyProjection is null"); @@ -367,6 +373,18 @@ public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession s return new MaterializedViewFreshness(view.getStorageTable().isPresent()); } + @Override + public boolean delegateMaterializedViewRefreshToConnector(ConnectorSession session, SchemaTableName viewName) + { + return delegateMaterializedViewRefreshToConnector.apply(session, viewName); + } + + @Override + public CompletableFuture refreshMaterializedView(ConnectorSession session, SchemaTableName viewName) + { + return refreshMaterializedView.apply(session, viewName); + } + @Override public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List sourceTableHandles) { diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java index a074d60a2466d..640004bf9c83f 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java @@ -54,6 +54,7 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; @@ -72,6 +73,8 @@ public class MockConnectorFactory Optional>> streamTableColumns; private final BiFunction> getViews; private final BiFunction> getMaterializedViews; + private final BiFunction delegateMaterializedViewRefreshToConnector; + private final BiFunction> refreshMaterializedView; private final BiFunction getTableHandle; private final Function> getColumns; private final ApplyProjection applyProjection; @@ -94,6 +97,8 @@ private MockConnectorFactory( Optional>> streamTableColumns, BiFunction> getViews, BiFunction> getMaterializedViews, + BiFunction delegateMaterializedViewRefreshToConnector, + BiFunction> refreshMaterializedView, BiFunction getTableHandle, Function> getColumns, ApplyProjection applyProjection, @@ -115,6 +120,8 @@ private MockConnectorFactory( this.streamTableColumns = requireNonNull(streamTableColumns, "streamTableColumns is null"); this.getViews = requireNonNull(getViews, "getViews is null"); this.getMaterializedViews = requireNonNull(getMaterializedViews, "getMaterializedViews is null"); + this.delegateMaterializedViewRefreshToConnector = requireNonNull(delegateMaterializedViewRefreshToConnector, "delegateMaterializedViewRefreshToConnector is null"); + this.refreshMaterializedView = requireNonNull(refreshMaterializedView, "refreshMaterializedView is null"); this.getTableHandle = requireNonNull(getTableHandle, "getTableHandle is null"); this.getColumns = requireNonNull(getColumns, "getColumns is null"); this.applyProjection = requireNonNull(applyProjection, "applyProjection is null"); @@ -153,6 +160,8 @@ public Connector create(String catalogName, Map config, Connecto streamTableColumns, getViews, getMaterializedViews, + delegateMaterializedViewRefreshToConnector, + refreshMaterializedView, getTableHandle, getColumns, applyProjection, @@ -245,6 +254,8 @@ public static final class Builder private Optional>> streamTableColumns = Optional.empty(); private BiFunction> getViews = defaultGetViews(); private BiFunction> getMaterializedViews = defaultGetMaterializedViews(); + private BiFunction delegateMaterializedViewRefreshToConnector = (session, viewName) -> false; + private BiFunction> refreshMaterializedView = (session, viewName) -> CompletableFuture.completedFuture(null); private BiFunction getTableHandle = defaultGetTableHandle(); private Function> getColumns = defaultGetColumns(); private ApplyProjection applyProjection = (session, handle, projections, assignments) -> Optional.empty(); @@ -300,6 +311,18 @@ public Builder withGetMaterializedViews(BiFunction delegateMaterializedViewRefreshToConnector) + { + this.delegateMaterializedViewRefreshToConnector = requireNonNull(delegateMaterializedViewRefreshToConnector, "delegateMaterializedViewRefreshToConnector is null"); + return this; + } + + public Builder withRefreshMaterializedView(BiFunction> refreshMaterializedView) + { + this.refreshMaterializedView = requireNonNull(refreshMaterializedView, "refreshMaterializedView is null"); + return this; + } + public Builder withGetTableHandle(BiFunction getTableHandle) { this.getTableHandle = requireNonNull(getTableHandle, "getTableHandle is null"); @@ -420,6 +443,8 @@ public MockConnectorFactory build() streamTableColumns, getViews, getMaterializedViews, + delegateMaterializedViewRefreshToConnector, + refreshMaterializedView, getTableHandle, getColumns, applyProjection, diff --git a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java index ed7c4864195ab..b8b85450a3cdc 100644 --- a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java +++ b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListenableFuture; import io.airlift.slice.Slice; import io.trino.Session; import io.trino.connector.CatalogName; @@ -358,6 +359,18 @@ public Optional finishInsert(Session session, InsertTab throw new UnsupportedOperationException(); } + @Override + public boolean delegateMaterializedViewRefreshToConnector(Session session, QualifiedObjectName viewName) + { + throw new UnsupportedOperationException(); + } + + @Override + public ListenableFuture refreshMaterializedView(Session session, QualifiedObjectName viewName) + { + throw new UnsupportedOperationException(); + } + @Override public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List sourceTableHandles) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java index 333852440f304..22cc75dc6528f 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java @@ -36,6 +36,7 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -490,6 +491,22 @@ default Optional finishInsert(ConnectorSession session, throw new TrinoException(GENERIC_INTERNAL_ERROR, "ConnectorMetadata beginInsert() is implemented without finishInsert()"); } + /** + * Returns true if materialized view refresh should be delegated to connector using {@link ConnectorMetadata#refreshMaterializedView} + */ + default boolean delegateMaterializedViewRefreshToConnector(ConnectorSession session, SchemaTableName viewName) + { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support materialized views"); + } + + /** + * Refresh materialized view + */ + default CompletableFuture refreshMaterializedView(ConnectorSession session, SchemaTableName viewName) + { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support materialized views"); + } + /** * Begin materialized view query */ diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java index 60c9693d8d005..f380b29d975b0 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java @@ -74,6 +74,7 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; import static java.util.Objects.requireNonNull; @@ -471,6 +472,22 @@ public Optional finishInsert(ConnectorSession session, } } + @Override + public boolean delegateMaterializedViewRefreshToConnector(ConnectorSession session, SchemaTableName viewName) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.delegateMaterializedViewRefreshToConnector(session, viewName); + } + } + + @Override + public CompletableFuture refreshMaterializedView(ConnectorSession session, SchemaTableName viewName) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.refreshMaterializedView(session, viewName); + } + } + @Override public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List sourceTableHandles) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 8596f76b0b660..5eeba88ac0453 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -948,6 +948,12 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName viewN metastore.dropTable(identity, viewName.getSchemaName(), viewName.getTableName(), true); } + @Override + public boolean delegateMaterializedViewRefreshToConnector(ConnectorSession session, SchemaTableName viewName) + { + return false; + } + @Override public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List sourceTableHandles) { diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestRefreshMaterializedView.java b/testing/trino-tests/src/test/java/io/trino/execution/TestRefreshMaterializedView.java new file mode 100644 index 0000000000000..a37aab1971b33 --- /dev/null +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestRefreshMaterializedView.java @@ -0,0 +1,154 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.SettableFuture; +import io.trino.Session; +import io.trino.connector.MockConnectorFactory; +import io.trino.connector.MockConnectorPlugin; +import io.trino.connector.MockConnectorTableHandle; +import io.trino.spi.connector.CatalogSchemaTableName; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorMaterializedViewDefinition; +import io.trino.spi.connector.SchemaTableName; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import org.intellij.lang.annotations.Language; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.Optional; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; +import static io.airlift.concurrent.MoreFutures.addExceptionCallback; +import static io.airlift.concurrent.MoreFutures.getFutureValue; +import static io.airlift.concurrent.MoreFutures.toCompletableFuture; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.testing.TestingSession.testSessionBuilder; +import static java.util.concurrent.Executors.newCachedThreadPool; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +@Test(singleThreaded = true) +public class TestRefreshMaterializedView + extends AbstractTestQueryFramework +{ + private ListeningExecutorService executorService; + private SettableFuture startRefreshMaterializedView; + private SettableFuture finishRefreshMaterializedView; + private boolean refreshInterrupted; + + @BeforeClass + public void setUp() + { + executorService = listeningDecorator(newCachedThreadPool()); + } + + @AfterClass(alwaysRun = true) + public void shutdown() + { + executorService.shutdownNow(); + } + + @BeforeMethod + public void resetState() + { + startRefreshMaterializedView = SettableFuture.create(); + finishRefreshMaterializedView = SettableFuture.create(); + refreshInterrupted = false; + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + Session session = testSessionBuilder() + .setCatalog("mock") + .setSchema("default") + .build(); + DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session) + .build(); + queryRunner.installPlugin( + new MockConnectorPlugin( + MockConnectorFactory.builder() + .withListSchemaNames(connectionSession -> ImmutableList.of("default")) + .withGetColumns(schemaTableName -> ImmutableList.of(new ColumnMetadata("nationkey", BIGINT))) + .withGetTableHandle((connectorSession, tableName) -> new MockConnectorTableHandle(tableName)) + .withGetMaterializedViews((connectorSession, schemaTablePrefix) -> ImmutableMap.of( + new SchemaTableName("default", "delegate_refresh_to_connector"), + new ConnectorMaterializedViewDefinition( + "SELECT nationkey FROM mock.default.test_table", + Optional.of(new CatalogSchemaTableName("mock", "default", "test_storage")), + Optional.of("mock"), + Optional.of("default"), + ImmutableList.of(new ConnectorMaterializedViewDefinition.Column("nationkey", BIGINT.getTypeId())), + Optional.empty(), + "alice", + ImmutableMap.of()))) + .withDelegateMaterializedViewRefreshToConnector((connectorSession, schemaTableName) -> true) + .withRefreshMaterializedView(((connectorSession, schemaTableName) -> { + startRefreshMaterializedView.set(null); + SettableFuture refreshMaterializedView = SettableFuture.create(); + finishRefreshMaterializedView.addListener(() -> refreshMaterializedView.set(null), directExecutor()); + addExceptionCallback(refreshMaterializedView, () -> refreshInterrupted = true); + return toCompletableFuture(refreshMaterializedView); + })) + .build())); + queryRunner.createCatalog("mock", "mock"); + return queryRunner; + } + + @Test(timeOut = 30_000) + public void testDelegateRefreshMaterializedViewToConnector() + { + ListenableFuture queryFuture = assertUpdateAsync("REFRESH MATERIALIZED VIEW mock.default.delegate_refresh_to_connector"); + + // wait for connector to start refreshing MV + getFutureValue(startRefreshMaterializedView); + // finish MV refresh + finishRefreshMaterializedView.set(null); + + getFutureValue(queryFuture); + } + + @Test(timeOut = 30_000) + public void testDelegateRefreshMaterializedViewToConnectorWithCancellation() + { + ListenableFuture queryFuture = assertUpdateAsync("REFRESH MATERIALIZED VIEW mock.default.delegate_refresh_to_connector"); + + // wait for connector to start refreshing MV + getFutureValue(startRefreshMaterializedView); + + // cancel refresh query + QueryManager queryManager = getDistributedQueryRunner().getCoordinator().getQueryManager(); + queryManager.getQueries().forEach(query -> queryManager.cancelQuery(query.getQueryId())); + + assertThatThrownBy(() -> getFutureValue(queryFuture)) + .hasMessage("Query was canceled"); + assertThat(refreshInterrupted).isTrue(); + } + + private ListenableFuture assertUpdateAsync(@Language("SQL") String sql) + { + return executorService.submit(() -> assertUpdate(sql)); + } +}