Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter table columns in bulk #18956

Merged
merged 1 commit into from
Sep 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 65 additions & 50 deletions core/trino-main/src/main/java/io/trino/metadata/MetadataListing.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,61 +248,76 @@ private static Map<SchemaTableName, List<ColumnMetadata>> doListTableColumns(Ses
prefix,
relationNames -> accessControl.filterTables(session.toSecurityContext(), prefix.getCatalogName(), relationNames));

Map<SchemaTableName, Optional<List<ColumnMetadata>>> tableColumns = catalogColumns.stream()
.collect(toImmutableMap(TableColumnsMetadata::getTable, TableColumnsMetadata::getColumns));

ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> result = ImmutableMap.builder();

tableColumns.forEach((table, columnsOptional) -> {
QualifiedObjectName originalTableName = new QualifiedObjectName(prefix.getCatalogName(), table.getSchemaName(), table.getTableName());
List<ColumnMetadata> columns;
QualifiedObjectName actualTableName;

if (columnsOptional.isPresent()) {
actualTableName = originalTableName;
columns = columnsOptional.get();
}
else {
TableHandle targetTableHandle;

try {
// For redirected tables, column listing requires special handling, because the column metadata is unavailable
// at the source table, and needs to be fetched from the target table.
RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalTableName);

// The target table name should be non-empty. If it is empty, it means that there is an
// inconsistency in the connector's implementation of ConnectorMetadata#streamTableColumns and
// ConnectorMetadata#redirectTable.
if (redirection.redirectedTableName().isEmpty()) {
return;
// Process tables without redirect
Map<SchemaTableName, Set<String>> columnNamesByTable = catalogColumns.stream()
.filter(tableColumnsMetadata -> tableColumnsMetadata.getColumns().isPresent())
.collect(toImmutableMap(
TableColumnsMetadata::getTable,
tableColumnsMetadata -> tableColumnsMetadata.getColumns().orElseThrow().stream()
.map(ColumnMetadata::getName)
.collect(toImmutableSet())));
Map<SchemaTableName, Set<String>> catalogAllowedColumns = accessControl.filterColumns(session.toSecurityContext(), prefix.getCatalogName(), columnNamesByTable);
catalogColumns.stream()
.filter(tableColumnsMetadata -> tableColumnsMetadata.getColumns().isPresent())
.forEach(tableColumnsMetadata -> {
Set<String> allowedTableColumns = catalogAllowedColumns.getOrDefault(tableColumnsMetadata.getTable(), ImmutableSet.of());
result.put(
tableColumnsMetadata.getTable(),
tableColumnsMetadata.getColumns().get().stream()
.filter(column -> allowedTableColumns.contains(column.getName()))
.collect(toImmutableList()));
});

// Process redirects
catalogColumns.stream()
.filter(tableColumnsMetadata -> tableColumnsMetadata.getColumns().isEmpty())
.forEach(tableColumnsMetadata -> {
SchemaTableName table = tableColumnsMetadata.getTable();
QualifiedObjectName originalTableName = new QualifiedObjectName(prefix.getCatalogName(), table.getSchemaName(), table.getTableName());
QualifiedObjectName actualTableName;
TableHandle targetTableHandle;
try {
// For redirected tables, column listing requires special handling, because the column metadata is unavailable
// at the source table, and needs to be fetched from the target table.
RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalTableName);

// The target table name should be non-empty. If it is empty, it means that there is an
// inconsistency in the connector's implementation of ConnectorMetadata#streamTableColumns and
// ConnectorMetadata#redirectTable.
if (redirection.redirectedTableName().isEmpty()) {
return;
}
actualTableName = redirection.redirectedTableName().get();
targetTableHandle = redirection.tableHandle().orElseThrow();
}
actualTableName = redirection.redirectedTableName().get();
targetTableHandle = redirection.tableHandle().orElseThrow();
}
catch (TrinoException e) {
// Ignore redirection errors
if (e.getErrorCode().equals(TABLE_REDIRECTION_ERROR.toErrorCode())) {
return;
catch (TrinoException e) {
// Ignore redirection errors
if (e.getErrorCode().equals(TABLE_REDIRECTION_ERROR.toErrorCode())) {
return;
}
throw e;
}
throw e;
}

columns = metadata.getTableMetadata(session, targetTableHandle).getColumns();
}

Set<String> allowedColumns = accessControl.filterColumns(
session.toSecurityContext(),
// Use redirected table name for applying column filters, since the source does not know the column metadata
actualTableName.asCatalogSchemaTableName(),
columns.stream()
.map(ColumnMetadata::getName)
.collect(toImmutableSet()));
result.put(
table,
columns.stream()
.filter(column -> allowedColumns.contains(column.getName()))
.collect(toImmutableList()));
});
List<ColumnMetadata> columns = metadata.getTableMetadata(session, targetTableHandle).getColumns();

Set<String> allowedColumns = accessControl.filterColumns(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can all redirected tables be collected and batched in a single accessControl.filterColumns call?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they would need to be grouped by catalog, but yes, that's possible

note that there are also other deficiencies around redirected tables handling. this requires test coverage and is not the part i am focusing on

session.toSecurityContext(),
actualTableName.asCatalogSchemaTableName().getCatalogName(),
ImmutableMap.of(
// Use redirected table name for applying column filters, since the source does not know the column metadata
actualTableName.asSchemaTableName(),
columns.stream()
.map(ColumnMetadata::getName)
.collect(toImmutableSet())))
.getOrDefault(actualTableName.asSchemaTableName(), ImmutableSet.of());
result.put(
table,
columns.stream()
.filter(column -> allowedColumns.contains(column.getName()))
.collect(toImmutableList()));
});

