Add getMaterializedViews SPI
raunaqmorarka authored and sopel39 committed Jun 7, 2021
1 parent 5576739 commit d1417dc
Showing 8 changed files with 89 additions and 9 deletions.
5 changes: 5 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
@@ -631,6 +631,11 @@ default ResolvedFunction getCoercion(Type fromType, Type toType)
*/
List<QualifiedObjectName> listMaterializedViews(Session session, QualifiedTablePrefix prefix);

/**
* Get the materialized view definitions that match the specified table prefix (never null).
*/
Map<QualifiedObjectName, ConnectorMaterializedViewDefinition> getMaterializedViews(Session session, QualifiedTablePrefix prefix);

/**
* Returns the materialized view definition for the specified view name.
*/
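A minimal usage sketch of the new engine-side call (illustrative only: the catalog and schema names, the helper method, and the variable names are assumptions, not part of this commit; QualifiedTablePrefix's convenience constructor is used as an example):

// Illustrative sketch: fetch every materialized view definition in a schema
// through the new bulk API and print each view's columns.
private static void printMaterializedViewColumns(Metadata metadata, Session session)
{
    QualifiedTablePrefix prefix = new QualifiedTablePrefix("iceberg", "test_schema"); // assumed names
    for (Map.Entry<QualifiedObjectName, ConnectorMaterializedViewDefinition> entry :
            metadata.getMaterializedViews(session, prefix).entrySet()) {
        for (ConnectorMaterializedViewDefinition.Column column : entry.getValue().getColumns()) {
            System.out.println(entry.getKey() + "." + column.getName() + ": " + column.getType());
        }
    }
}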
@@ -680,6 +680,20 @@ public Map<CatalogName, List<TableColumnsMetadata>> listTableColumns(Session ses
}
tableColumns.put(entry.getKey().asSchemaTableName(), Optional.of(columns.build()));
}

// if view and materialized view names overlap, the materialized view wins
for (Entry<QualifiedObjectName, ConnectorMaterializedViewDefinition> entry : getMaterializedViews(session, prefix).entrySet()) {
ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
for (ConnectorMaterializedViewDefinition.Column column : entry.getValue().getColumns()) {
try {
columns.add(new ColumnMetadata(column.getName(), getType(column.getType())));
}
catch (TypeNotFoundException e) {
throw new TrinoException(INVALID_VIEW, format("Unknown type '%s' for column '%s' in materialized view: %s", column.getType(), column.getName(), entry.getKey()));
}
}
tableColumns.put(entry.getKey().asSchemaTableName(), Optional.of(columns.build()));
}
}
}
return ImmutableMap.of(
@@ -1285,6 +1299,44 @@ public List<QualifiedObjectName> listMaterializedViews(Session session, Qualifie
return ImmutableList.copyOf(materializedViews);
}

@Override
public Map<QualifiedObjectName, ConnectorMaterializedViewDefinition> getMaterializedViews(Session session, QualifiedTablePrefix prefix)
{
requireNonNull(prefix, "prefix is null");

Optional<CatalogMetadata> catalog = getOptionalCatalogMetadata(session, prefix.getCatalogName());

Map<QualifiedObjectName, ConnectorMaterializedViewDefinition> views = new LinkedHashMap<>();
if (catalog.isPresent()) {
CatalogMetadata catalogMetadata = catalog.get();

SchemaTablePrefix tablePrefix = prefix.asSchemaTablePrefix();
for (CatalogName catalogName : catalogMetadata.listConnectorIds()) {
ConnectorMetadata metadata = catalogMetadata.getMetadataFor(catalogName);
ConnectorSession connectorSession = session.toConnectorSession(catalogName);

Map<SchemaTableName, ConnectorMaterializedViewDefinition> materializedViewMap;
if (tablePrefix.getTable().isPresent()) {
materializedViewMap = metadata.getMaterializedView(connectorSession, tablePrefix.toSchemaTableName())
.map(view -> ImmutableMap.of(tablePrefix.toSchemaTableName(), view))
.orElse(ImmutableMap.of());
}
else {
materializedViewMap = metadata.getMaterializedViews(connectorSession, tablePrefix.getSchema());
}

for (Entry<SchemaTableName, ConnectorMaterializedViewDefinition> entry : materializedViewMap.entrySet()) {
QualifiedObjectName viewName = new QualifiedObjectName(
prefix.getCatalogName(),
entry.getKey().getSchemaName(),
entry.getKey().getTableName());
views.put(viewName, entry.getValue());
}
}
}
return ImmutableMap.copyOf(views);
}

@Override
public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(Session session, QualifiedObjectName viewName)
{
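In the implementation above, a schema-only prefix is answered by the new bulk connector call, while a fully qualified prefix is routed through the existing single-view lookup. A short sketch of the two cases (catalog, schema, and view names are made up):

// Schema-level prefix: one bulk getMaterializedViews call per connector in the catalog.
Map<QualifiedObjectName, ConnectorMaterializedViewDefinition> all =
        metadata.getMaterializedViews(session, new QualifiedTablePrefix("iceberg", "test_schema"));

// Fully qualified prefix: answered via getMaterializedView, so at most one entry is returned.
Map<QualifiedObjectName, ConnectorMaterializedViewDefinition> one =
        metadata.getMaterializedViews(session,
                new QualifiedTablePrefix("iceberg", "test_schema", "iceberg_materialized_view"));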
@@ -858,6 +858,12 @@ public List<QualifiedObjectName> listMaterializedViews(Session session, Qualifie
throw new UnsupportedOperationException();
}

@Override
public Map<QualifiedObjectName, ConnectorMaterializedViewDefinition> getMaterializedViews(Session session, QualifiedTablePrefix prefix)
{
throw new UnsupportedOperationException();
}

@Override
public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(Session session, QualifiedObjectName viewName)
{
@@ -1157,6 +1157,20 @@ default List<SchemaTableName> listMaterializedViews(ConnectorSession session, Op
return List.of();
}

/**
* Gets the definitions of materialized views, possibly filtered by schema.
* This optional method may be implemented by connectors that can support fetching
* view data in bulk. It is used to populate {@code information_schema.columns}.
*/
default Map<SchemaTableName, ConnectorMaterializedViewDefinition> getMaterializedViews(ConnectorSession session, Optional<String> schemaName)
{
Map<SchemaTableName, ConnectorMaterializedViewDefinition> materializedViews = new HashMap<>();
for (SchemaTableName name : listMaterializedViews(session, schemaName)) {
getMaterializedView(session, name).ifPresent(view -> materializedViews.put(name, view));
}
return materializedViews;
}

/**
* Gets the materialized view data for the specified materialized view name. Returns {@link Optional#empty()}
if the {@code viewName} relation does not exist or is not a materialized view (e.g. is a table, or a view).
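The default above falls back to listMaterializedViews followed by one getMaterializedView lookup per view. A connector that already holds its definitions can override the method to serve the bulk request directly; a hypothetical sketch, assuming an in-memory definitions field and Guava's toImmutableMap collector (neither is part of this commit):

// Hypothetical connector override: answer the bulk call from an in-memory map
// instead of issuing one getMaterializedView lookup per listed view.
@Override
public Map<SchemaTableName, ConnectorMaterializedViewDefinition> getMaterializedViews(ConnectorSession session, Optional<String> schemaName)
{
    return definitions.entrySet().stream()
            .filter(entry -> schemaName.map(schema -> entry.getKey().getSchemaName().equals(schema)).orElse(true))
            .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
}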
@@ -871,6 +871,14 @@ public List<SchemaTableName> listMaterializedViews(ConnectorSession session, Opt
}
}

@Override
public Map<SchemaTableName, ConnectorMaterializedViewDefinition> getMaterializedViews(ConnectorSession session, Optional<String> schemaName)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getMaterializedViews(session, schemaName);
}
}

@Override
public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(ConnectorSession session, SchemaTableName viewName)
{
@@ -216,14 +216,6 @@ protected void checkShowColumnsForMaterializedView(String schemaName, String vie
.hasMessageContaining(format("Table 'iceberg.%s.%s' does not exist", schemaName, viewName));
}

@Override
protected void checkInformationSchemaColumnsForMaterializedView(String schemaName, String viewName)
{
// TODO The query should not fail, obviously. It should return columns for all tables, views, and materialized views
assertThatThrownBy(() -> super.checkInformationSchemaColumnsForMaterializedView(schemaName, viewName))
.hasMessageFindingMatch("(?s)Expecting.*to contain:.*, nationkey\\).*, name\\).*, regionkey\\).*, comment\\)");
}

@Override
protected void checkInformationSchemaColumnsForPointedQueryForMaterializedView(String schemaName, String viewName)
{
@@ -137,13 +137,14 @@ public void testTableListing()
public void testTableColumnListing()
{
// Verify information_schema.columns does not include columns from non-Iceberg tables
// TODO this should include columns from the materialized view as well
assertQuery(
"SELECT table_name, column_name FROM iceberg.information_schema.columns WHERE table_schema = 'test_schema'",
"VALUES " +
"('iceberg_table1', '_string'), " +
"('iceberg_table1', '_integer'), " +
"('iceberg_table2', '_double'), " +
"('iceberg_materialized_view', '_string'), " +
"('iceberg_materialized_view', '_integer'), " +
"('" + storageTable.getTableName() + "', '_string'), " +
"('" + storageTable.getTableName() + "', '_integer')");
}
@@ -74,6 +74,8 @@ public void testColumnListing()
.containsOnly(
row("iceberg_table1", "_string"),
row("iceberg_table1", "_integer"),
row("iceberg_materialized_view", "_string"),
row("iceberg_materialized_view", "_integer"),
row(storageTable, "_string"),
row(storageTable, "_integer"));
}
