Add hidden column $file_modified_time for Hive tables
Cherry-pick of trinodb/trino#1428

Co-authored-by: Praveen2112 <[email protected]>
agrawalreetika and Praveen2112 committed Aug 1, 2021
1 parent aab0ed2 commit dfd2f24
Showing 17 changed files with 138 additions and 13 deletions.
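
As context for the diff below, a minimal sketch of how the new hidden column could be queried through the Presto JDBC driver once this change is in place. The coordinator URL, user, schema, and table name are placeholders and the presto-jdbc driver is assumed to be on the classpath; like the existing $path and $file_size hidden columns, $file_modified_time is not returned by SELECT * and must be selected explicitly as a quoted identifier.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class FileModifiedTimeQuery
    {
        public static void main(String[] args)
                throws Exception
        {
            // Placeholder coordinator, catalog, and schema; adjust for the actual deployment.
            String url = "jdbc:presto://localhost:8080/hive/web";
            try (Connection connection = DriverManager.getConnection(url, "test_user", null);
                    Statement statement = connection.createStatement();
                    // Hidden columns are quoted and selected explicitly; "page_views" is a placeholder table.
                    ResultSet rs = statement.executeQuery(
                            "SELECT \"$path\", \"$file_size\", \"$file_modified_time\" FROM page_views LIMIT 10")) {
                while (rs.next()) {
                    // $file_modified_time comes back as a BIGINT holding epoch milliseconds.
                    System.out.printf("%s  %d bytes  modified %d%n",
                            rs.getString(1), rs.getLong(2), rs.getLong(3));
                }
            }
        }
    }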
@@ -28,6 +28,7 @@ public class HiveFileInfo
private final boolean isDirectory;
private final BlockLocation[] blockLocations;
private final long length;
private final long fileModifiedTime;
private final Optional<byte[]> extraFileInfo;

public static HiveFileInfo createHiveFileInfo(LocatedFileStatus locatedFileStatus, Optional<byte[]> extraFileContext)
@@ -37,15 +38,17 @@ public static HiveFileInfo createHiveFileInfo(LocatedFileStatu
locatedFileStatus.isDirectory(),
locatedFileStatus.getBlockLocations(),
locatedFileStatus.getLen(),
locatedFileStatus.getModificationTime(),
extraFileContext);
}

private HiveFileInfo(Path path, boolean isDirectory, BlockLocation[] blockLocations, long length, Optional<byte[]> extraFileInfo)
private HiveFileInfo(Path path, boolean isDirectory, BlockLocation[] blockLocations, long length, long fileModifiedTime, Optional<byte[]> extraFileInfo)
{
this.path = requireNonNull(path, "path is null");
this.isDirectory = isDirectory;
this.blockLocations = blockLocations;
this.length = length;
this.fileModifiedTime = fileModifiedTime;
this.extraFileInfo = requireNonNull(extraFileInfo, "extraFileInfo is null");
}

@@ -69,6 +72,11 @@ public long getLength()
return length;
}

public long getFileModifiedTime()
{
return fileModifiedTime;
}

public Optional<byte[]> getExtraFileInfo()
{
return extraFileInfo;

@@ -57,6 +57,11 @@ public class HiveColumnHandle
public static final HiveType FILE_SIZE_TYPE = HIVE_LONG;
public static final TypeSignature FILE_SIZE_TYPE_SIGNATURE = FILE_SIZE_TYPE.getTypeSignature();

public static final int FILE_MODIFIED_TIME_COLUMN_INDEX = -14;
public static final String FILE_MODIFIED_TIME_COLUMN_NAME = "$file_modified_time";
public static final HiveType FILE_MODIFIED_TIME_TYPE = HIVE_LONG;
public static final TypeSignature FILE_MODIFIED_TIME_TYPE_SIGNATURE = FILE_MODIFIED_TIME_TYPE.getTypeSignature();

private static final String UPDATE_ROW_ID_COLUMN_NAME = "$shard_row_id";

// Ids <= this can be used for distinguishing between different prefilled columns.
@@ -253,6 +258,11 @@ public static HiveColumnHandle fileSizeColumnHandle()
return new HiveColumnHandle(FILE_SIZE_COLUMN_NAME, FILE_SIZE_TYPE, FILE_SIZE_TYPE_SIGNATURE, FILE_SIZE_COLUMN_INDEX, SYNTHESIZED, Optional.empty(), ImmutableList.of(), Optional.empty());
}

public static HiveColumnHandle fileModifiedTimeColumnHandle()
{
return new HiveColumnHandle(FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, FILE_MODIFIED_TIME_TYPE_SIGNATURE, FILE_MODIFIED_TIME_COLUMN_INDEX, SYNTHESIZED, Optional.empty(), ImmutableList.of(), Optional.empty());
}

public static boolean isPathColumnHandle(HiveColumnHandle column)
{
return column.getHiveColumnIndex() == PATH_COLUMN_INDEX;
@@ -281,4 +291,9 @@ public static boolean isFileSizeColumnHandle(HiveColumnHandle column)
{
return column.getHiveColumnIndex() == FILE_SIZE_COLUMN_INDEX;
}

public static boolean isFileModifiedTimeColumnHandle(HiveColumnHandle column)
{
return column.getHiveColumnIndex() == FILE_MODIFIED_TIME_COLUMN_INDEX;
}
}
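
The new handle mirrors the existing synthesized hidden columns ($path, $bucket, $file_size): a reserved negative column index, a fixed name, a HIVE_LONG type, plus a factory method and an index-based predicate. Since the column is typed as a long, its value surfaces as epoch milliseconds taken from the file's modification time; a small illustration of turning such a value back into a readable instant, using a hypothetical literal:

    import java.time.Instant;

    public class FileModifiedTimeValue
    {
        public static void main(String[] args)
        {
            // Hypothetical value as it would be returned by the $file_modified_time column:
            // epoch milliseconds, originating from FileStatus.getModificationTime().
            long fileModifiedTime = 1627776000000L;

            // BIGINT epoch millis -> readable UTC instant
            System.out.println(Instant.ofEpochMilli(fileModifiedTime)); // 2021-08-01T00:00:00Z
        }
    }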

@@ -179,6 +179,7 @@
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.SYNTHESIZED;
import static com.facebook.presto.hive.HiveColumnHandle.FILE_MODIFIED_TIME_COLUMN_NAME;
import static com.facebook.presto.hive.HiveColumnHandle.FILE_SIZE_COLUMN_NAME;
import static com.facebook.presto.hive.HiveColumnHandle.PATH_COLUMN_NAME;
import static com.facebook.presto.hive.HiveColumnHandle.updateRowIdHandle;
@@ -3618,6 +3619,7 @@ private static Function<HiveColumnHandle, ColumnMetadata> columnMetadataGetter(T
}

builder.put(FILE_SIZE_COLUMN_NAME, Optional.empty());
builder.put(FILE_MODIFIED_TIME_COLUMN_NAME, Optional.empty());

Map<String, Optional<String>> columnComment = builder.build();


@@ -191,6 +191,7 @@ public ConnectorPageSource createPageSource(
hiveSplit.getStart(),
hiveSplit.getLength(),
hiveSplit.getFileSize(),
hiveSplit.getFileModifiedTime(),
hiveSplit.getStorage(),
splitContext.getDynamicFilterPredicate().map(filter -> filter.transform(handle -> (HiveColumnHandle) handle).intersect(effectivePredicate)).orElse(effectivePredicate),
selectedColumns,
@@ -269,7 +270,8 @@ private static Optional<ConnectorPageSource> createSelectivePageSource(
split.getTableToPartitionMapping(),
path,
split.getTableBucketNumber(),
split.getFileSize());
split.getFileSize(),
split.getFileModifiedTime());

Optional<BucketAdaptation> bucketAdaptation = split.getBucketConversion().map(conversion -> toBucketAdaptation(conversion, columnMappings, split.getTableBucketNumber(), mapping -> mapping.getHiveColumnHandle().getHiveColumnIndex()));

@@ -336,6 +338,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
long start,
long length,
long fileSize,
long fileModifiedTime,
Storage storage,
TupleDomain<HiveColumnHandle> effectivePredicate,
List<HiveColumnHandle> hiveColumns,
@@ -382,7 +385,8 @@ public static Optional<ConnectorPageSource> createHivePageSource(
tableToPartitionMapping,
path,
tableBucketNumber,
fileSize);
fileSize,
fileModifiedTime);

Set<Integer> outputIndices = hiveColumns.stream()
.map(HiveColumnHandle::getHiveColumnIndex)
@@ -668,7 +672,8 @@ public static List<ColumnMapping> buildColumnMappings(
TableToPartitionMapping tableToPartitionMapping,
Path path,
OptionalInt bucketNumber,
long fileSize)
long fileSize,
long fileModifiedTime)
{
Map<String, HivePartitionKey> partitionKeysByName = uniqueIndex(partitionKeys, HivePartitionKey::getName);
int regularIndex = 0;
@@ -706,7 +711,7 @@ else if (isPushedDownSubfield(column)) {
else {
columnMappings.add(prefilled(
column,
getPrefilledColumnValue(column, partitionKeysByName.get(column.getName()), path, bucketNumber, fileSize),
getPrefilledColumnValue(column, partitionKeysByName.get(column.getName()), path, bucketNumber, fileSize, fileModifiedTime),
coercionFrom));
}
}

@@ -46,6 +46,7 @@ public class HiveSplit
private final long start;
private final long length;
private final long fileSize;
private final long fileModifiedTime;
private final Storage storage;
private final List<HivePartitionKey> partitionKeys;
private final List<HostAddress> addresses;
@@ -74,6 +75,7 @@ public HiveSplit(
@JsonProperty("start") long start,
@JsonProperty("length") long length,
@JsonProperty("fileSize") long fileSize,
@JsonProperty("fileModifiedTime") long fileModifiedTime,
@JsonProperty("storage") Storage storage,
@JsonProperty("partitionKeys") List<HivePartitionKey> partitionKeys,
@JsonProperty("addresses") List<HostAddress> addresses,
@@ -117,6 +119,7 @@ public HiveSplit(
this.start = start;
this.length = length;
this.fileSize = fileSize;
this.fileModifiedTime = fileModifiedTime;
this.storage = storage;
this.partitionKeys = ImmutableList.copyOf(partitionKeys);
this.addresses = ImmutableList.copyOf(addresses);
@@ -176,6 +179,12 @@ public long getFileSize()
return fileSize;
}

@JsonProperty
public long getFileModifiedTime()
{
return fileModifiedTime;
}

@JsonProperty
public Storage getStorage()
{

@@ -491,6 +491,7 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) {
internalSplit.getStart(),
splitBytes,
internalSplit.getFileSize(),
internalSplit.getFileModifiedTime(),
internalSplit.getPartitionInfo().getStorage(),
internalSplit.getPartitionKeys(),
block.getAddresses(),

@@ -130,8 +130,10 @@
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveColumnHandle.MAX_PARTITION_KEY_COLUMN_INDEX;
import static com.facebook.presto.hive.HiveColumnHandle.bucketColumnHandle;
import static com.facebook.presto.hive.HiveColumnHandle.fileModifiedTimeColumnHandle;
import static com.facebook.presto.hive.HiveColumnHandle.fileSizeColumnHandle;
import static com.facebook.presto.hive.HiveColumnHandle.isBucketColumnHandle;
import static com.facebook.presto.hive.HiveColumnHandle.isFileModifiedTimeColumnHandle;
import static com.facebook.presto.hive.HiveColumnHandle.isFileSizeColumnHandle;
import static com.facebook.presto.hive.HiveColumnHandle.isPathColumnHandle;
import static com.facebook.presto.hive.HiveColumnHandle.pathColumnHandle;
@@ -890,6 +892,7 @@ public static List<HiveColumnHandle> hiveColumnHandles(Table table)
columns.add(bucketColumnHandle());
}
columns.add(fileSizeColumnHandle());
columns.add(fileModifiedTimeColumnHandle());

return columns.build();
}
@@ -934,7 +937,7 @@ public static String columnExtraInfo(boolean partitionKey)
return partitionKey ? "partition key" : null;
}

public static Optional<String> getPrefilledColumnValue(HiveColumnHandle columnHandle, HivePartitionKey partitionKey, Path path, OptionalInt bucketNumber, long fileSize)
public static Optional<String> getPrefilledColumnValue(HiveColumnHandle columnHandle, HivePartitionKey partitionKey, Path path, OptionalInt bucketNumber, long fileSize, long fileModifiedTime)
{
if (partitionKey != null) {
return partitionKey.getValue();
@@ -951,6 +954,9 @@ public static Optional<String> getPrefilledColumnValue(HiveColumnHa
if (isFileSizeColumnHandle(columnHandle)) {
return Optional.of(String.valueOf(fileSize));
}
if (isFileModifiedTimeColumnHandle(columnHandle)) {
return Optional.of(String.valueOf(fileModifiedTime));
}

throw new PrestoException(NOT_SUPPORTED, "unsupported hidden column: " + columnHandle);
}
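
The branch above sits on the prefilled-value path shared by all per-file hidden columns: the value is computed once per split, carried as a string, and later coerced to the column's declared Hive type. A minimal sketch of how the new branch resolves the modified-time handle, assuming getPrefilledColumnValue lives in HiveUtil as in the surrounding code, with hypothetical path, size, and timestamp values:

    import static com.facebook.presto.hive.HiveColumnHandle.fileModifiedTimeColumnHandle;
    import static com.facebook.presto.hive.HiveUtil.getPrefilledColumnValue;

    import java.util.Optional;
    import java.util.OptionalInt;

    import org.apache.hadoop.fs.Path;

    public class PrefilledModifiedTime
    {
        public static void main(String[] args)
        {
            // Hypothetical per-file metadata for a single split.
            Path path = new Path("hdfs://namenode/warehouse/web/page_views/part-00000");
            long fileSize = 1024;
            long fileModifiedTime = 1627776000000L;

            // No partition key and no bucket number apply here; the handle is matched by its
            // reserved column index and the modification time is returned as a string.
            Optional<String> value = getPrefilledColumnValue(
                    fileModifiedTimeColumnHandle(),
                    null,
                    path,
                    OptionalInt.empty(),
                    fileSize,
                    fileModifiedTime);

            System.out.println(value); // Optional[1627776000000]
        }
    }

With a partition key present the method returns the partition value first, so the hidden-column branches are only reached for non-partition columns.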

@@ -48,6 +48,7 @@ public class InternalHiveSplit
private final byte[] relativeUri;
private final long end;
private final long fileSize;
private final long fileModifiedTime;

// encode the hive blocks as an array of longs and list of list of addresses to save memory
//if all blockAddress lists are empty, store only the empty list
@@ -75,6 +76,7 @@ public InternalHiveSplit(
long start,
long end,
long fileSize,
long fileModifiedTime,
List<InternalHiveBlock> blocks,
OptionalInt readBucketNumber,
OptionalInt tableBucketNumber,
@@ -101,6 +103,7 @@
this.start = start;
this.end = end;
this.fileSize = fileSize;
this.fileModifiedTime = fileModifiedTime;
this.readBucketNumber = readBucketNumber.orElse(-1);
this.tableBucketNumber = tableBucketNumber.orElse(-1);
this.splittable = splittable;
@@ -146,6 +149,11 @@ public long getFileSize()
return fileSize;
}

public long getFileModifiedTime()
{
return fileModifiedTime;
}

public boolean isS3SelectPushdownEnabled()
{
return s3SelectPushdownEnabled;

@@ -105,6 +105,7 @@ private Optional<InternalHiveSplit> createInternalHiveSplit(HiveFileInfo fileInf
0,
fileInfo.getLength(),
fileInfo.getLength(),
fileInfo.getFileModifiedTime(),
readBucketNumber,
tableBucketNumber,
splittable,
@@ -123,6 +124,7 @@ public Optional<InternalHiveSplit> createInternalHiveSplit(FileSplit split)
split.getStart(),
split.getLength(),
file.getLen(),
file.getModificationTime(),
OptionalInt.empty(),
OptionalInt.empty(),
false,
@@ -136,6 +138,7 @@ private Optional<InternalHiveSplit> createInternalHiveSplit(
long start,
long length,
long fileSize,
long fileModificationTime,
OptionalInt readBucketNumber,
OptionalInt tableBucketNumber,
boolean splittable,
@@ -195,6 +198,7 @@ private Optional<InternalHiveSplit> createInternalHiveSplit(
start,
start + length,
fileSize,
fileModificationTime,
blocks,
readBucketNumber,
tableBucketNumber,

@@ -35,6 +35,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
@@ -130,6 +131,7 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle
0,
outputFile.length(),
outputFile.length(),
Instant.now().toEpochMilli(),
new Storage(
StorageFormat.create(config.getHiveStorageFormat().getSerDe(), config.getHiveStorageFormat().getInputFormat(), config.getHiveStorageFormat().getOutputFormat()),
"location",

@@ -61,6 +61,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@@ -913,6 +914,7 @@ private void testCursorProvider(HiveRecordCursorProvider cursorProvider,
split.getStart(),
split.getLength(),
split.getLength(),
Instant.now().toEpochMilli(),
new Storage(
StorageFormat.create(storageFormat.getSerDe(), storageFormat.getInputFormat(), storageFormat.getOutputFormat()),
"location",
@@ -977,6 +979,7 @@ private void testPageSourceFactory(HiveBatchPageSourceFactory sourceFactory,
split.getStart(),
split.getLength(),
split.getLength(),
Instant.now().toEpochMilli(),
new Storage(
StorageFormat.create(storageFormat.getSerDe(), storageFormat.getInputFormat(), storageFormat.getOutputFormat()),
"location",