Skip to content

Commit

Permalink
Enable connector to perform materialized view refresh itself
Browse files Browse the repository at this point in the history
Introduce ConnectorMetadata#refreshMaterializedView that will
be called when materialized view refresh is delegated to connector.
  • Loading branch information
sopel39 committed Jun 8, 2021
1 parent 557ff7b commit d8787a5
Show file tree
Hide file tree
Showing 25 changed files with 614 additions and 1 deletion.
11 changes: 11 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.metadata;

import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;
import io.trino.Session;
import io.trino.connector.CatalogName;
Expand Down Expand Up @@ -290,6 +291,16 @@ public interface Metadata
*/
Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics);

/**
* Returns true if materialized view refresh should be delegated to connector
*/
boolean delegateMaterializedViewRefreshToConnector(Session session, QualifiedObjectName viewName);

/**
* Refresh materialized view
*/
ListenableFuture<?> refreshMaterializedView(Session session, QualifiedObjectName viewName);

/**
* Begin refresh materialized view query
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.airlift.slice.Slice;
import io.trino.Session;
Expand Down Expand Up @@ -156,6 +157,7 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.primitives.Primitives.wrap;
import static io.airlift.concurrent.MoreFutures.toListenableFuture;
import static io.trino.metadata.FunctionKind.AGGREGATE;
import static io.trino.metadata.QualifiedObjectName.convertFromSchemaTableName;
import static io.trino.metadata.RedirectionAwareTableHandle.noRedirection;
Expand Down Expand Up @@ -937,6 +939,22 @@ public Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTab
return metadata.finishInsert(session.toConnectorSession(catalogName), tableHandle.getConnectorHandle(), fragments, computedStatistics);
}

@Override
public boolean delegateMaterializedViewRefreshToConnector(Session session, QualifiedObjectName viewName)
{
CatalogName catalogName = new CatalogName(viewName.getCatalogName());
ConnectorMetadata metadata = getMetadata(session, catalogName);
return metadata.delegateMaterializedViewRefreshToConnector(session.toConnectorSession(catalogName), viewName.asSchemaTableName());
}

@Override
public ListenableFuture<?> refreshMaterializedView(Session session, QualifiedObjectName viewName)
{
CatalogName catalogName = new CatalogName(viewName.getCatalogName());
ConnectorMetadata metadata = getMetadata(session, catalogName);
return toListenableFuture(metadata.refreshMaterializedView(session.toConnectorSession(catalogName), viewName.asSchemaTableName()));
}

@Override
public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.operator;

import com.google.common.util.concurrent.ListenableFuture;
import io.trino.metadata.Metadata;
import io.trino.metadata.QualifiedObjectName;
import io.trino.spi.Page;
import io.trino.sql.planner.plan.PlanNodeId;

import javax.annotation.Nullable;

import static com.google.common.base.Preconditions.checkState;
import static io.airlift.concurrent.MoreFutures.getDone;
import static java.util.Objects.requireNonNull;

public class RefreshMaterializedViewOperator
implements Operator
{
public static class RefreshMaterializedViewOperatorFactory
implements OperatorFactory
{
private final int operatorId;
private final PlanNodeId planNodeId;
private final Metadata metadata;
private final QualifiedObjectName viewName;
private boolean closed;

public RefreshMaterializedViewOperatorFactory(int operatorId, PlanNodeId planNodeId, Metadata metadata, QualifiedObjectName viewName)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.viewName = requireNonNull(viewName, "viewName is null");
}

@Override
public Operator createOperator(DriverContext driverContext)
{
checkState(!closed, "Factory is already closed");
OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, ValuesOperator.class.getSimpleName());
return new RefreshMaterializedViewOperator(operatorContext, metadata, viewName);
}

@Override
public void noMoreOperators()
{
closed = true;
}

@Override
public OperatorFactory duplicate()
{
return new RefreshMaterializedViewOperatorFactory(operatorId, planNodeId, metadata, viewName);
}
}

private final OperatorContext operatorContext;
private final Metadata metadata;
private final QualifiedObjectName viewName;
@Nullable
private ListenableFuture<?> refreshFuture;
private boolean closed;

public RefreshMaterializedViewOperator(OperatorContext operatorContext, Metadata metadata, QualifiedObjectName viewName)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.viewName = requireNonNull(viewName, "viewName is null");
}

@Override
public OperatorContext getOperatorContext()
{
return operatorContext;
}

@Override
public ListenableFuture<?> isBlocked()
{
if (refreshFuture == null) {
return NOT_BLOCKED;
}

return refreshFuture;
}

@Override
public boolean needsInput()
{
return false;
}

@Override
public void addInput(Page page)
{
throw new UnsupportedOperationException();
}

@Override
public Page getOutput()
{
return null;
}

@Override
public void finish()
{
}

@Override
public void close()
{
if (refreshFuture != null) {
refreshFuture.cancel(false);
}

closed = true;
}

@Override
public boolean isFinished()
{
if (closed) {
return true;
}

if (refreshFuture == null) {
// perform refresh in isFinished method. getOutput method won't be called by Driver
// since RefreshMaterializedViewOperator is last operator in pipeline
refreshFuture = metadata.refreshMaterializedView(operatorContext.getSession(), viewName);
}

if (refreshFuture.isDone()) {
getDone(refreshFuture);
}

return refreshFuture.isDone();
}
}
11 changes: 11 additions & 0 deletions core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public class Analysis
private Optional<Create> create = Optional.empty();
private Optional<Insert> insert = Optional.empty();
private Optional<RefreshMaterializedViewAnalysis> refreshMaterializedView = Optional.empty();
private Optional<QualifiedObjectName> delegatedRefreshMaterializedView = Optional.empty();
private Optional<TableHandle> analyzeTarget = Optional.empty();
private Optional<List<ColumnMetadata>> updatedColumns = Optional.empty();

Expand Down Expand Up @@ -723,6 +724,16 @@ public Optional<RefreshMaterializedViewAnalysis> getRefreshMaterializedView()
return refreshMaterializedView;
}

public void setDelegatedRefreshMaterializedView(QualifiedObjectName viewName)
{
this.delegatedRefreshMaterializedView = Optional.of(viewName);
}

public Optional<QualifiedObjectName> getDelegatedRefreshMaterializedView()
{
return delegatedRefreshMaterializedView;
}

public Query getNamedQuery(Table table)
{
return namedQueries.get(NodeRef.of(table));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,16 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView refreshMate

accessControl.checkCanRefreshMaterializedView(session.toSecurityContext(), name);

if (metadata.delegateMaterializedViewRefreshToConnector(session, name)) {
analysis.setDelegatedRefreshMaterializedView(name);
analysis.setUpdateType(
"REFRESH MATERIALIZED VIEW",
name,
Optional.empty(),
Optional.empty());
return createAndAssignScope(refreshMaterializedView, scope);
}

Optional<QualifiedName> storageName = getMaterializedViewStorageTableName(optionalView.get());

if (storageName.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.PlanVisitor;
import io.trino.sql.planner.plan.ProjectNode;
import io.trino.sql.planner.plan.RefreshMaterializedViewNode;
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.sql.planner.plan.RowNumberNode;
import io.trino.sql.planner.plan.SampleNode;
Expand Down Expand Up @@ -393,6 +394,13 @@ public Map<PlanNodeId, SplitSource> visitSort(SortNode node, Void context)
return node.getSource().accept(this, context);
}

@Override
public Map<PlanNodeId, SplitSource> visitRefreshMaterializedView(RefreshMaterializedViewNode node, Void context)
{
// RefreshMaterializedViewNode does not have splits
return ImmutableMap.of();
}

@Override
public Map<PlanNodeId, SplitSource> visitTableWriter(TableWriterNode node, Void context)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import io.trino.operator.PartitionFunction;
import io.trino.operator.PartitionedOutputOperator.PartitionedOutputFactory;
import io.trino.operator.PipelineExecutionStrategy;
import io.trino.operator.RefreshMaterializedViewOperator.RefreshMaterializedViewOperatorFactory;
import io.trino.operator.RowNumberOperator;
import io.trino.operator.ScanFilterAndProjectOperator.ScanFilterAndProjectOperatorFactory;
import io.trino.operator.SetBuilderOperator.SetBuilderOperatorFactory;
Expand Down Expand Up @@ -184,6 +185,7 @@
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.PlanVisitor;
import io.trino.sql.planner.plan.ProjectNode;
import io.trino.sql.planner.plan.RefreshMaterializedViewNode;
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.sql.planner.plan.RowNumberNode;
import io.trino.sql.planner.plan.SampleNode;
Expand Down Expand Up @@ -2795,6 +2797,14 @@ private Set<DynamicFilterId> getCoordinatorDynamicFilters(Set<DynamicFilterId> d
return ImmutableSet.of();
}

@Override
public PhysicalOperation visitRefreshMaterializedView(RefreshMaterializedViewNode node, LocalExecutionPlanContext context)
{
context.setDriverInstanceCount(1);
OperatorFactory operatorFactory = new RefreshMaterializedViewOperatorFactory(context.getNextOperatorId(), node.getId(), metadata, node.getViewName());
return new PhysicalOperation(operatorFactory, makeLayout(node), context, UNGROUPED_EXECUTION);
}

@Override
public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPlanContext context)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import io.trino.sql.planner.plan.OutputNode;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.ProjectNode;
import io.trino.sql.planner.plan.RefreshMaterializedViewNode;
import io.trino.sql.planner.plan.StatisticAggregations;
import io.trino.sql.planner.plan.StatisticsWriterNode;
import io.trino.sql.planner.plan.TableFinishNode;
Expand Down Expand Up @@ -265,7 +266,6 @@ private RelationPlan planStatementWithoutOutput(Analysis analysis, Statement sta
return createInsertPlan(analysis, (Insert) statement);
}
if (statement instanceof RefreshMaterializedView) {
checkState(analysis.getRefreshMaterializedView().isPresent(), "RefreshMaterializedViewAnalysis handle is missing");
return createRefreshMaterializedViewPlan(analysis);
}
if (statement instanceof Delete) {
Expand Down Expand Up @@ -481,6 +481,16 @@ private RelationPlan createInsertPlan(Analysis analysis, Insert insertStatement)

private RelationPlan createRefreshMaterializedViewPlan(Analysis analysis)
{
Optional<QualifiedObjectName> delegatedRefreshMaterializedView = analysis.getDelegatedRefreshMaterializedView();
if (delegatedRefreshMaterializedView.isPresent()) {
return new RelationPlan(
new RefreshMaterializedViewNode(idAllocator.getNextId(), delegatedRefreshMaterializedView.get()),
analysis.getRootScope(),
ImmutableList.of(),
Optional.empty());
}

checkState(analysis.getRefreshMaterializedView().isPresent(), "RefreshMaterializedViewAnalysis handle is missing");
Analysis.RefreshMaterializedViewAnalysis viewAnalysis = analysis.getRefreshMaterializedView().get();
TableHandle tableHandle = viewAnalysis.getTarget();
Query query = viewAnalysis.getQuery();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.PlanVisitor;
import io.trino.sql.planner.plan.RefreshMaterializedViewNode;
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.sql.planner.plan.RowNumberNode;
import io.trino.sql.planner.plan.SimplePlanRewriter;
Expand Down Expand Up @@ -315,6 +316,13 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext<FragmentProper
return context.defaultRewrite(node, context.get());
}

@Override
public PlanNode visitRefreshMaterializedView(RefreshMaterializedViewNode node, RewriteContext<FragmentProperties> context)
{
context.get().setCoordinatorOnlyDistribution();
return context.defaultRewrite(node, context.get());
}

@Override
public PlanNode visitTableWriter(TableWriterNode node, RewriteContext<FragmentProperties> context)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanVisitor;
import io.trino.sql.planner.plan.ProjectNode;
import io.trino.sql.planner.plan.RefreshMaterializedViewNode;
import io.trino.sql.planner.plan.RowNumberNode;
import io.trino.sql.planner.plan.SemiJoinNode;
import io.trino.sql.planner.plan.SortNode;
Expand Down Expand Up @@ -584,6 +585,12 @@ public PlanWithProperties visitTableScan(TableScanNode node, PreferredProperties
return new PlanWithProperties(node, deriveProperties(node, ImmutableList.of()));
}

@Override
public PlanWithProperties visitRefreshMaterializedView(RefreshMaterializedViewNode node, PreferredProperties preferredProperties)
{
return new PlanWithProperties(node, deriveProperties(node, ImmutableList.of()));
}

@Override
public PlanWithProperties visitTableWriter(TableWriterNode node, PreferredProperties preferredProperties)
{
Expand Down
Loading

0 comments on commit d8787a5

Please sign in to comment.