Add logic to limit chance for data loss in Hive OPTIMIZE table procedure
losipiuk committed Oct 12, 2021
1 parent 96d211f commit 262ebf3
Showing 4 changed files with 70 additions and 9 deletions.
@@ -88,6 +88,14 @@ public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore,
}
}

@Override
public LocationHandle forOptimize(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table)
{
// For OPTIMIZE, write result files directly to the table directory; that is needed by the commit logic in HiveMetadata#finishTableExecute
Path targetPath = new Path(table.getStorage().getLocation());
return new LocationHandle(targetPath, targetPath, true, DIRECT_TO_TARGET_EXISTING_DIRECTORY);
}

private boolean shouldUseTemporaryDirectory(ConnectorSession session, HdfsContext context, Path path, Optional<Path> externalLocation)
{
return isTemporaryStagingDirectoryEnabled(session)
@@ -25,6 +25,7 @@
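The comment in forOptimize above is the key point of this hunk: OPTIMIZE writes its compacted files straight into the existing table directory (DIRECT_TO_TARGET_EXISTING_DIRECTORY) instead of a temporary staging directory, so the commit step only has to delete the files that were compacted away. The following is a minimal, hypothetical sketch of the difference between the two write strategies, in plain java.nio rather than Trino's LocationHandle/HDFS APIs:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class WriteModeSketch
{
    // Staging mode: the file becomes visible in the table directory only when it is moved into place on commit.
    static Path stageAndMove(Path stagingDir, Path tableDir, String fileName, byte[] data)
            throws IOException
    {
        Path staged = Files.write(stagingDir.resolve(fileName), data);
        return Files.move(staged, tableDir.resolve(fileName));
    }

    // Direct mode (used for OPTIMIZE here): the new file lands in the table directory immediately;
    // commit is then reduced to deleting the old files it replaced.
    static Path writeDirectToTarget(Path tableDir, String fileName, byte[] data)
            throws IOException
    {
        return Files.write(tableDir.resolve(fileName), data);
    }
}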
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.plugin.base.CatalogName;
@@ -126,6 +127,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -138,6 +140,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
@@ -261,13 +264,13 @@
import static io.trino.plugin.hive.util.HiveWriteUtils.isFileCreatedByQuery;
import static io.trino.plugin.hive.util.HiveWriteUtils.isS3FileSystem;
import static io.trino.plugin.hive.util.HiveWriteUtils.isWritableType;
import static io.trino.plugin.hive.util.RetryDriver.retry;
import static io.trino.plugin.hive.util.Statistics.ReduceOperator.ADD;
import static io.trino.plugin.hive.util.Statistics.createComputedStatisticsToPartitionMap;
import static io.trino.plugin.hive.util.Statistics.createEmptyPartitionStatistics;
import static io.trino.plugin.hive.util.Statistics.fromComputedStatistics;
import static io.trino.plugin.hive.util.Statistics.reduce;
import static io.trino.plugin.hive.util.SystemTables.getSourceTableNameFromSystemTable;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY;
import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
@@ -294,6 +297,8 @@
public class HiveMetadata
implements TransactionalMetadata
{
private static final Logger log = Logger.get(HiveMetadata.class);

public static final String PRESTO_VERSION_NAME = "presto_version";
public static final String TRINO_CREATED_BY = "trino_created_by";
public static final String PRESTO_QUERY_ID_NAME = "presto_query_id";
@@ -1959,7 +1964,7 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(Connecto
if (table.getParameters().containsKey(SKIP_FOOTER_COUNT_KEY)) {
throw new TrinoException(NOT_SUPPORTED, format("Optimizing Hive table %s with %s property not supported", tableName, SKIP_FOOTER_COUNT_KEY));
}
LocationHandle locationHandle = locationService.forExistingTable(metastore, session, table);
LocationHandle locationHandle = locationService.forOptimize(metastore, session, table);

DataSize fileSizeThreshold = (DataSize) executeProperties.get("file_size_threshold");
hiveTableHandle = hiveTableHandle
@@ -1975,6 +1980,7 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(Connecto
return Optional.of(new HiveTableExecuteHandle(
OptimizeTableProcedure.NAME,
hiveTableHandle,
Optional.empty(),
tableName.getSchemaName(),
tableName.getTableName(),
columns,
@@ -2017,8 +2023,8 @@ private ConnectorTableExecuteHandle beginOptimize(ConnectorSession session, Conn
{
HiveTableExecuteHandle hiveExecuteHandle = (HiveTableExecuteHandle) tableExecuteHandle;
WriteInfo writeInfo = locationService.getQueryWriteInfo(hiveExecuteHandle.getLocationHandle());
metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), hiveExecuteHandle.getSchemaTableName());
return tableExecuteHandle;
String writeDeclarationId = metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), hiveExecuteHandle.getSchemaTableName());
return hiveExecuteHandle.withWriteDeclarationId(writeDeclarationId);
}

@Override
@@ -2038,6 +2044,7 @@ private void finishOptimize(ConnectorSession session, ConnectorTableExecuteHandl
// TODO lots of that is copied from finishInsert; refactoring opportunity

HiveTableExecuteHandle handle = (HiveTableExecuteHandle) tableExecuteHandle;
checkArgument(handle.getWriteDeclarationId().isPresent(), "no write declaration id present in tableExecuteHandle");

List<PartitionUpdate> partitionUpdates = fragments.stream()
.map(Slice::getBytes)
@@ -2087,18 +2094,36 @@ private void finishOptimize(ConnectorSession session, ConnectorTableExecuteHandl
}
}

boolean someDeleted = false;

// track remaining files to be deleted for error reporting
Set<String> remainingFilesToDelete = tableExecuteState.stream()
.map(value -> (String) value)
.collect(Collectors.toCollection(HashSet::new));

try {
// TODO: this is not safe; we can delete old files, and then if there is an error the new files will be deleted too.
FileSystem fs = hdfsEnvironment.getFileSystem(new HdfsContext(session), new Path(table.getStorage().getLocation()));
for (Object scannedPathObject : tableExecuteState) {
someDeleted = true;
Path scannedPath = new Path((String) scannedPathObject);
fs.delete(scannedPath, false);
retry().run("delete " + scannedPath, () -> fs.delete(scannedPath, false));
remainingFilesToDelete.remove(scannedPathObject);
}
}
catch (IOException e) {
throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Error while deleting ", e);
catch (Exception e) {
if (!someDeleted) {
// we are good - we did not delete any source files so we can just throw an error and allow rollback to happen
throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Error while deleting original files", e);
}

// If we already deleted some original files we disable the rollback routine so the written files are not deleted
// The reported exception and log entry list the files which need to be cleaned up manually by the user.
// Until the table is cleaned up there will be duplicate rows present.
metastore.dropDeclaredIntentionToWrite(handle.getWriteDeclarationId().get());
String errorMessage = "Error while deleting data files in FINISH phase of OPTIMIZE for table " + table.getTableName() + "; remaining files need to be deleted manually: " + remainingFilesToDelete;
log.error(e, errorMessage);
throw new TrinoException(HIVE_FILESYSTEM_ERROR, errorMessage, e);
}
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Blah!!!");
}

@Override
@@ -31,11 +31,13 @@ public class HiveTableExecuteHandle
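The try/catch added to finishOptimize above is where the "limit chance for data loss" logic lives: originals are deleted with retries, the set of not-yet-deleted files is tracked, and a failure is handled differently depending on whether any original file may already be gone. Below is a self-contained, hypothetical sketch of that pattern in plain java.nio; the real code goes through HdfsEnvironment, RetryDriver and the metastore's declared write intention instead of these simplified names:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

final class OptimizeCleanupSketch
{
    // Deletes the files that were compacted away. While nothing has been deleted yet, a failure is
    // safe to propagate and the caller can roll back the newly written files. Once a delete has been
    // attempted, rollback must be disabled (removing the new files now could lose rows), so the
    // remaining originals are reported for manual cleanup instead.
    static void deleteOriginalFiles(Collection<Path> originalFiles, Runnable disableRollback)
            throws IOException
    {
        Set<Path> remaining = new HashSet<>(originalFiles);
        boolean someDeleted = false;
        try {
            for (Path file : originalFiles) {
                // flagged before the attempt: a failed delete may still have removed the file
                someDeleted = true;
                Files.deleteIfExists(file); // the real code retries transient filesystem errors
                remaining.remove(file);
            }
        }
        catch (IOException e) {
            if (!someDeleted) {
                throw e; // no originals touched; rolling back the new files is still safe
            }
            disableRollback.run();
            throw new IOException("Some original files could not be deleted; delete them manually "
                    + "to remove duplicate rows: " + remaining, e);
        }
    }
}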
{
private final String procedureName;
private final HiveTableHandle sourceTableHandle;
private final Optional<String> writeDeclarationId;

@JsonCreator
public HiveTableExecuteHandle(
@JsonProperty("procedureName") String procedureName,
@JsonProperty("sourceTableHandle") HiveTableHandle sourceTableHandle,
@JsonProperty("writeDeclarationId") Optional<String> writeDeclarationId,
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("inputColumns") List<HiveColumnHandle> inputColumns,
@@ -59,6 +61,7 @@ public HiveTableExecuteHandle(

this.procedureName = requireNonNull(procedureName, "procedureName is null");
this.sourceTableHandle = requireNonNull(sourceTableHandle, "sourceTableHandle is null");
this.writeDeclarationId = requireNonNull(writeDeclarationId, "writeDeclarationId is null");
}

@JsonProperty
@@ -74,6 +77,29 @@ public ConnectorTableHandle getSourceTableHandle()
return sourceTableHandle;
}

@JsonProperty
public Optional<String> getWriteDeclarationId()
{
return writeDeclarationId;
}

public HiveTableExecuteHandle withWriteDeclarationId(String writeDeclarationId)
{
return new HiveTableExecuteHandle(
procedureName,
sourceTableHandle,
Optional.of(writeDeclarationId),
getSchemaName(),
getTableName(),
getInputColumns(),
getPageSinkMetadata(),
getLocationHandle(),
getBucketProperty(),
getTableStorageFormat(),
getPartitionStorageFormat(),
getTransaction());
}

@Override
public String toString()
{
@@ -29,6 +29,8 @@ public interface LocationService
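HiveTableExecuteHandle stays an immutable, Jackson-serialized value object, so the write declaration id is carried as an Optional that is empty when the handle is planned in getTableHandleForOptimize and is filled in by the withWriteDeclarationId copy once beginOptimize declares the intention to write. A stripped-down, hypothetical version of that pattern (names simplified, not the actual Trino class) looks like this; serializing the Optional field requires Jackson's Jdk8Module (or equivalent) to be registered on the ObjectMapper:

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

final class ExecuteHandleSketch
{
    private final String procedureName;
    private final Optional<String> writeDeclarationId;

    @JsonCreator
    public ExecuteHandleSketch(
            @JsonProperty("procedureName") String procedureName,
            @JsonProperty("writeDeclarationId") Optional<String> writeDeclarationId)
    {
        this.procedureName = requireNonNull(procedureName, "procedureName is null");
        this.writeDeclarationId = requireNonNull(writeDeclarationId, "writeDeclarationId is null");
    }

    @JsonProperty
    public String getProcedureName()
    {
        return procedureName;
    }

    @JsonProperty
    public Optional<String> getWriteDeclarationId()
    {
        return writeDeclarationId;
    }

    // begin* returns a copy carrying the id; the planned handle itself is never mutated
    public ExecuteHandleSketch withWriteDeclarationId(String writeDeclarationId)
    {
        return new ExecuteHandleSketch(procedureName, Optional.of(writeDeclarationId));
    }
}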

LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table);

LocationHandle forOptimize(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table);

/**
* targetPath and writePath will be root directory of all partition and table paths
* that may be returned by {@link #getTableWriteInfo(LocationHandle, boolean)} and {@link #getPartitionWriteInfo(LocationHandle, Optional, String)} method.
