From 262ebf336e925668f5abcaa991e35adbc78a9831 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?=
Date: Tue, 12 Oct 2021 16:41:25 +0200
Subject: [PATCH] Add logic to limit chance for data loss in Hive OPTIMIZE
 table procedure

---
 .../plugin/hive/HiveLocationService.java      |  8 ++++
 .../io/trino/plugin/hive/HiveMetadata.java    | 43 +++++++++++++++----
 .../plugin/hive/HiveTableExecuteHandle.java   | 26 +++++++++++
 .../io/trino/plugin/hive/LocationService.java |  2 +
 4 files changed, 70 insertions(+), 9 deletions(-)

diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveLocationService.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveLocationService.java
index 3e7f03f1bfe6..78d84c4f2106 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveLocationService.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveLocationService.java
@@ -88,6 +88,14 @@ public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore,
         }
     }
 
+    @Override
+    public LocationHandle forOptimize(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table)
+    {
+        // For OPTIMIZE, write result files directly to the table directory; this is needed by the commit logic in HiveMetadata#finishTableExecute
+        Path targetPath = new Path(table.getStorage().getLocation());
+        return new LocationHandle(targetPath, targetPath, true, DIRECT_TO_TARGET_EXISTING_DIRECTORY);
+    }
+
     private boolean shouldUseTemporaryDirectory(ConnectorSession session, HdfsContext context, Path path, Optional<Path> externalLocation)
     {
         return isTemporaryStagingDirectoryEnabled(session)
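
The write mode chosen above is the crux of this change: with DIRECT_TO_TARGET_EXISTING_DIRECTORY the compacted files land in the table directory as the writers produce them, so committing OPTIMIZE never has to move new files into place and reduces to deleting the replaced originals. A minimal sketch of the contrast, using invented names rather than the Trino API:

    import java.nio.file.Files;
    import java.nio.file.Path;

    // Hypothetical illustration of the two write modes; names are invented.
    final class WriteModeSketch
    {
        // Staged writes need a final move: a failure before the move loses nothing,
        // but commit then involves more than just deleting the replaced files.
        static void commitStaged(Path stagingFile, Path targetFile)
                throws Exception
        {
            Files.move(stagingFile, targetFile);
        }

        // Direct writes are already in place; commit is a no-op for the new files,
        // which is what HiveMetadata#finishTableExecute relies on below.
        static void commitDirect(Path targetFile)
        {
            // nothing to do - the file was created at its final location
        }
    }
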
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
index 5e274c81607f..1a144f0701d3 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
@@ -25,6 +25,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import io.airlift.json.JsonCodec;
+import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
 import io.airlift.units.DataSize;
 import io.trino.plugin.base.CatalogName;
@@ -126,6 +127,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -138,6 +140,7 @@
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
@@ -261,13 +264,13 @@
 import static io.trino.plugin.hive.util.HiveWriteUtils.isFileCreatedByQuery;
 import static io.trino.plugin.hive.util.HiveWriteUtils.isS3FileSystem;
 import static io.trino.plugin.hive.util.HiveWriteUtils.isWritableType;
+import static io.trino.plugin.hive.util.RetryDriver.retry;
 import static io.trino.plugin.hive.util.Statistics.ReduceOperator.ADD;
 import static io.trino.plugin.hive.util.Statistics.createComputedStatisticsToPartitionMap;
 import static io.trino.plugin.hive.util.Statistics.createEmptyPartitionStatistics;
 import static io.trino.plugin.hive.util.Statistics.fromComputedStatistics;
 import static io.trino.plugin.hive.util.Statistics.reduce;
 import static io.trino.plugin.hive.util.SystemTables.getSourceTableNameFromSystemTable;
-import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
 import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY;
 import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
 import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
@@ -294,6 +297,8 @@
 public class HiveMetadata
         implements TransactionalMetadata
 {
+    private static final Logger log = Logger.get(HiveMetadata.class);
+
     public static final String PRESTO_VERSION_NAME = "presto_version";
     public static final String TRINO_CREATED_BY = "trino_created_by";
     public static final String PRESTO_QUERY_ID_NAME = "presto_query_id";
@@ -1959,7 +1964,7 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(Connecto
         if (table.getParameters().containsKey(SKIP_FOOTER_COUNT_KEY)) {
             throw new TrinoException(NOT_SUPPORTED, format("Optimizing Hive table %s with %s property not supported", tableName, SKIP_FOOTER_COUNT_KEY));
         }
-        LocationHandle locationHandle = locationService.forExistingTable(metastore, session, table);
+        LocationHandle locationHandle = locationService.forOptimize(metastore, session, table);
         DataSize fileSizeThreshold = (DataSize) executeProperties.get("file_size_threshold");
 
         hiveTableHandle = hiveTableHandle
@@ -1975,6 +1980,7 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(Connecto
         return Optional.of(new HiveTableExecuteHandle(
                 OptimizeTableProcedure.NAME,
                 hiveTableHandle,
+                Optional.empty(),
                 tableName.getSchemaName(),
                 tableName.getTableName(),
                 columns,
@@ -2017,8 +2023,8 @@ private ConnectorTableExecuteHandle beginOptimize(ConnectorSession session, Conn
     {
         HiveTableExecuteHandle hiveExecuteHandle = (HiveTableExecuteHandle) tableExecuteHandle;
         WriteInfo writeInfo = locationService.getQueryWriteInfo(hiveExecuteHandle.getLocationHandle());
-        metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), hiveExecuteHandle.getSchemaTableName());
-        return tableExecuteHandle;
+        String writeDeclarationId = metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), hiveExecuteHandle.getSchemaTableName());
+        return hiveExecuteHandle.withWriteDeclarationId(writeDeclarationId);
     }
 
     @Override
@@ -2038,6 +2044,7 @@ private void finishOptimize(ConnectorSession session, ConnectorTableExecuteHandl
         // TODO lots of that is copied from finishInsert; rafactoring opportunity
 
         HiveTableExecuteHandle handle = (HiveTableExecuteHandle) tableExecuteHandle;
+        checkArgument(handle.getWriteDeclarationId().isPresent(), "no write declaration id present in tableExecuteHandle");
 
         List<PartitionUpdate> partitionUpdates = fragments.stream()
                 .map(Slice::getBytes)
@@ -2087,18 +2094,36 @@ private void finishOptimize(ConnectorSession session, ConnectorTableExecuteHandl
             }
         }
 
+        boolean someDeleted = false;
+
+        // track remaining files to be deleted for error reporting
+        Set<String> remainingFilesToDelete = tableExecuteState.stream()
+                .map(value -> (String) value)
+                .collect(Collectors.toCollection(HashSet::new));
+
         try {
-            // TODO; this is not safe; we can delete old files. And then if there is error new files will be deleted too.
             FileSystem fs = hdfsEnvironment.getFileSystem(new HdfsContext(session), new Path(table.getStorage().getLocation()));
             for (Object scannedPathObject : tableExecuteState) {
+                someDeleted = true;
                 Path scannedPath = new Path((String) scannedPathObject);
-                fs.delete(scannedPath, false);
+                retry().run("delete " + scannedPath, () -> fs.delete(scannedPath, false));
+                remainingFilesToDelete.remove(scannedPathObject);
             }
         }
-        catch (IOException e) {
-            throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Error while deleting ", e);
+        catch (Exception e) {
+            if (!someDeleted) {
+                // we did not delete any source files yet, so we can just throw an error and allow rollback to happen
+                throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Error while deleting original files", e);
+            }
+
+            // If we already deleted some original files, we disable the rollback routine so the newly written files are not deleted.
+            // The reported exception and log entry list the files which need to be cleaned up manually by the user.
+            // Until the table is cleaned up, duplicate rows will be present.
+            metastore.dropDeclaredIntentionToWrite(handle.getWriteDeclarationId().get());
+            String errorMessage = "Error while deleting data files in FINISH phase of OPTIMIZE for table " + table.getTableName() + "; remaining files need to be deleted manually: " + remainingFilesToDelete;
+            log.error(e, errorMessage);
+            throw new TrinoException(HIVE_FILESYSTEM_ERROR, errorMessage, e);
         }
-        throw new TrinoException(GENERIC_INTERNAL_ERROR, "Blah!!!");
     }
 
     @Override
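
The two failure windows in finishOptimize above are the heart of the data-loss protection: a failure before the first delete attempt can still be rolled back, while a failure after it must disable rollback (otherwise the rollback routine would also remove the newly written files) and report the originals that remain. A self-contained sketch of that control flow, assuming java.nio and plain exceptions as stand-ins for the Trino filesystem, metastore and retry utilities:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Simplified model of the finishOptimize deletion phase; the types and the
    // disableRollback callback (standing in for dropDeclaredIntentionToWrite)
    // are illustrative, not the actual Trino classes.
    final class FinishOptimizeSketch
    {
        static void deleteOriginalFiles(List<Path> scannedFiles, Runnable disableRollback)
        {
            boolean someDeleted = false;
            Set<Path> remaining = new HashSet<>(scannedFiles);
            try {
                for (Path file : scannedFiles) {
                    someDeleted = true;      // set before the attempt: a failed
                    Files.delete(file);      // delete may still have removed data
                    remaining.remove(file);
                }
            }
            catch (IOException e) {
                if (!someDeleted) {
                    // no source file was touched; rollback may safely delete the new files
                    throw new RuntimeException("Error while deleting original files", e);
                }
                // some originals are gone: rollback must not remove the new files,
                // so the remaining originals become a manual cleanup task
                disableRollback.run();
                throw new RuntimeException("remaining files need manual cleanup: " + remaining, e);
            }
        }
    }
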
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableExecuteHandle.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableExecuteHandle.java
index 91d2e1a88b4e..dac720bf999a 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableExecuteHandle.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableExecuteHandle.java
@@ -31,11 +31,13 @@ public class HiveTableExecuteHandle
 {
     private final String procedureName;
     private final HiveTableHandle sourceTableHandle;
+    private final Optional<String> writeDeclarationId;
 
     @JsonCreator
     public HiveTableExecuteHandle(
             @JsonProperty("procedureName") String procedureName,
             @JsonProperty("sourceTableHandle") HiveTableHandle sourceTableHandle,
+            @JsonProperty("writeDeclarationId") Optional<String> writeDeclarationId,
             @JsonProperty("schemaName") String schemaName,
             @JsonProperty("tableName") String tableName,
             @JsonProperty("inputColumns") List<HiveColumnHandle> inputColumns,
@@ -59,6 +61,7 @@ public HiveTableExecuteHandle(
 
         this.procedureName = requireNonNull(procedureName, "procedureName is null");
         this.sourceTableHandle = requireNonNull(sourceTableHandle, "sourceTableHandle is null");
+        this.writeDeclarationId = requireNonNull(writeDeclarationId, "writeDeclarationId is null");
     }
 
     @JsonProperty
@@ -74,6 +77,29 @@ public ConnectorTableHandle getSourceTableHandle()
         return sourceTableHandle;
     }
 
+    @JsonProperty
+    public Optional<String> getWriteDeclarationId()
+    {
+        return writeDeclarationId;
+    }
+
+    public HiveTableExecuteHandle withWriteDeclarationId(String writeDeclarationId)
+    {
+        return new HiveTableExecuteHandle(
+                procedureName,
+                sourceTableHandle,
+                Optional.of(writeDeclarationId),
+                getSchemaName(),
+                getTableName(),
+                getInputColumns(),
+                getPageSinkMetadata(),
+                getLocationHandle(),
+                getBucketProperty(),
+                getTableStorageFormat(),
+                getPartitionStorageFormat(),
+                getTransaction());
+    }
+
     @Override
     public String toString()
     {
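
Because connector handles are immutable and JSON-serialized, the write declaration id cannot be set on an existing handle; beginOptimize instead returns a copy carrying the id, which the engine hands back at finish time. A generic sketch of this with-er pattern, with invented field names:

    import java.util.Optional;

    // Generic illustration of the immutable "with-er" used above; not Trino code.
    final class ExecuteHandleSketch
    {
        private final String procedureName;
        private final Optional<String> writeDeclarationId;

        ExecuteHandleSketch(String procedureName, Optional<String> writeDeclarationId)
        {
            this.procedureName = procedureName;
            this.writeDeclarationId = writeDeclarationId;
        }

        // Returns a new handle; the original stays unchanged, which keeps the
        // handle safe to serialize and pass between execution phases.
        ExecuteHandleSketch withWriteDeclarationId(String id)
        {
            return new ExecuteHandleSketch(procedureName, Optional.of(id));
        }

        Optional<String> getWriteDeclarationId()
        {
            return writeDeclarationId;
        }
    }
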
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/LocationService.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/LocationService.java
index 74e4e6a7f2f8..2dab8e14dfed 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/LocationService.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/LocationService.java
@@ -29,6 +29,8 @@ public interface LocationService
 
     LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table);
 
+    LocationHandle forOptimize(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table);
+
     /**
      * targetPath and writePath will be root directory of all partition and table paths
      * that may be returned by {@link #getTableWriteInfo(LocationHandle, boolean)} and {@link #getPartitionWriteInfo(LocationHandle, Optional, String)} method.
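
The retry() wrapper around each fs.delete call in the HiveMetadata hunk above comes from the connector's RetryDriver utility and exists so that a transient filesystem hiccup is retried before being treated as a partial-delete failure. A toy equivalent of such a helper, with an invented signature and without the real driver's backoff and exception filtering:

    import java.util.concurrent.Callable;

    // Toy stand-in for a retry helper; the real RetryDriver adds backoff,
    // time limits and exception filtering that are omitted here.
    final class RetrySketch
    {
        static <V> V run(String name, int attempts, Callable<V> action)
                throws Exception
        {
            Exception last = null;
            for (int i = 0; i < attempts; i++) {
                try {
                    return action.call();
                }
                catch (Exception e) {
                    last = e;   // remember the failure and try again
                }
            }
            throw new Exception("failed: " + name, last);
        }
    }
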