return result.buildOrThrow();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,17 @@ public interface AccessControl

/**
* Filter the list of columns to those visible to the identity.
*
* @deprecated Use {@link #filterColumns(SecurityContext, String, Map)}
*/
@Deprecated
Set<String> filterColumns(SecurityContext context, CatalogSchemaTableName tableName, Set<String> columns);

/**
* Filter lists of columns of multiple tables to those visible to the identity.
*/
Map<SchemaTableName, Set<String>> filterColumns(SecurityContext context, String catalogName, Map<SchemaTableName, Set<String>> tableColumns);

/**
* Check if identity is allowed to add columns to the specified table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.airlift.stats.CounterStat;
Expand Down Expand Up @@ -635,6 +636,34 @@ public Set<String> filterColumns(SecurityContext securityContext, CatalogSchemaT
return columns;
}

@Override
public Map<SchemaTableName, Set<String>> filterColumns(SecurityContext securityContext, String catalogName, Map<SchemaTableName, Set<String>> tableColumns)
{
requireNonNull(securityContext, "securityContext is null");
requireNonNull(catalogName, "catalogName is null");
requireNonNull(tableColumns, "tableColumns is null");

Set<SchemaTableName> filteredTables = filterTables(securityContext, catalogName, tableColumns.keySet());
if (!filteredTables.equals(tableColumns.keySet())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if seems redundant as equals uses contains as well so in some cases it will execute contains twice for some elements

Copy link
Member Author

@findepi findepi Sep 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i would expect equals to invoke contains only if sets are equal (size chcked first?), and there is no second call then for given element

tableColumns = Maps.filterKeys(tableColumns, filteredTables::contains);
}

if (tableColumns.isEmpty()) {
// Do not call plugin-provided implementation unnecessarily.
return ImmutableMap.of();
}

for (SystemAccessControl systemAccessControl : getSystemAccessControls()) {
tableColumns = systemAccessControl.filterColumns(securityContext.toSystemSecurityContext(), catalogName, tableColumns);
}

ConnectorAccessControl connectorAccessControl = getConnectorAccessControl(securityContext.getTransactionId(), catalogName);
if (connectorAccessControl != null) {
tableColumns = connectorAccessControl.filterColumns(toConnectorSecurityContext(catalogName, securityContext), tableColumns);
}
return tableColumns;
}

@Override
public void checkCanAddColumns(SecurityContext securityContext, QualifiedObjectName tableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@ public Set<String> filterColumns(SecurityContext context, CatalogSchemaTableName
return columns;
}

@Override
public Map<SchemaTableName, Set<String>> filterColumns(SecurityContext context, String catalogName, Map<SchemaTableName, Set<String>> tableColumns)
{
return tableColumns;
}

@Override
public void checkCanAddColumns(SecurityContext context, QualifiedObjectName tableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.security;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.metadata.QualifiedObjectName;
import io.trino.spi.connector.CatalogSchemaName;
Expand Down Expand Up @@ -267,6 +268,12 @@ public Set<String> filterColumns(SecurityContext context, CatalogSchemaTableName
return ImmutableSet.of();
}

@Override
public Map<SchemaTableName, Set<String>> filterColumns(SecurityContext context, String catalogName, Map<SchemaTableName, Set<String>> tableColumns)
{
return ImmutableMap.of();
}

@Override
public void checkCanShowSchemas(SecurityContext context, String catalogName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ public Set<String> filterColumns(SecurityContext context, CatalogSchemaTableName
return delegate().filterColumns(context, tableName, columns);
}

@Override
public Map<SchemaTableName, Set<String>> filterColumns(SecurityContext context, String catalogName, Map<SchemaTableName, Set<String>> tableColumns)
{
return delegate().filterColumns(context, catalogName, tableColumns);
}

@Override
public void checkCanAddColumns(SecurityContext context, QualifiedObjectName tableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ public Set<String> filterColumns(ConnectorSecurityContext context, SchemaTableNa
return accessControl.filterColumns(securityContext, new CatalogSchemaTableName(catalogName, tableName), columns);
}

@Override
public Map<SchemaTableName, Set<String>> filterColumns(ConnectorSecurityContext context, Map<SchemaTableName, Set<String>> tableColumns)
{
checkArgument(context == null, "context must be null");
return accessControl.filterColumns(securityContext, catalogName, tableColumns);
}

@Override
public void checkCanAddColumn(ConnectorSecurityContext context, SchemaTableName tableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@

import io.trino.metadata.QualifiedObjectName;
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.function.FunctionKind;
import io.trino.spi.security.AccessDeniedException;
import io.trino.spi.security.Identity;
import io.trino.spi.security.ViewExpression;
import io.trino.spi.type.Type;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -62,6 +64,12 @@ public Set<String> filterColumns(SecurityContext context, CatalogSchemaTableName
return delegate.filterColumns(context, tableName, columns);
}

@Override
public Map<SchemaTableName, Set<String>> filterColumns(SecurityContext context, String catalogName, Map<SchemaTableName, Set<String>> tableColumns)
{
return delegate.filterColumns(context, catalogName, tableColumns);
}

@Override
public void checkCanCreateViewWithSelectFromColumns(SecurityContext context, QualifiedObjectName tableName, Set<String> columnNames)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ public Set<String> filterColumns(SecurityContext context, CatalogSchemaTableName
return columns;
}

@Override
public Map<SchemaTableName, Set<String>> filterColumns(SecurityContext context, String catalogName, Map<SchemaTableName, Set<String>> tableColumns)
{
return tableColumns;
}

@Override
public void checkCanAddColumns(SecurityContext context, QualifiedObjectName tableName) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.function.Predicate;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.spi.security.AccessDeniedException.denyAddColumn;
import static io.trino.spi.security.AccessDeniedException.denyAlterColumn;
Expand Down Expand Up @@ -665,14 +666,30 @@ public void checkCanShowColumns(SecurityContext context, CatalogSchemaTableName

@Override
public Set<String> filterColumns(SecurityContext context, CatalogSchemaTableName table, Set<String> columns)
{
Set<String> visibleColumns = localFilterColumns(context, table.getSchemaTableName(), columns);
return super.filterColumns(context, table, visibleColumns);
}

@Override
public Map<SchemaTableName, Set<String>> filterColumns(SecurityContext context, String catalogName, Map<SchemaTableName, Set<String>> tableColumns)
{
tableColumns = tableColumns.entrySet().stream()
.collect(toImmutableMap(
Map.Entry::getKey,
e -> localFilterColumns(context, e.getKey(), e.getValue())));
return super.filterColumns(context, catalogName, tableColumns);
}

private Set<String> localFilterColumns(SecurityContext context, SchemaTableName table, Set<String> columns)
{
ImmutableSet.Builder<String> visibleColumns = ImmutableSet.builder();
for (String column : columns) {
if (!shouldDenyPrivilege(context.getIdentity().getUser(), table.getSchemaTableName().getTableName() + "." + column, SELECT_COLUMN)) {
if (!shouldDenyPrivilege(context.getIdentity().getUser(), table.getTableName() + "." + column, SELECT_COLUMN)) {
visibleColumns.add(column);
}
}
return super.filterColumns(context, table, visibleColumns.build());
return visibleColumns.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,15 @@ public Set<String> filterColumns(SecurityContext context, CatalogSchemaTableName
}
}

@Override
public Map<SchemaTableName, Set<String>> filterColumns(SecurityContext context, String catalogName, Map<SchemaTableName, Set<String>> tableColumns)
{
Span span = startSpan("filterColumns bulk");
try (var ignored = scopedSpan(span)) {
return delegate.filterColumns(context, catalogName, tableColumns);
}
}

@Override
public void checkCanAddColumns(SecurityContext context, QualifiedObjectName tableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static io.trino.spi.security.AccessDeniedException.denyAddColumn;
import static io.trino.spi.security.AccessDeniedException.denyAlterColumn;
Expand Down Expand Up @@ -278,12 +279,26 @@ default void checkCanShowColumns(ConnectorSecurityContext context, SchemaTableNa

/**
* Filter the list of columns to those visible to the identity.
*
* @deprecated Use {@link #filterColumns(ConnectorSecurityContext, Map)}
*/
@Deprecated
default Set<String> filterColumns(ConnectorSecurityContext context, SchemaTableName tableName, Set<String> columns)
{
return emptySet();
}

/**
* Filter lists of columns of multiple tables to those visible to the identity.
*/
default Map<SchemaTableName, Set<String>> filterColumns(ConnectorSecurityContext context, Map<SchemaTableName, Set<String>> tableColumns)
{
return tableColumns.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> filterColumns(context, entry.getKey(), entry.getValue())));
}

/**
* Check if identity is allowed to add columns to the specified table.
*
Expand Down
Loading
Loading