diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java new file mode 100644 index 0000000000000..c31b5c931900d --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java @@ -0,0 +1,727 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.airlift.slice.Slice; +import io.trino.plugin.jdbc.PredicatePushdownController.DomainPushdownResult; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.AggregateFunction; +import io.trino.spi.connector.AggregationApplicationResult; +import io.trino.spi.connector.Assignment; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorInsertTableHandle; +import io.trino.spi.connector.ConnectorNewTableLayout; +import io.trino.spi.connector.ConnectorOutputMetadata; +import io.trino.spi.connector.ConnectorOutputTableHandle; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.ConnectorTableProperties; +import io.trino.spi.connector.ConnectorTableSchema; +import io.trino.spi.connector.Constraint; +import io.trino.spi.connector.ConstraintApplicationResult; +import io.trino.spi.connector.JoinApplicationResult; +import io.trino.spi.connector.JoinCondition; +import io.trino.spi.connector.JoinStatistics; +import io.trino.spi.connector.JoinType; +import io.trino.spi.connector.LimitApplicationResult; +import io.trino.spi.connector.ProjectionApplicationResult; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.SchemaTablePrefix; +import io.trino.spi.connector.SortItem; +import io.trino.spi.connector.SystemTable; +import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.connector.TableScanRedirectApplicationResult; +import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.expression.ConnectorExpression; +import io.trino.spi.expression.Variable; +import io.trino.spi.predicate.Domain; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.security.AccessDeniedException; +import io.trino.spi.security.TrinoPrincipal; +import io.trino.spi.statistics.ComputedStatistics; +import io.trino.spi.statistics.TableStatistics; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Functions.identity; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static com.google.common.base.Verify.verifyNotNull; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isAggregationPushdownEnabled; +import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isJoinPushdownEnabled; +import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isTopNPushdownEnabled; +import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED; +import static java.lang.Math.max; +import static java.util.Objects.requireNonNull; + +public class DefaultJdbcMetadata + implements JdbcMetadata +{ + private static final String SYNTHETIC_COLUMN_NAME_PREFIX = "_pfgnrtd_"; + + private final JdbcClient jdbcClient; + private final boolean allowDropTable; + + private final AtomicReference rollbackAction = new AtomicReference<>(); + + public DefaultJdbcMetadata(JdbcClient jdbcClient, boolean allowDropTable) + { + this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); + this.allowDropTable = allowDropTable; + } + + @Override + public boolean schemaExists(ConnectorSession session, String schemaName) + { + return jdbcClient.schemaExists(session, schemaName); + } + + @Override + public List listSchemaNames(ConnectorSession session) + { + return ImmutableList.copyOf(jdbcClient.getSchemaNames(session)); + } + + @Override + public JdbcTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) + { + return jdbcClient.getTableHandle(session, tableName) + .orElse(null); + } + + @Override + public Optional getSystemTable(ConnectorSession session, SchemaTableName tableName) + { + return jdbcClient.getSystemTable(session, tableName); + } + + @Override + public Optional> applyFilter(ConnectorSession session, ConnectorTableHandle table, Constraint constraint) + { + JdbcTableHandle handle = (JdbcTableHandle) table; + if (handle.getSortOrder().isPresent() && handle.getLimit().isPresent()) { + handle = flushAttributesAsQuery(session, handle); + } + + TupleDomain oldDomain = handle.getConstraint(); + TupleDomain newDomain = oldDomain.intersect(constraint.getSummary()); + + TupleDomain remainingFilter; + if (newDomain.isNone()) { + remainingFilter = TupleDomain.all(); + } + else { + Map domains = newDomain.getDomains().orElseThrow(); + List columnHandles = domains.keySet().stream() + .map(JdbcColumnHandle.class::cast) + .collect(toImmutableList()); + List columnMappings = jdbcClient.toColumnMappings( + session, + columnHandles.stream() + .map(JdbcColumnHandle::getJdbcTypeHandle) + .collect(toImmutableList())); + + Map supported = new HashMap<>(); + Map unsupported = new HashMap<>(); + for (int i = 0; i < columnHandles.size(); i++) { + JdbcColumnHandle column = columnHandles.get(i); + ColumnMapping mapping = columnMappings.get(i); + DomainPushdownResult pushdownResult = mapping.getPredicatePushdownController().apply(session, domains.get(column)); + supported.put(column, pushdownResult.getPushedDown()); + unsupported.put(column, pushdownResult.getRemainingFilter()); + } + + newDomain = TupleDomain.withColumnDomains(supported); + remainingFilter = TupleDomain.withColumnDomains(unsupported); + } + + if (oldDomain.equals(newDomain)) { + return Optional.empty(); + } + + handle = new JdbcTableHandle( + handle.getRelationHandle(), + newDomain, + handle.getSortOrder(), + handle.getLimit(), + handle.getColumns(), + handle.getOtherReferencedTables(), + handle.getNextSyntheticColumnId()); + + return Optional.of(new ConstraintApplicationResult<>(handle, remainingFilter)); + } + + private JdbcTableHandle flushAttributesAsQuery(ConnectorSession session, JdbcTableHandle handle) + { + List columns = jdbcClient.getColumns(session, handle); + PreparedQuery preparedQuery = jdbcClient.prepareQuery(session, handle, Optional.empty(), columns, ImmutableMap.of()); + + return new JdbcTableHandle( + new JdbcQueryRelationHandle(preparedQuery), + TupleDomain.all(), + Optional.empty(), + OptionalLong.empty(), + Optional.of(columns), + handle.getAllReferencedTables(), + handle.getNextSyntheticColumnId()); + } + + @Override + public Optional> applyProjection( + ConnectorSession session, + ConnectorTableHandle table, + List projections, + Map assignments) + { + JdbcTableHandle handle = (JdbcTableHandle) table; + + List newColumns = assignments.values().stream() + .map(JdbcColumnHandle.class::cast) + .collect(toImmutableList()); + + if (handle.getColumns().isPresent() && containSameElements(newColumns, handle.getColumns().get())) { + return Optional.empty(); + } + + return Optional.of(new ProjectionApplicationResult<>( + new JdbcTableHandle( + handle.getRelationHandle(), + handle.getConstraint(), + handle.getSortOrder(), + handle.getLimit(), + Optional.of(newColumns), + handle.getOtherReferencedTables(), + handle.getNextSyntheticColumnId()), + projections, + assignments.entrySet().stream() + .map(assignment -> new Assignment( + assignment.getKey(), + assignment.getValue(), + ((JdbcColumnHandle) assignment.getValue()).getColumnType())) + .collect(toImmutableList()))); + } + + @Override + public Optional> applyAggregation( + ConnectorSession session, + ConnectorTableHandle table, + List aggregates, + Map assignments, + List> groupingSets) + { + if (!isAggregationPushdownEnabled(session)) { + return Optional.empty(); + } + + JdbcTableHandle handle = (JdbcTableHandle) table; + + // Global aggregation is represented by [[]] + verify(!groupingSets.isEmpty(), "No grouping sets provided"); + + if (!jdbcClient.supportsAggregationPushdown(session, handle, groupingSets)) { + // JDBC client implementation prevents pushdown for the given table + return Optional.empty(); + } + + if (handle.getLimit().isPresent()) { + handle = flushAttributesAsQuery(session, handle); + } + + int nextSyntheticColumnId = handle.getNextSyntheticColumnId(); + + ImmutableList.Builder newColumns = ImmutableList.builder(); + ImmutableList.Builder projections = ImmutableList.builder(); + ImmutableList.Builder resultAssignments = ImmutableList.builder(); + ImmutableMap.Builder expressions = ImmutableMap.builder(); + for (AggregateFunction aggregate : aggregates) { + Optional expression = jdbcClient.implementAggregation(session, aggregate, assignments); + if (expression.isEmpty()) { + return Optional.empty(); + } + + String columnName = SYNTHETIC_COLUMN_NAME_PREFIX + nextSyntheticColumnId; + nextSyntheticColumnId++; + JdbcColumnHandle newColumn = JdbcColumnHandle.builder() + .setColumnName(columnName) + .setJdbcTypeHandle(expression.get().getJdbcTypeHandle()) + .setColumnType(aggregate.getOutputType()) + .setComment(Optional.of("synthetic")) + .build(); + + newColumns.add(newColumn); + projections.add(new Variable(newColumn.getColumnName(), aggregate.getOutputType())); + resultAssignments.add(new Assignment(newColumn.getColumnName(), newColumn, aggregate.getOutputType())); + expressions.put(columnName, expression.get().getExpression()); + } + + List> groupingSetsAsJdbcColumnHandles = groupingSets.stream() + .map(groupingSet -> groupingSet.stream() + .map(JdbcColumnHandle.class::cast) + .collect(toImmutableList())) + .collect(toImmutableList()); + + List newColumnsList = newColumns.build(); + + PreparedQuery preparedQuery = jdbcClient.prepareQuery( + session, + handle, + Optional.of(groupingSetsAsJdbcColumnHandles), + ImmutableList.builder() + .addAll(groupingSetsAsJdbcColumnHandles.stream() + .flatMap(List::stream) + .distinct() + .iterator()) + .addAll(newColumnsList) + .build(), + expressions.build()); + handle = new JdbcTableHandle( + new JdbcQueryRelationHandle(preparedQuery), + TupleDomain.all(), + Optional.empty(), + OptionalLong.empty(), + Optional.of(newColumnsList), + handle.getAllReferencedTables(), + nextSyntheticColumnId); + + return Optional.of(new AggregationApplicationResult<>(handle, projections.build(), resultAssignments.build(), ImmutableMap.of())); + } + + @Override + public Optional> applyJoin( + ConnectorSession session, + JoinType joinType, + ConnectorTableHandle left, + ConnectorTableHandle right, + List joinConditions, + Map leftAssignments, + Map rightAssignments, + JoinStatistics statistics) + { + if (!isJoinPushdownEnabled(session)) { + return Optional.empty(); + } + + JdbcTableHandle leftHandle = flushAttributesAsQuery(session, (JdbcTableHandle) left); + JdbcTableHandle rightHandle = flushAttributesAsQuery(session, (JdbcTableHandle) right); + int nextSyntheticColumnId = max(leftHandle.getNextSyntheticColumnId(), rightHandle.getNextSyntheticColumnId()); + + ImmutableMap.Builder newLeftColumnsBuilder = ImmutableMap.builder(); + for (JdbcColumnHandle column : jdbcClient.getColumns(session, leftHandle)) { + newLeftColumnsBuilder.put(column, JdbcColumnHandle.builderFrom(column) + .setColumnName(column.getColumnName() + "_" + nextSyntheticColumnId) + .build()); + nextSyntheticColumnId++; + } + Map newLeftColumns = newLeftColumnsBuilder.build(); + + ImmutableMap.Builder newRightColumnsBuilder = ImmutableMap.builder(); + for (JdbcColumnHandle column : jdbcClient.getColumns(session, rightHandle)) { + newRightColumnsBuilder.put(column, JdbcColumnHandle.builderFrom(column) + .setColumnName(column.getColumnName() + "_" + nextSyntheticColumnId) + .build()); + nextSyntheticColumnId++; + } + Map newRightColumns = newRightColumnsBuilder.build(); + + ImmutableList.Builder jdbcJoinConditions = ImmutableList.builder(); + for (JoinCondition joinCondition : joinConditions) { + Optional leftColumn = getVariableColumnHandle(leftAssignments, joinCondition.getLeftExpression()); + Optional rightColumn = getVariableColumnHandle(rightAssignments, joinCondition.getRightExpression()); + if (leftColumn.isEmpty() || rightColumn.isEmpty()) { + return Optional.empty(); + } + jdbcJoinConditions.add(new JdbcJoinCondition(leftColumn.get(), joinCondition.getOperator(), rightColumn.get())); + } + + Optional joinQuery = jdbcClient.implementJoin( + session, + joinType, + asPreparedQuery(leftHandle), + asPreparedQuery(rightHandle), + jdbcJoinConditions.build(), + newRightColumns.entrySet().stream() + .collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().getColumnName())), + newLeftColumns.entrySet().stream() + .collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().getColumnName())), + statistics); + + if (joinQuery.isEmpty()) { + return Optional.empty(); + } + + return Optional.of(new JoinApplicationResult<>( + new JdbcTableHandle( + new JdbcQueryRelationHandle(joinQuery.get()), + TupleDomain.all(), + Optional.empty(), + OptionalLong.empty(), + Optional.of( + ImmutableList.builder() + .addAll(newLeftColumns.values()) + .addAll(newRightColumns.values()) + .build()), + ImmutableSet.builder() + .addAll(leftHandle.getAllReferencedTables()) + .addAll(rightHandle.getAllReferencedTables()) + .build(), + nextSyntheticColumnId), + ImmutableMap.copyOf(newLeftColumns), + ImmutableMap.copyOf(newRightColumns))); + } + + private static Optional getVariableColumnHandle(Map assignments, ConnectorExpression expression) + { + requireNonNull(assignments, "assignments is null"); + requireNonNull(expression, "expression is null"); + if (!(expression instanceof Variable)) { + return Optional.empty(); + } + + String name = ((Variable) expression).getName(); + ColumnHandle columnHandle = assignments.get(name); + verifyNotNull(columnHandle, "No assignment for %s", name); + return Optional.of(((JdbcColumnHandle) columnHandle)); + } + + private static PreparedQuery asPreparedQuery(JdbcTableHandle tableHandle) + { + checkArgument( + tableHandle.getConstraint().equals(TupleDomain.all()) && + tableHandle.getLimit().isEmpty() && + tableHandle.getRelationHandle() instanceof JdbcQueryRelationHandle, + "Handle is not a plain query: %s", + tableHandle); + return ((JdbcQueryRelationHandle) tableHandle.getRelationHandle()).getPreparedQuery(); + } + + @Override + public Optional> applyLimit(ConnectorSession session, ConnectorTableHandle table, long limit) + { + JdbcTableHandle handle = (JdbcTableHandle) table; + + if (limit > Integer.MAX_VALUE) { + // Some databases, e.g. Phoenix, Redshift, do not support limit exceeding 2147483647. + return Optional.empty(); + } + + if (!jdbcClient.supportsLimit()) { + return Optional.empty(); + } + + if (handle.getLimit().isPresent() && handle.getLimit().getAsLong() <= limit) { + return Optional.empty(); + } + + handle = new JdbcTableHandle( + handle.getRelationHandle(), + handle.getConstraint(), + handle.getSortOrder(), + OptionalLong.of(limit), + handle.getColumns(), + handle.getOtherReferencedTables(), + handle.getNextSyntheticColumnId()); + + return Optional.of(new LimitApplicationResult<>(handle, jdbcClient.isLimitGuaranteed(session))); + } + + @Override + public Optional> applyTopN( + ConnectorSession session, + ConnectorTableHandle table, + long topNCount, + List sortItems, + Map assignments) + { + if (!isTopNPushdownEnabled(session)) { + return Optional.empty(); + } + + verify(!sortItems.isEmpty(), "sortItems are empty"); + JdbcTableHandle handle = (JdbcTableHandle) table; + + List resultSortOrder = sortItems.stream() + .map(sortItem -> { + verify(assignments.containsKey(sortItem.getName()), "assignments does not contain sortItem: %s", sortItem.getName()); + return new JdbcSortItem(((JdbcColumnHandle) assignments.get(sortItem.getName())), sortItem.getSortOrder()); + }) + .collect(toImmutableList()); + + if (!jdbcClient.supportsTopN(session, handle, resultSortOrder)) { + // JDBC client implementation prevents TopN pushdown for the given table and sort items + // e.g. when order by on a given type does not match Trino semantics + return Optional.empty(); + } + + if (handle.getSortOrder().isPresent() || handle.getLimit().isPresent()) { + if (handle.getLimit().equals(OptionalLong.of(topNCount)) && handle.getSortOrder().equals(Optional.of(resultSortOrder))) { + return Optional.empty(); + } + + handle = flushAttributesAsQuery(session, handle); + } + + JdbcTableHandle sortedTableHandle = new JdbcTableHandle( + handle.getRelationHandle(), + handle.getConstraint(), + Optional.of(resultSortOrder), + OptionalLong.of(topNCount), + handle.getColumns(), + handle.getOtherReferencedTables(), + handle.getNextSyntheticColumnId()); + + return Optional.of(new TopNApplicationResult<>(sortedTableHandle, jdbcClient.isTopNLimitGuaranteed(session))); + } + + @Override + public Optional applyTableScanRedirect(ConnectorSession session, ConnectorTableHandle table) + { + JdbcTableHandle tableHandle = (JdbcTableHandle) table; + return jdbcClient.getTableScanRedirection(session, tableHandle); + } + + @Override + public boolean usesLegacyTableLayouts() + { + return false; + } + + @Override + public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table) + { + return new ConnectorTableProperties(); + } + + @Override + public ConnectorTableSchema getTableSchema(ConnectorSession session, ConnectorTableHandle table) + { + JdbcTableHandle handle = (JdbcTableHandle) table; + + return new ConnectorTableSchema( + getSchemaTableName(handle), + jdbcClient.getColumns(session, handle).stream() + .map(JdbcColumnHandle::getColumnSchema) + .collect(toImmutableList())); + } + + @Override + public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) + { + JdbcTableHandle handle = (JdbcTableHandle) table; + + return new ConnectorTableMetadata( + getSchemaTableName(handle), + jdbcClient.getColumns(session, handle).stream() + .map(JdbcColumnHandle::getColumnMetadata) + .collect(toImmutableList()), + jdbcClient.getTableProperties(session, handle)); + } + + public static SchemaTableName getSchemaTableName(JdbcTableHandle handle) + { + return handle.isNamedRelation() + ? handle.getRequiredNamedRelation().getSchemaTableName() + // TODO (https://github.com/trinodb/trino/issues/6694) SchemaTableName should not be required for synthetic ConnectorTableHandle + : new SchemaTableName("_generated", "_generated_query"); + } + + @Override + public List listTables(ConnectorSession session, Optional schemaName) + { + return jdbcClient.getTableNames(session, schemaName); + } + + @Override + public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return jdbcClient.getColumns(session, (JdbcTableHandle) tableHandle).stream() + .collect(toImmutableMap(columnHandle -> columnHandle.getColumnMetadata().getName(), identity())); + } + + @Override + public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) + { + ImmutableMap.Builder> columns = ImmutableMap.builder(); + List tables = prefix.toOptionalSchemaTableName() + .>map(ImmutableList::of) + .orElseGet(() -> listTables(session, prefix.getSchema())); + for (SchemaTableName tableName : tables) { + try { + jdbcClient.getTableHandle(session, tableName) + .ifPresent(tableHandle -> columns.put(tableName, getTableMetadata(session, tableHandle).getColumns())); + } + catch (TableNotFoundException | AccessDeniedException e) { + // table disappeared during listing operation or user is not allowed to access it + // these exceptions are ignored because listTableColumns is used for metadata queries (SELECT FROM information_schema) + } + } + return columns.build(); + } + + @Override + public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) + { + return ((JdbcColumnHandle) columnHandle).getColumnMetadata(); + } + + @Override + public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) + { + if (!allowDropTable) { + throw new TrinoException(PERMISSION_DENIED, "DROP TABLE is disabled in this catalog"); + } + JdbcTableHandle handle = (JdbcTableHandle) tableHandle; + verify(!handle.isSynthetic(), "Not a table reference: %s", handle); + jdbcClient.dropTable(session, handle); + } + + @Override + public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout) + { + JdbcOutputTableHandle handle = jdbcClient.beginCreateTable(session, tableMetadata); + setRollback(() -> jdbcClient.rollbackCreateTable(session, handle)); + return handle; + } + + @Override + public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) + { + jdbcClient.createTable(session, tableMetadata); + } + + @Override + public Optional finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments, Collection computedStatistics) + { + JdbcOutputTableHandle handle = (JdbcOutputTableHandle) tableHandle; + jdbcClient.commitCreateTable(session, handle); + return Optional.empty(); + } + + private void setRollback(Runnable action) + { + checkState(rollbackAction.compareAndSet(null, action), "rollback action is already set"); + } + + @Override + public void rollback() + { + Optional.ofNullable(rollbackAction.getAndSet(null)).ifPresent(Runnable::run); + } + + @Override + public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List columns) + { + verify(!((JdbcTableHandle) tableHandle).isSynthetic(), "Not a table reference: %s", tableHandle); + List columnHandles = columns.stream() + .map(JdbcColumnHandle.class::cast) + .collect(toImmutableList()); + JdbcOutputTableHandle handle = jdbcClient.beginInsertTable(session, (JdbcTableHandle) tableHandle, columnHandles); + setRollback(() -> jdbcClient.rollbackCreateTable(session, handle)); + return handle; + } + + @Override + public boolean supportsMissingColumnsOnInsert() + { + return true; + } + + @Override + public Optional finishInsert(ConnectorSession session, ConnectorInsertTableHandle tableHandle, Collection fragments, Collection computedStatistics) + { + JdbcOutputTableHandle jdbcInsertHandle = (JdbcOutputTableHandle) tableHandle; + jdbcClient.finishInsertTable(session, jdbcInsertHandle); + return Optional.empty(); + } + + @Override + public void setColumnComment(ConnectorSession session, ConnectorTableHandle table, ColumnHandle column, Optional comment) + { + JdbcTableHandle tableHandle = (JdbcTableHandle) table; + JdbcColumnHandle columnHandle = (JdbcColumnHandle) column; + verify(!tableHandle.isSynthetic(), "Not a table reference: %s", tableHandle); + jdbcClient.setColumnComment(session, tableHandle, columnHandle, comment); + } + + @Override + public void addColumn(ConnectorSession session, ConnectorTableHandle table, ColumnMetadata columnMetadata) + { + JdbcTableHandle tableHandle = (JdbcTableHandle) table; + verify(!tableHandle.isSynthetic(), "Not a table reference: %s", tableHandle); + jdbcClient.addColumn(session, tableHandle, columnMetadata); + } + + @Override + public void dropColumn(ConnectorSession session, ConnectorTableHandle table, ColumnHandle column) + { + JdbcTableHandle tableHandle = (JdbcTableHandle) table; + JdbcColumnHandle columnHandle = (JdbcColumnHandle) column; + verify(!tableHandle.isSynthetic(), "Not a table reference: %s", tableHandle); + jdbcClient.dropColumn(session, tableHandle, columnHandle); + } + + @Override + public void renameColumn(ConnectorSession session, ConnectorTableHandle table, ColumnHandle column, String target) + { + JdbcTableHandle tableHandle = (JdbcTableHandle) table; + JdbcColumnHandle columnHandle = (JdbcColumnHandle) column; + verify(!tableHandle.isSynthetic(), "Not a table reference: %s", tableHandle); + jdbcClient.renameColumn(session, tableHandle, columnHandle, target); + } + + @Override + public void renameTable(ConnectorSession session, ConnectorTableHandle table, SchemaTableName newTableName) + { + JdbcTableHandle tableHandle = (JdbcTableHandle) table; + verify(!tableHandle.isSynthetic(), "Not a table reference: %s", tableHandle); + jdbcClient.renameTable(session, tableHandle, newTableName); + } + + @Override + public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint constraint) + { + JdbcTableHandle handle = (JdbcTableHandle) tableHandle; + return jdbcClient.getTableStatistics(session, handle, constraint.getSummary()); + } + + @Override + public void createSchema(ConnectorSession session, String schemaName, Map properties, TrinoPrincipal owner) + { + jdbcClient.createSchema(session, schemaName); + } + + @Override + public void dropSchema(ConnectorSession session, String schemaName) + { + jdbcClient.dropSchema(session, schemaName); + } + + private static boolean containSameElements(Iterable first, Iterable second) + { + return ImmutableSet.copyOf(first).equals(ImmutableSet.copyOf(second)); + } +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadataFactory.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadataFactory.java new file mode 100644 index 0000000000000..253af323c8d7a --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadataFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import io.airlift.units.Duration; + +import javax.inject.Inject; + +import java.util.Set; + +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.DAYS; + +public class DefaultJdbcMetadataFactory + implements JdbcMetadataFactory +{ + private final JdbcClient jdbcClient; + private final boolean allowDropTable; + + @Inject + public DefaultJdbcMetadataFactory(JdbcClient jdbcClient, JdbcMetadataConfig config) + { + this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); + requireNonNull(config, "config is null"); + this.allowDropTable = config.isAllowDropTable(); + } + + @Override + public JdbcMetadata create(JdbcTransactionHandle transaction) + { + // Session stays the same per transaction, therefore session properties don't need to + // be a part of cache keys in CachingJdbcClient. + return create(new CachingJdbcClient( + jdbcClient, + Set.of(), + new SingletonJdbcIdentityCacheMapping(), + new Duration(1, DAYS), true), + allowDropTable); + } + + protected JdbcMetadata create(JdbcClient transactionCachingJdbcClient, boolean allowDropTable) + { + return new DefaultJdbcMetadata(transactionCachingJdbcClient, allowDropTable); + } +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcConnector.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcConnector.java index 638413d77ad8b..39dc4e4f2fe60 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcConnector.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcConnector.java @@ -97,7 +97,7 @@ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel { checkConnectorSupports(READ_COMMITTED, isolationLevel); JdbcTransactionHandle transaction = new JdbcTransactionHandle(); - transactions.put(transaction, jdbcMetadataFactory.create()); + transactions.put(transaction, jdbcMetadataFactory.create(transaction)); return transaction; } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcMetadata.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcMetadata.java index 0dfafec7e4281..360aa8ba1e295 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcMetadata.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcMetadata.java @@ -13,715 +13,10 @@ */ package io.trino.plugin.jdbc; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import io.airlift.slice.Slice; -import io.trino.plugin.jdbc.PredicatePushdownController.DomainPushdownResult; -import io.trino.spi.TrinoException; -import io.trino.spi.connector.AggregateFunction; -import io.trino.spi.connector.AggregationApplicationResult; -import io.trino.spi.connector.Assignment; -import io.trino.spi.connector.ColumnHandle; -import io.trino.spi.connector.ColumnMetadata; -import io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.ConnectorMetadata; -import io.trino.spi.connector.ConnectorNewTableLayout; -import io.trino.spi.connector.ConnectorOutputMetadata; -import io.trino.spi.connector.ConnectorOutputTableHandle; -import io.trino.spi.connector.ConnectorSession; -import io.trino.spi.connector.ConnectorTableHandle; -import io.trino.spi.connector.ConnectorTableMetadata; -import io.trino.spi.connector.ConnectorTableProperties; -import io.trino.spi.connector.ConnectorTableSchema; -import io.trino.spi.connector.Constraint; -import io.trino.spi.connector.ConstraintApplicationResult; -import io.trino.spi.connector.JoinApplicationResult; -import io.trino.spi.connector.JoinCondition; -import io.trino.spi.connector.JoinStatistics; -import io.trino.spi.connector.JoinType; -import io.trino.spi.connector.LimitApplicationResult; -import io.trino.spi.connector.ProjectionApplicationResult; -import io.trino.spi.connector.SchemaTableName; -import io.trino.spi.connector.SchemaTablePrefix; -import io.trino.spi.connector.SortItem; -import io.trino.spi.connector.SystemTable; -import io.trino.spi.connector.TableNotFoundException; -import io.trino.spi.connector.TableScanRedirectApplicationResult; -import io.trino.spi.connector.TopNApplicationResult; -import io.trino.spi.expression.ConnectorExpression; -import io.trino.spi.expression.Variable; -import io.trino.spi.predicate.Domain; -import io.trino.spi.predicate.TupleDomain; -import io.trino.spi.security.AccessDeniedException; -import io.trino.spi.security.TrinoPrincipal; -import io.trino.spi.statistics.ComputedStatistics; -import io.trino.spi.statistics.TableStatistics; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.OptionalLong; -import java.util.concurrent.atomic.AtomicReference; - -import static com.google.common.base.Functions.identity; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; -import static com.google.common.base.Verify.verify; -import static com.google.common.base.Verify.verifyNotNull; -import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isAggregationPushdownEnabled; -import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isJoinPushdownEnabled; -import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isTopNPushdownEnabled; -import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED; -import static java.lang.Math.max; -import static java.util.Objects.requireNonNull; - -public class JdbcMetadata - implements ConnectorMetadata +public interface JdbcMetadata + extends ConnectorMetadata { - private static final String SYNTHETIC_COLUMN_NAME_PREFIX = "_pfgnrtd_"; - - private final JdbcClient jdbcClient; - private final boolean allowDropTable; - - private final AtomicReference rollbackAction = new AtomicReference<>(); - - public JdbcMetadata(JdbcClient jdbcClient, boolean allowDropTable) - { - this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); - this.allowDropTable = allowDropTable; - } - - @Override - public boolean schemaExists(ConnectorSession session, String schemaName) - { - return jdbcClient.schemaExists(session, schemaName); - } - - @Override - public List listSchemaNames(ConnectorSession session) - { - return ImmutableList.copyOf(jdbcClient.getSchemaNames(session)); - } - - @Override - public JdbcTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) - { - return jdbcClient.getTableHandle(session, tableName) - .orElse(null); - } - - @Override - public Optional getSystemTable(ConnectorSession session, SchemaTableName tableName) - { - return jdbcClient.getSystemTable(session, tableName); - } - - @Override - public Optional> applyFilter(ConnectorSession session, ConnectorTableHandle table, Constraint constraint) - { - JdbcTableHandle handle = (JdbcTableHandle) table; - if (handle.getSortOrder().isPresent() && handle.getLimit().isPresent()) { - handle = flushAttributesAsQuery(session, handle); - } - - TupleDomain oldDomain = handle.getConstraint(); - TupleDomain newDomain = oldDomain.intersect(constraint.getSummary()); - - TupleDomain remainingFilter; - if (newDomain.isNone()) { - remainingFilter = TupleDomain.all(); - } - else { - Map domains = newDomain.getDomains().orElseThrow(); - List columnHandles = domains.keySet().stream() - .map(JdbcColumnHandle.class::cast) - .collect(toImmutableList()); - List columnMappings = jdbcClient.toColumnMappings( - session, - columnHandles.stream() - .map(JdbcColumnHandle::getJdbcTypeHandle) - .collect(toImmutableList())); - - Map supported = new HashMap<>(); - Map unsupported = new HashMap<>(); - for (int i = 0; i < columnHandles.size(); i++) { - JdbcColumnHandle column = columnHandles.get(i); - ColumnMapping mapping = columnMappings.get(i); - DomainPushdownResult pushdownResult = mapping.getPredicatePushdownController().apply(session, domains.get(column)); - supported.put(column, pushdownResult.getPushedDown()); - unsupported.put(column, pushdownResult.getRemainingFilter()); - } - - newDomain = TupleDomain.withColumnDomains(supported); - remainingFilter = TupleDomain.withColumnDomains(unsupported); - } - - if (oldDomain.equals(newDomain)) { - return Optional.empty(); - } - - handle = new JdbcTableHandle( - handle.getRelationHandle(), - newDomain, - handle.getSortOrder(), - handle.getLimit(), - handle.getColumns(), - handle.getOtherReferencedTables(), - handle.getNextSyntheticColumnId()); - - return Optional.of(new ConstraintApplicationResult<>(handle, remainingFilter)); - } - - private JdbcTableHandle flushAttributesAsQuery(ConnectorSession session, JdbcTableHandle handle) - { - List columns = jdbcClient.getColumns(session, handle); - PreparedQuery preparedQuery = jdbcClient.prepareQuery(session, handle, Optional.empty(), columns, ImmutableMap.of()); - - return new JdbcTableHandle( - new JdbcQueryRelationHandle(preparedQuery), - TupleDomain.all(), - Optional.empty(), - OptionalLong.empty(), - Optional.of(columns), - handle.getAllReferencedTables(), - handle.getNextSyntheticColumnId()); - } - - @Override - public Optional> applyProjection( - ConnectorSession session, - ConnectorTableHandle table, - List projections, - Map assignments) - { - JdbcTableHandle handle = (JdbcTableHandle) table; - - List newColumns = assignments.values().stream() - .map(JdbcColumnHandle.class::cast) - .collect(toImmutableList()); - - if (handle.getColumns().isPresent() && containSameElements(newColumns, handle.getColumns().get())) { - return Optional.empty(); - } - - return Optional.of(new ProjectionApplicationResult<>( - new JdbcTableHandle( - handle.getRelationHandle(), - handle.getConstraint(), - handle.getSortOrder(), - handle.getLimit(), - Optional.of(newColumns), - handle.getOtherReferencedTables(), - handle.getNextSyntheticColumnId()), - projections, - assignments.entrySet().stream() - .map(assignment -> new Assignment( - assignment.getKey(), - assignment.getValue(), - ((JdbcColumnHandle) assignment.getValue()).getColumnType())) - .collect(toImmutableList()))); - } - - @Override - public Optional> applyAggregation( - ConnectorSession session, - ConnectorTableHandle table, - List aggregates, - Map assignments, - List> groupingSets) - { - if (!isAggregationPushdownEnabled(session)) { - return Optional.empty(); - } - - JdbcTableHandle handle = (JdbcTableHandle) table; - - // Global aggregation is represented by [[]] - verify(!groupingSets.isEmpty(), "No grouping sets provided"); - - if (!jdbcClient.supportsAggregationPushdown(session, handle, groupingSets)) { - // JDBC client implementation prevents pushdown for the given table - return Optional.empty(); - } - - if (handle.getLimit().isPresent()) { - handle = flushAttributesAsQuery(session, handle); - } - - int nextSyntheticColumnId = handle.getNextSyntheticColumnId(); - - ImmutableList.Builder newColumns = ImmutableList.builder(); - ImmutableList.Builder projections = ImmutableList.builder(); - ImmutableList.Builder resultAssignments = ImmutableList.builder(); - ImmutableMap.Builder expressions = ImmutableMap.builder(); - for (AggregateFunction aggregate : aggregates) { - Optional expression = jdbcClient.implementAggregation(session, aggregate, assignments); - if (expression.isEmpty()) { - return Optional.empty(); - } - - String columnName = SYNTHETIC_COLUMN_NAME_PREFIX + nextSyntheticColumnId; - nextSyntheticColumnId++; - JdbcColumnHandle newColumn = JdbcColumnHandle.builder() - .setColumnName(columnName) - .setJdbcTypeHandle(expression.get().getJdbcTypeHandle()) - .setColumnType(aggregate.getOutputType()) - .setComment(Optional.of("synthetic")) - .build(); - - newColumns.add(newColumn); - projections.add(new Variable(newColumn.getColumnName(), aggregate.getOutputType())); - resultAssignments.add(new Assignment(newColumn.getColumnName(), newColumn, aggregate.getOutputType())); - expressions.put(columnName, expression.get().getExpression()); - } - - List> groupingSetsAsJdbcColumnHandles = groupingSets.stream() - .map(groupingSet -> groupingSet.stream() - .map(JdbcColumnHandle.class::cast) - .collect(toImmutableList())) - .collect(toImmutableList()); - - List newColumnsList = newColumns.build(); - - PreparedQuery preparedQuery = jdbcClient.prepareQuery( - session, - handle, - Optional.of(groupingSetsAsJdbcColumnHandles), - ImmutableList.builder() - .addAll(groupingSetsAsJdbcColumnHandles.stream() - .flatMap(List::stream) - .distinct() - .iterator()) - .addAll(newColumnsList) - .build(), - expressions.build()); - handle = new JdbcTableHandle( - new JdbcQueryRelationHandle(preparedQuery), - TupleDomain.all(), - Optional.empty(), - OptionalLong.empty(), - Optional.of(newColumnsList), - handle.getAllReferencedTables(), - nextSyntheticColumnId); - - return Optional.of(new AggregationApplicationResult<>(handle, projections.build(), resultAssignments.build(), ImmutableMap.of())); - } - - @Override - public Optional> applyJoin( - ConnectorSession session, - JoinType joinType, - ConnectorTableHandle left, - ConnectorTableHandle right, - List joinConditions, - Map leftAssignments, - Map rightAssignments, - JoinStatistics statistics) - { - if (!isJoinPushdownEnabled(session)) { - return Optional.empty(); - } - - JdbcTableHandle leftHandle = flushAttributesAsQuery(session, (JdbcTableHandle) left); - JdbcTableHandle rightHandle = flushAttributesAsQuery(session, (JdbcTableHandle) right); - int nextSyntheticColumnId = max(leftHandle.getNextSyntheticColumnId(), rightHandle.getNextSyntheticColumnId()); - - ImmutableMap.Builder newLeftColumnsBuilder = ImmutableMap.builder(); - for (JdbcColumnHandle column : jdbcClient.getColumns(session, leftHandle)) { - newLeftColumnsBuilder.put(column, JdbcColumnHandle.builderFrom(column) - .setColumnName(column.getColumnName() + "_" + nextSyntheticColumnId) - .build()); - nextSyntheticColumnId++; - } - Map newLeftColumns = newLeftColumnsBuilder.build(); - - ImmutableMap.Builder newRightColumnsBuilder = ImmutableMap.builder(); - for (JdbcColumnHandle column : jdbcClient.getColumns(session, rightHandle)) { - newRightColumnsBuilder.put(column, JdbcColumnHandle.builderFrom(column) - .setColumnName(column.getColumnName() + "_" + nextSyntheticColumnId) - .build()); - nextSyntheticColumnId++; - } - Map newRightColumns = newRightColumnsBuilder.build(); - - ImmutableList.Builder jdbcJoinConditions = ImmutableList.builder(); - for (JoinCondition joinCondition : joinConditions) { - Optional leftColumn = getVariableColumnHandle(leftAssignments, joinCondition.getLeftExpression()); - Optional rightColumn = getVariableColumnHandle(rightAssignments, joinCondition.getRightExpression()); - if (leftColumn.isEmpty() || rightColumn.isEmpty()) { - return Optional.empty(); - } - jdbcJoinConditions.add(new JdbcJoinCondition(leftColumn.get(), joinCondition.getOperator(), rightColumn.get())); - } - - Optional joinQuery = jdbcClient.implementJoin( - session, - joinType, - asPreparedQuery(leftHandle), - asPreparedQuery(rightHandle), - jdbcJoinConditions.build(), - newRightColumns.entrySet().stream() - .collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().getColumnName())), - newLeftColumns.entrySet().stream() - .collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().getColumnName())), - statistics); - - if (joinQuery.isEmpty()) { - return Optional.empty(); - } - - return Optional.of(new JoinApplicationResult<>( - new JdbcTableHandle( - new JdbcQueryRelationHandle(joinQuery.get()), - TupleDomain.all(), - Optional.empty(), - OptionalLong.empty(), - Optional.of( - ImmutableList.builder() - .addAll(newLeftColumns.values()) - .addAll(newRightColumns.values()) - .build()), - ImmutableSet.builder() - .addAll(leftHandle.getAllReferencedTables()) - .addAll(rightHandle.getAllReferencedTables()) - .build(), - nextSyntheticColumnId), - ImmutableMap.copyOf(newLeftColumns), - ImmutableMap.copyOf(newRightColumns))); - } - - private static Optional getVariableColumnHandle(Map assignments, ConnectorExpression expression) - { - requireNonNull(assignments, "assignments is null"); - requireNonNull(expression, "expression is null"); - if (!(expression instanceof Variable)) { - return Optional.empty(); - } - - String name = ((Variable) expression).getName(); - ColumnHandle columnHandle = assignments.get(name); - verifyNotNull(columnHandle, "No assignment for %s", name); - return Optional.of(((JdbcColumnHandle) columnHandle)); - } - - private static PreparedQuery asPreparedQuery(JdbcTableHandle tableHandle) - { - checkArgument( - tableHandle.getConstraint().equals(TupleDomain.all()) && - tableHandle.getLimit().isEmpty() && - tableHandle.getRelationHandle() instanceof JdbcQueryRelationHandle, - "Handle is not a plain query: %s", - tableHandle); - return ((JdbcQueryRelationHandle) tableHandle.getRelationHandle()).getPreparedQuery(); - } - - @Override - public Optional> applyLimit(ConnectorSession session, ConnectorTableHandle table, long limit) - { - JdbcTableHandle handle = (JdbcTableHandle) table; - - if (limit > Integer.MAX_VALUE) { - // Some databases, e.g. Phoenix, Redshift, do not support limit exceeding 2147483647. - return Optional.empty(); - } - - if (!jdbcClient.supportsLimit()) { - return Optional.empty(); - } - - if (handle.getLimit().isPresent() && handle.getLimit().getAsLong() <= limit) { - return Optional.empty(); - } - - handle = new JdbcTableHandle( - handle.getRelationHandle(), - handle.getConstraint(), - handle.getSortOrder(), - OptionalLong.of(limit), - handle.getColumns(), - handle.getOtherReferencedTables(), - handle.getNextSyntheticColumnId()); - - return Optional.of(new LimitApplicationResult<>(handle, jdbcClient.isLimitGuaranteed(session))); - } - - @Override - public Optional> applyTopN( - ConnectorSession session, - ConnectorTableHandle table, - long topNCount, - List sortItems, - Map assignments) - { - if (!isTopNPushdownEnabled(session)) { - return Optional.empty(); - } - - verify(!sortItems.isEmpty(), "sortItems are empty"); - JdbcTableHandle handle = (JdbcTableHandle) table; - - List resultSortOrder = sortItems.stream() - .map(sortItem -> { - verify(assignments.containsKey(sortItem.getName()), "assignments does not contain sortItem: %s", sortItem.getName()); - return new JdbcSortItem(((JdbcColumnHandle) assignments.get(sortItem.getName())), sortItem.getSortOrder()); - }) - .collect(toImmutableList()); - - if (!jdbcClient.supportsTopN(session, handle, resultSortOrder)) { - // JDBC client implementation prevents TopN pushdown for the given table and sort items - // e.g. when order by on a given type does not match Trino semantics - return Optional.empty(); - } - - if (handle.getSortOrder().isPresent() || handle.getLimit().isPresent()) { - if (handle.getLimit().equals(OptionalLong.of(topNCount)) && handle.getSortOrder().equals(Optional.of(resultSortOrder))) { - return Optional.empty(); - } - - handle = flushAttributesAsQuery(session, handle); - } - - JdbcTableHandle sortedTableHandle = new JdbcTableHandle( - handle.getRelationHandle(), - handle.getConstraint(), - Optional.of(resultSortOrder), - OptionalLong.of(topNCount), - handle.getColumns(), - handle.getOtherReferencedTables(), - handle.getNextSyntheticColumnId()); - - return Optional.of(new TopNApplicationResult<>(sortedTableHandle, jdbcClient.isTopNLimitGuaranteed(session))); - } - - @Override - public Optional applyTableScanRedirect(ConnectorSession session, ConnectorTableHandle table) - { - JdbcTableHandle tableHandle = (JdbcTableHandle) table; - return jdbcClient.getTableScanRedirection(session, tableHandle); - } - - @Override - public boolean usesLegacyTableLayouts() - { - return false; - } - - @Override - public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table) - { - return new ConnectorTableProperties(); - } - - @Override - public ConnectorTableSchema getTableSchema(ConnectorSession session, ConnectorTableHandle table) - { - JdbcTableHandle handle = (JdbcTableHandle) table; - - return new ConnectorTableSchema( - getSchemaTableName(handle), - jdbcClient.getColumns(session, handle).stream() - .map(JdbcColumnHandle::getColumnSchema) - .collect(toImmutableList())); - } - - @Override - public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) - { - JdbcTableHandle handle = (JdbcTableHandle) table; - - return new ConnectorTableMetadata( - getSchemaTableName(handle), - jdbcClient.getColumns(session, handle).stream() - .map(JdbcColumnHandle::getColumnMetadata) - .collect(toImmutableList()), - jdbcClient.getTableProperties(session, handle)); - } - - public static SchemaTableName getSchemaTableName(JdbcTableHandle handle) - { - return handle.isNamedRelation() - ? handle.getRequiredNamedRelation().getSchemaTableName() - // TODO (https://github.com/trinodb/trino/issues/6694) SchemaTableName should not be required for synthetic ConnectorTableHandle - : new SchemaTableName("_generated", "_generated_query"); - } - - @Override - public List listTables(ConnectorSession session, Optional schemaName) - { - return jdbcClient.getTableNames(session, schemaName); - } - - @Override - public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) - { - return jdbcClient.getColumns(session, (JdbcTableHandle) tableHandle).stream() - .collect(toImmutableMap(columnHandle -> columnHandle.getColumnMetadata().getName(), identity())); - } - - @Override - public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) - { - ImmutableMap.Builder> columns = ImmutableMap.builder(); - List tables = prefix.toOptionalSchemaTableName() - .>map(ImmutableList::of) - .orElseGet(() -> listTables(session, prefix.getSchema())); - for (SchemaTableName tableName : tables) { - try { - jdbcClient.getTableHandle(session, tableName) - .ifPresent(tableHandle -> columns.put(tableName, getTableMetadata(session, tableHandle).getColumns())); - } - catch (TableNotFoundException | AccessDeniedException e) { - // table disappeared during listing operation or user is not allowed to access it - // these exceptions are ignored because listTableColumns is used for metadata queries (SELECT FROM information_schema) - } - } - return columns.build(); - } - - @Override - public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) - { - return ((JdbcColumnHandle) columnHandle).getColumnMetadata(); - } - - @Override - public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) - { - if (!allowDropTable) { - throw new TrinoException(PERMISSION_DENIED, "DROP TABLE is disabled in this catalog"); - } - JdbcTableHandle handle = (JdbcTableHandle) tableHandle; - verify(!handle.isSynthetic(), "Not a table reference: %s", handle); - jdbcClient.dropTable(session, handle); - } - - @Override - public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout) - { - JdbcOutputTableHandle handle = jdbcClient.beginCreateTable(session, tableMetadata); - setRollback(() -> jdbcClient.rollbackCreateTable(session, handle)); - return handle; - } - - @Override - public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) - { - jdbcClient.createTable(session, tableMetadata); - } - - @Override - public Optional finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments, Collection computedStatistics) - { - JdbcOutputTableHandle handle = (JdbcOutputTableHandle) tableHandle; - jdbcClient.commitCreateTable(session, handle); - return Optional.empty(); - } - - private void setRollback(Runnable action) - { - checkState(rollbackAction.compareAndSet(null, action), "rollback action is already set"); - } - - public void rollback() - { - Optional.ofNullable(rollbackAction.getAndSet(null)).ifPresent(Runnable::run); - } - - @Override - public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List columns) - { - verify(!((JdbcTableHandle) tableHandle).isSynthetic(), "Not a table reference: %s", tableHandle); - List columnHandles = columns.stream() - .map(JdbcColumnHandle.class::cast) - .collect(toImmutableList()); - JdbcOutputTableHandle handle = jdbcClient.beginInsertTable(session, (JdbcTableHandle) tableHandle, columnHandles); - setRollback(() -> jdbcClient.rollbackCreateTable(session, handle)); - return handle; - } - - @Override - public boolean supportsMissingColumnsOnInsert() - { - return true; - } - - @Override - public Optional finishInsert(ConnectorSession session, ConnectorInsertTableHandle tableHandle, Collection fragments, Collection computedStatistics) - { - JdbcOutputTableHandle jdbcInsertHandle = (JdbcOutputTableHandle) tableHandle; - jdbcClient.finishInsertTable(session, jdbcInsertHandle); - return Optional.empty(); - } - - @Override - public void setColumnComment(ConnectorSession session, ConnectorTableHandle table, ColumnHandle column, Optional comment) - { - JdbcTableHandle tableHandle = (JdbcTableHandle) table; - JdbcColumnHandle columnHandle = (JdbcColumnHandle) column; - verify(!tableHandle.isSynthetic(), "Not a table reference: %s", tableHandle); - jdbcClient.setColumnComment(session, tableHandle, columnHandle, comment); - } - - @Override - public void addColumn(ConnectorSession session, ConnectorTableHandle table, ColumnMetadata columnMetadata) - { - JdbcTableHandle tableHandle = (JdbcTableHandle) table; - verify(!tableHandle.isSynthetic(), "Not a table reference: %s", tableHandle); - jdbcClient.addColumn(session, tableHandle, columnMetadata); - } - - @Override - public void dropColumn(ConnectorSession session, ConnectorTableHandle table, ColumnHandle column) - { - JdbcTableHandle tableHandle = (JdbcTableHandle) table; - JdbcColumnHandle columnHandle = (JdbcColumnHandle) column; - verify(!tableHandle.isSynthetic(), "Not a table reference: %s", tableHandle); - jdbcClient.dropColumn(session, tableHandle, columnHandle); - } - - @Override - public void renameColumn(ConnectorSession session, ConnectorTableHandle table, ColumnHandle column, String target) - { - JdbcTableHandle tableHandle = (JdbcTableHandle) table; - JdbcColumnHandle columnHandle = (JdbcColumnHandle) column; - verify(!tableHandle.isSynthetic(), "Not a table reference: %s", tableHandle); - jdbcClient.renameColumn(session, tableHandle, columnHandle, target); - } - - @Override - public void renameTable(ConnectorSession session, ConnectorTableHandle table, SchemaTableName newTableName) - { - JdbcTableHandle tableHandle = (JdbcTableHandle) table; - verify(!tableHandle.isSynthetic(), "Not a table reference: %s", tableHandle); - jdbcClient.renameTable(session, tableHandle, newTableName); - } - - @Override - public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint constraint) - { - JdbcTableHandle handle = (JdbcTableHandle) tableHandle; - return jdbcClient.getTableStatistics(session, handle, constraint.getSummary()); - } - - @Override - public void createSchema(ConnectorSession session, String schemaName, Map properties, TrinoPrincipal owner) - { - jdbcClient.createSchema(session, schemaName); - } - - @Override - public void dropSchema(ConnectorSession session, String schemaName) - { - jdbcClient.dropSchema(session, schemaName); - } - - private static boolean containSameElements(Iterable first, Iterable second) - { - return ImmutableSet.copyOf(first).equals(ImmutableSet.copyOf(second)); - } + void rollback(); } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcMetadataFactory.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcMetadataFactory.java index 192dafd9aa0cd..bf91f3eeb1726 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcMetadataFactory.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcMetadataFactory.java @@ -13,37 +13,7 @@ */ package io.trino.plugin.jdbc; -import io.airlift.units.Duration; - -import javax.inject.Inject; - -import java.util.Set; - -import static java.util.Objects.requireNonNull; -import static java.util.concurrent.TimeUnit.DAYS; - -public class JdbcMetadataFactory +public interface JdbcMetadataFactory { - private final JdbcClient jdbcClient; - private final boolean allowDropTable; - - @Inject - public JdbcMetadataFactory(JdbcClient jdbcClient, JdbcMetadataConfig config) - { - this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); - requireNonNull(config, "config is null"); - this.allowDropTable = config.isAllowDropTable(); - } - - public JdbcMetadata create() - { - // Session stays the same per transaction, therefore session properties don't need to - // be a part of cache keys in CachingJdbcClient. - return new JdbcMetadata(new CachingJdbcClient( - jdbcClient, - Set.of(), - new SingletonJdbcIdentityCacheMapping(), - new Duration(1, DAYS), true), - allowDropTable); - } + JdbcMetadata create(JdbcTransactionHandle transaction); } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java index 2a400ecc400e0..4d7120d2c4905 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java @@ -60,7 +60,7 @@ public void configure(Binder binder) procedureBinder(binder); tablePropertiesProviderBinder(binder); - binder.bind(JdbcMetadataFactory.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, JdbcMetadataFactory.class).setDefault().to(DefaultJdbcMetadataFactory.class).in(Scopes.SINGLETON); newOptionalBinder(binder, ConnectorSplitManager.class).setDefault().to(JdbcSplitManager.class).in(Scopes.SINGLETON); newOptionalBinder(binder, ConnectorRecordSetProvider.class).setDefault().to(JdbcRecordSetProvider.class).in(Scopes.SINGLETON); newOptionalBinder(binder, ConnectorPageSinkProvider.class).setDefault().to(JdbcPageSinkProvider.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcMetadata.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestDefaultJdbcMetadata.java similarity index 98% rename from plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcMetadata.java rename to plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestDefaultJdbcMetadata.java index 2ce592236c803..025f5669431b4 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcMetadata.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestDefaultJdbcMetadata.java @@ -58,10 +58,10 @@ import static org.testng.Assert.assertTrue; @Test(singleThreaded = true) -public class TestJdbcMetadata +public class TestDefaultJdbcMetadata { private TestingDatabase database; - private JdbcMetadata metadata; + private DefaultJdbcMetadata metadata; private JdbcTableHandle tableHandle; @BeforeMethod @@ -69,7 +69,7 @@ public void setUp() throws Exception { database = new TestingDatabase(); - metadata = new JdbcMetadata(new GroupingSetsEnabledJdbcClient(database.getJdbcClient()), false); + metadata = new DefaultJdbcMetadata(new GroupingSetsEnabledJdbcClient(database.getJdbcClient()), false); tableHandle = metadata.getTableHandle(SESSION, new SchemaTableName("example", "numbers")); } @@ -229,7 +229,7 @@ public void testDropTableTable() .hasErrorCode(PERMISSION_DENIED) .hasMessage("DROP TABLE is disabled in this catalog"); - metadata = new JdbcMetadata(database.getJdbcClient(), true); + metadata = new DefaultJdbcMetadata(database.getJdbcClient(), true); metadata.dropTable(SESSION, tableHandle); assertTrinoExceptionThrownBy(() -> metadata.getTableMetadata(SESSION, tableHandle)) diff --git a/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/PhoenixMetadata.java b/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/PhoenixMetadata.java index cd09e203a1564..81c29be84a061 100644 --- a/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/PhoenixMetadata.java +++ b/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/PhoenixMetadata.java @@ -14,8 +14,8 @@ package io.trino.plugin.phoenix; import io.airlift.slice.Slice; +import io.trino.plugin.jdbc.DefaultJdbcMetadata; import io.trino.plugin.jdbc.JdbcColumnHandle; -import io.trino.plugin.jdbc.JdbcMetadata; import io.trino.plugin.jdbc.JdbcMetadataConfig; import io.trino.plugin.jdbc.JdbcTableHandle; import io.trino.spi.TrinoException; @@ -56,7 +56,7 @@ import static org.apache.phoenix.util.SchemaUtil.getEscapedArgument; public class PhoenixMetadata - extends JdbcMetadata + extends DefaultJdbcMetadata { // Maps to Phoenix's default empty schema public static final String DEFAULT_SCHEMA = "default"; diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMetadata.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMetadata.java index 6087d455db968..e8aae4a04f4bc 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMetadata.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMetadata.java @@ -14,8 +14,8 @@ package io.trino.plugin.phoenix5; import io.airlift.slice.Slice; +import io.trino.plugin.jdbc.DefaultJdbcMetadata; import io.trino.plugin.jdbc.JdbcColumnHandle; -import io.trino.plugin.jdbc.JdbcMetadata; import io.trino.plugin.jdbc.JdbcMetadataConfig; import io.trino.plugin.jdbc.JdbcTableHandle; import io.trino.spi.TrinoException; @@ -56,7 +56,7 @@ import static org.apache.phoenix.util.SchemaUtil.getEscapedArgument; public class PhoenixMetadata - extends JdbcMetadata + extends DefaultJdbcMetadata { // Maps to Phoenix's default empty schema public static final String DEFAULT_SCHEMA = "default";