Add read support for original files of ACID/Transactional tables #2930

Closed
128 changes: 116 additions & 12 deletions presto-hive/src/main/java/io/prestosql/plugin/hive/AcidInfo.java
@@ -15,15 +15,19 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ListMultimap;
import org.apache.hadoop.fs.Path;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

/**
@@ -33,15 +37,32 @@ public class AcidInfo
{
private final String partitionLocation;
private final List<DeleteDeltaInfo> deleteDeltas;
private final List<OriginalFileInfo> originalFiles;
private final int bucketId;

@JsonCreator
public AcidInfo(
@JsonProperty("partitionLocation") String partitionLocation,
@JsonProperty("deleteDeltas") List<DeleteDeltaInfo> deleteDeltas)
@JsonProperty("deleteDeltas") List<DeleteDeltaInfo> deleteDeltas,
@JsonProperty("originalFiles") List<OriginalFileInfo> originalFiles,
@JsonProperty("bucketId") int bucketId)
{
this.partitionLocation = requireNonNull(partitionLocation, "partitionLocation is null");
this.deleteDeltas = ImmutableList.copyOf(requireNonNull(deleteDeltas, "deleteDeltas is null"));
checkArgument(!deleteDeltas.isEmpty(), "deleteDeltas is empty");
this.originalFiles = ImmutableList.copyOf(requireNonNull(originalFiles, "originalFiles is null"));
this.bucketId = bucketId;
}

@JsonProperty
public List<OriginalFileInfo> getOriginalFiles()
{
return originalFiles;
}

@JsonProperty
public int getBucketId()
{
return bucketId;
}

@JsonProperty
@@ -62,20 +83,20 @@ public boolean equals(Object o)
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

AcidInfo that = (AcidInfo) o;
return partitionLocation.equals(that.partitionLocation) &&
deleteDeltas.equals(that.deleteDeltas);
return bucketId == that.bucketId &&
Objects.equals(partitionLocation, that.partitionLocation) &&
Objects.equals(deleteDeltas, that.deleteDeltas) &&
Objects.equals(originalFiles, that.originalFiles);
}

@Override
public int hashCode()
{
return Objects.hash(partitionLocation, deleteDeltas);
return Objects.hash(partitionLocation, deleteDeltas, originalFiles, bucketId);
}

@Override
@@ -84,6 +105,8 @@ public String toString()
return toStringHelper(this)
.add("partitionLocation", partitionLocation)
.add("deleteDeltas", deleteDeltas)
.add("originalFiles", originalFiles)
.add("bucketId", bucketId)
.toString();
}

@@ -156,6 +179,62 @@ public String toString()
}
}

public static class OriginalFileInfo
{
private final String name;
private final long fileSize;

@JsonCreator
public OriginalFileInfo(
@JsonProperty("name") String name,
@JsonProperty("fileSize") long fileSize)
{
this.name = requireNonNull(name, "name is null");
this.fileSize = fileSize;
}

@JsonProperty
public String getName()
{
return name;
}

@JsonProperty
public long getFileSize()
{
return fileSize;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OriginalFileInfo that = (OriginalFileInfo) o;
return fileSize == that.fileSize &&
name.equals(that.name);
}

@Override
public int hashCode()
{
return Objects.hash(name, fileSize);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("name", name)
.add("fileSize", fileSize)
.toString();
}
}

public static Builder builder(Path partitionPath)
{
return new Builder(partitionPath);
@@ -169,7 +248,8 @@ public static Builder builder(AcidInfo acidInfo)
public static class Builder
{
private final Path partitionLocation;
private final ImmutableList.Builder<DeleteDeltaInfo> deleteDeltaInfoBuilder = ImmutableList.builder();
private final List<DeleteDeltaInfo> deleteDeltaInfos = new ArrayList<>();
private final ListMultimap<Integer, OriginalFileInfo> bucketIdToOriginalFileInfoMap = ArrayListMultimap.create();

private Builder(Path partitionPath)
{
@@ -179,7 +259,7 @@ private Builder(Path partitionPath)
private Builder(AcidInfo acidInfo)
{
partitionLocation = new Path(acidInfo.getPartitionLocation());
deleteDeltaInfoBuilder.addAll(acidInfo.deleteDeltas);
deleteDeltaInfos.addAll(acidInfo.deleteDeltas);
}

public Builder addDeleteDelta(Path deleteDeltaPath, long minWriteId, long maxWriteId, int statementId)
@@ -192,17 +272,41 @@ public Builder addDeleteDelta(Path deleteDeltaPath, long minWriteId, long maxWri
deleteDeltaPath.getParent().toString(),
partitionLocation);

deleteDeltaInfoBuilder.add(new DeleteDeltaInfo(minWriteId, maxWriteId, statementId));
deleteDeltaInfos.add(new DeleteDeltaInfo(minWriteId, maxWriteId, statementId));
return this;
}

public Builder addOriginalFile(Path originalFilePath, long originalFileLength, int bucketId)
{
requireNonNull(originalFilePath, "originalFilePath is null");
Path partitionPathFromOriginalPath = originalFilePath.getParent();
// originalFilePath has scheme in the prefix (i.e. scheme://<path>), extract path from uri and compare.
checkArgument(
partitionLocation.toUri().getPath().equals(partitionPathFromOriginalPath.toUri().getPath()),
"Partition location in OriginalFile '%s' does not match stored location '%s'",
originalFilePath.getParent().toString(),
partitionLocation);
bucketIdToOriginalFileInfoMap.put(bucketId, new OriginalFileInfo(originalFilePath.getName(), originalFileLength));
return this;
}
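
// Illustrative note (not part of the PR): the URI-path comparison above ignores scheme and
// authority, so a fully qualified original-file location still matches the stored partition
// location. For example, with hypothetical paths:
//   new Path("hdfs://nn:8020/warehouse/t/p=1/000000_0").getParent().toUri().getPath() -> "/warehouse/t/p=1"
//   new Path("/warehouse/t/p=1").toUri().getPath()                                    -> "/warehouse/t/p=1"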

public AcidInfo buildWithRequiredOriginalFiles(int bucketId)
{
checkState(
bucketId > -1 && bucketIdToOriginalFileInfoMap.containsKey(bucketId),
"Bucket Id to OriginalFileInfo map should have entry for requested bucket id: %s",
bucketId);
List<DeleteDeltaInfo> deleteDeltas = ImmutableList.copyOf(deleteDeltaInfos);
return new AcidInfo(partitionLocation.toString(), deleteDeltas, bucketIdToOriginalFileInfoMap.get(bucketId), bucketId);
}

public Optional<AcidInfo> build()
{
List<DeleteDeltaInfo> deleteDeltas = deleteDeltaInfoBuilder.build();
List<DeleteDeltaInfo> deleteDeltas = ImmutableList.copyOf(deleteDeltaInfos);
if (deleteDeltas.isEmpty()) {
return Optional.empty();
}
return Optional.of(new AcidInfo(partitionLocation.toString(), deleteDeltas));
return Optional.of(new AcidInfo(partitionLocation.toString(), deleteDeltas, ImmutableList.of(), -1));
}
}
}
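
For illustration, a minimal hypothetical sketch of how the extended Builder can be driven, using only the methods shown above. Paths, file sizes, and write ids are invented, and error handling is omitted; this is not code from the PR.

import io.prestosql.plugin.hive.AcidInfo;
import org.apache.hadoop.fs.Path;

import java.util.Optional;

public class AcidInfoBuilderSketch
{
    public static void main(String[] args)
    {
        Path partition = new Path("hdfs://nn/warehouse/acid_table/part=1");

        AcidInfo.Builder builder = AcidInfo.builder(partition);
        // delete_delta directories discovered for the partition
        builder.addDeleteDelta(new Path(partition, "delete_delta_0000004_0000004_0000"), 4, 4, 0);
        // pre-ACID "original" files, registered under the bucket id parsed from their names
        builder.addOriginalFile(new Path(partition, "000000_0"), 1024, 0);
        builder.addOriginalFile(new Path(partition, "000001_0"), 2048, 1);

        // A split over an original file carries an AcidInfo restricted to that file's bucket,
        // so the matching delete deltas can be applied at read time.
        AcidInfo bucket0Info = builder.buildWithRequiredOriginalFiles(0);
        System.out.println(bucket0Info);

        // Regular splits keep using build(), which stays empty when there are no delete deltas.
        Optional<AcidInfo> maybeAcidInfo = builder.build();
        System.out.println(maybeAcidInfo);
    }
}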
presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java
@@ -29,7 +29,6 @@
import io.prestosql.plugin.hive.util.HiveBucketing.BucketingVersion;
import io.prestosql.plugin.hive.util.HiveBucketing.HiveBucketFilter;
import io.prestosql.plugin.hive.util.HiveFileIterator;
import io.prestosql.plugin.hive.util.HiveFileIterator.NestedDirectoryNotAllowedException;
import io.prestosql.plugin.hive.util.InternalHiveSplitFactory;
import io.prestosql.plugin.hive.util.ResumableTask;
import io.prestosql.plugin.hive.util.ResumableTasks;
@@ -46,6 +45,7 @@
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
@@ -435,7 +435,8 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
}

List<Path> readPaths;
Optional<AcidInfo> acidInfo;
List<HdfsFileStatusWithId> fileStatusOriginalFiles = ImmutableList.of();
AcidInfo.Builder acidInfoBuilder = AcidInfo.builder(path);
if (AcidUtils.isTransactionalTable(table.getParameters())) {
AcidUtils.Directory directory = hdfsEnvironment.doAs(hdfsContext.getIdentity().getUser(), () -> AcidUtils.getAcidState(
path,
@@ -457,10 +458,6 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)

readPaths = new ArrayList<>();

if (!directory.getOriginalFiles().isEmpty()) {
throw new PrestoException(NOT_SUPPORTED, "Original non-ACID files in transactional tables are not supported");
}

// base
if (directory.getBaseDirectory() != null) {
readPaths.add(directory.getBaseDirectory());
@@ -474,18 +471,28 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
}

// Create a registry of delete_delta directories for the partition
AcidInfo.Builder acidInfoBuilder = AcidInfo.builder(path);
for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
if (delta.isDeleteDelta()) {
acidInfoBuilder.addDeleteDelta(delta.getPath(), delta.getMinWriteId(), delta.getMaxWriteId(), delta.getStatementId());
}
}

acidInfo = acidInfoBuilder.build();
// initialize original files status list if present
fileStatusOriginalFiles = directory.getOriginalFiles();

for (HdfsFileStatusWithId hdfsFileStatusWithId : fileStatusOriginalFiles) {
Path originalFilePath = hdfsFileStatusWithId.getFileStatus().getPath();
long originalFileLength = hdfsFileStatusWithId.getFileStatus().getLen();
if (originalFileLength == 0) {
continue;
}
// Hive requires "original" files of transactional tables to conform to the bucketed tables naming pattern, to match them with delete deltas.
int bucketId = getRequiredBucketNumber(originalFilePath);
acidInfoBuilder.addOriginalFile(originalFilePath, originalFileLength, bucketId);
}
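// Illustrative layout only (not taken from the PR): a transactional partition that still
// contains pre-ACID "original" files might look like:
//   /warehouse/t/p=1/000000_0                                        <- original file, bucket 0
//   /warehouse/t/p=1/000001_0                                        <- original file, bucket 1
//   /warehouse/t/p=1/delta_0000002_0000002_0000/bucket_00000
//   /warehouse/t/p=1/delete_delta_0000003_0000003_0000/bucket_00000
// AcidUtils.getAcidState() reports the first two via directory.getOriginalFiles(); the loop
// above registers each of them under the bucket id encoded in its file name so that the
// matching delete deltas can be applied when the file is read.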
}
else {
readPaths = ImmutableList.of(path);
acidInfo = Optional.empty();
}

// S3 Select pushdown works at the granularity of individual S3 objects,
@@ -497,18 +504,63 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
if (tableBucketInfo.isPresent()) {
ListenableFuture<?> lastResult = immediateFuture(null); // TODO document in addToQueue() that it is sufficient to hold on to last returned future
for (Path readPath : readPaths) {
lastResult = hiveSplitSource.addToQueue(getBucketedSplits(readPath, fs, splitFactory, tableBucketInfo.get(), bucketConversion, splittable, acidInfo));
// list all files in the partition
List<LocatedFileStatus> files = new ArrayList<>();
try {
Iterators.addAll(files, new HiveFileIterator(table, readPath, fs, directoryLister, namenodeStats, FAIL, ignoreAbsentPartitions));
}
catch (HiveFileIterator.NestedDirectoryNotAllowedException e) {
// Fail here to be on the safe side. This seems to be the same as what Hive does
throw new PrestoException(
HIVE_INVALID_BUCKET_FILES,
format("Hive table '%s' is corrupt. Found sub-directory in bucket directory for partition: %s",
table.getSchemaTableName(),
splitFactory.getPartitionName()));
}
lastResult = hiveSplitSource.addToQueue(getBucketedSplits(files, splitFactory, tableBucketInfo.get(), bucketConversion, splittable, acidInfoBuilder.build()));
}

for (HdfsFileStatusWithId hdfsFileStatusWithId : fileStatusOriginalFiles) {
List<LocatedFileStatus> locatedFileStatuses = ImmutableList.of((LocatedFileStatus) hdfsFileStatusWithId.getFileStatus());
Optional<AcidInfo> acidInfo = Optional.of(acidInfoBuilder.buildWithRequiredOriginalFiles(getRequiredBucketNumber(hdfsFileStatusWithId.getFileStatus().getPath())));
lastResult = hiveSplitSource.addToQueue(getBucketedSplits(locatedFileStatuses, splitFactory, tableBucketInfo.get(), bucketConversion, splittable, acidInfo));
}

return lastResult;
}

for (Path readPath : readPaths) {
fileIterators.addLast(createInternalHiveSplitIterator(readPath, fs, splitFactory, splittable, acidInfo));
fileIterators.addLast(createInternalHiveSplitIterator(readPath, fs, splitFactory, splittable, acidInfoBuilder.build()));
}

if (!fileStatusOriginalFiles.isEmpty()) {
fileIterators.addLast(generateOriginalFilesSplits(splitFactory, fileStatusOriginalFiles, splittable, acidInfoBuilder));
}

return COMPLETED_FUTURE;
}

private Iterator<InternalHiveSplit> generateOriginalFilesSplits(
InternalHiveSplitFactory splitFactory,
List<HdfsFileStatusWithId> originalFileLocations,
boolean splittable,
AcidInfo.Builder acidInfoBuilder)
{
return originalFileLocations.stream()
.map(HdfsFileStatusWithId::getFileStatus)
.map(fileStatus -> {
Optional<AcidInfo> acidInfo = Optional.of(acidInfoBuilder.buildWithRequiredOriginalFiles(getRequiredBucketNumber(fileStatus.getPath())));
return splitFactory.createInternalHiveSplit(
(LocatedFileStatus) fileStatus,
OptionalInt.empty(),
splittable,
acidInfo);
})
.filter(Optional::isPresent)
.map(Optional::get)
.iterator();
}

private ListenableFuture<?> addSplitsToSource(InputSplit[] targetSplits, InternalHiveSplitFactory splitFactory)
throws IOException
{
@@ -542,27 +594,19 @@ private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(Path path, F
.iterator();
}

private List<InternalHiveSplit> getBucketedSplits(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, BucketSplitInfo bucketSplitInfo, Optional<BucketConversion> bucketConversion, boolean splittable, Optional<AcidInfo> acidInfo)
private List<InternalHiveSplit> getBucketedSplits(
List<LocatedFileStatus> files,
InternalHiveSplitFactory splitFactory,
BucketSplitInfo bucketSplitInfo,
Optional<BucketConversion> bucketConversion,
boolean splittable,
Optional<AcidInfo> acidInfo)
{
int readBucketCount = bucketSplitInfo.getReadBucketCount();
int tableBucketCount = bucketSplitInfo.getTableBucketCount();
int partitionBucketCount = bucketConversion.map(BucketConversion::getPartitionBucketCount).orElse(tableBucketCount);
int bucketCount = max(readBucketCount, partitionBucketCount);

// list all files in the partition
List<LocatedFileStatus> files = new ArrayList<>(partitionBucketCount);
try {
Iterators.addAll(files, new HiveFileIterator(table, path, fileSystem, directoryLister, namenodeStats, FAIL, ignoreAbsentPartitions));
}
catch (NestedDirectoryNotAllowedException e) {
// Fail here to be on the safe side. This seems to be the same as what Hive does
throw new PrestoException(
HIVE_INVALID_BUCKET_FILES,
format("Hive table '%s' is corrupt. Found sub-directory in bucket directory for partition: %s",
table.getSchemaTableName(),
splitFactory.getPartitionName()));
}

// build mapping of file name to bucket
ListMultimap<Integer, LocatedFileStatus> bucketFiles = ArrayListMultimap.create();
for (LocatedFileStatus file : files) {
Expand Down Expand Up @@ -661,6 +705,12 @@ static void validateFileBuckets(ListMultimap<Integer, LocatedFileStatus> bucketF
}
}

private static int getRequiredBucketNumber(Path path)
{
return getBucketNumber(path.getName())
.orElseThrow(() -> new IllegalStateException("Cannot get bucket number from path: " + path));
}

@VisibleForTesting
static OptionalInt getBucketNumber(String name)
{
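
For context on getRequiredBucketNumber and getBucketNumber above, the sketch below shows the kind of name-based bucket extraction they rely on. It is a hypothetical re-implementation for illustration only; the exact pattern used by getBucketNumber is not shown in this diff.

import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BucketNumberSketch
{
    // Hive-style names: "000001_0" and "000001_0_copy_2" for original files, "bucket_00001" for ACID buckets
    private static final Pattern ORIGINAL_FILE = Pattern.compile("(\\d+)_\\d+(_copy_\\d+)?");
    private static final Pattern BUCKET_FILE = Pattern.compile("bucket_(\\d+)");

    static OptionalInt bucketNumber(String name)
    {
        Matcher matcher = ORIGINAL_FILE.matcher(name);
        if (matcher.matches()) {
            return OptionalInt.of(Integer.parseInt(matcher.group(1)));
        }
        matcher = BUCKET_FILE.matcher(name);
        if (matcher.matches()) {
            return OptionalInt.of(Integer.parseInt(matcher.group(1)));
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args)
    {
        System.out.println(bucketNumber("000001_0"));          // OptionalInt[1]
        System.out.println(bucketNumber("000001_0_copy_2"));   // OptionalInt[1]
        System.out.println(bucketNumber("bucket_00123"));      // OptionalInt[123]
        System.out.println(bucketNumber("random_file.orc"));   // OptionalInt.empty
    }
}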