
Add read support for original files of ACID/Transactional tables #2930

Closed

Conversation

harmandeeps
Member

@harmandeeps harmandeeps commented Feb 24, 2020

Fixes #2293.

Overview:

  1. Original files are created in a non-ACID table; after the table is converted to an ACID table, these files persist until a major compaction.
  2. Original files don't have ACID columns, so deleted rows (if someone deletes rows after the ACID conversion) are filtered out by comparing the global row ID of each row with the delete deltas of the same bucket.

High-Level Approach:

  1. For an original file split, collect all the original files/delete deltas that belong to the same bucket and send them in acidInfo.
  2. For each original file split, calculate its global start row ID, which is the total number of rows present before the given original file in the same bucket (see the sketch after this list).
  3. In OrcDeletedRows, iterate from the start row ID and filter out the deleted rows.
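
A minimal sketch of step 2, assuming the per-file row counts have already been read from the ORC footers; the real logic lives in OriginalFilesUtils, and the names below are placeholders rather than this PR's code:

    import java.util.Map;

    // Sketch: the start row ID of an original file is the sum of the row counts of
    // all original files in the same bucket that sort before it lexicographically.
    class StartRowIdSketch
    {
        static long computeStartRowId(Map<String, Long> fileNameToRowCount, String splitFileName)
        {
            long startRowId = 0;
            for (Map.Entry<String, Long> entry : fileNameToRowCount.entrySet()) {
                // only files that sort before the split's file contribute rows
                if (entry.getKey().compareTo(splitFileName) < 0) {
                    startRowId += entry.getValue();
                }
            }
            return startRowId;
        }
    }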

@cla-bot cla-bot bot added the cla-signed label Feb 24, 2020
@harmandeeps harmandeeps requested a review from sopel39 March 3, 2020 10:33
@dain dain removed their request for review March 23, 2020 18:14
@dain
Member

dain commented Mar 23, 2020

I skimmed the code. The approach seems reasonable to me, but I'll leave the call to the others.

@harmandeeps harmandeeps force-pushed the read_support_acid_original_files branch 2 times, most recently from c44c930 to 9e9d1a4 Compare June 16, 2020 05:11
Member

@findepi findepi left a comment

@harmandeeps
I did a first pass.

I still need to fully understand how it works.
Is there any documentation covering delete delta with respect to original files?

private final Optional<Integer> bucketId;

@JsonCreator
public AcidInfo(@JsonProperty("deleteDeltas") Optional<DeleteDeltaLocations> deleteDeltaLocations,
Member

per code style, put each param on separate line

Suggested change
public AcidInfo(@JsonProperty("deleteDeltas") Optional<DeleteDeltaLocations> deleteDeltaLocations,
public AcidInfo(
@JsonProperty("deleteDeltas") Optional<DeleteDeltaLocations> deleteDeltaLocations,

Member Author

Fixed.

Comment on lines 86 to 101
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

AcidInfo acidInfo = (AcidInfo) o;

if (!deleteDeltaLocations.equals(acidInfo.deleteDeltaLocations)) {
return false;
}
if (!originalFileLocations.equals(acidInfo.originalFileLocations)) {
return false;
}
return bucketId.equals(acidInfo.bucketId);
}

@Override
public int hashCode()
{
int result = deleteDeltaLocations.hashCode();
result = 31 * result + originalFileLocations.hashCode();
result = 31 * result + bucketId.hashCode();
return result;
}

Member

I don't think AcidInfo needs to define equality. If so, please remove.

(Otherwise, use equals/hashCode generated by intellij)

Member Author

Removed.

Comment on lines 76 to 78
public boolean isSameBucket(Path path1, Path path2, Configuration conf)
throws IOException
{
if (bucketId.isPresent()) {
return bucketId.get() == AcidUtils.parseBaseOrDeltaBucketFilename(path2, conf).getBucketId();
}
return AcidUtils.parseBaseOrDeltaBucketFilename(path2, conf).getBucketId() ==
AcidUtils.parseBaseOrDeltaBucketFilename(path1, conf).getBucketId();
}
Member

This method has some API problems, but fortunately it seems unused. Please remove.

Member Author

Removed.

Comment on lines 200 to 226
public Optional<DeleteDeltaLocations> getDeleteDeltaLocations()
{
return deleteDeltaLocations;
}

public Optional<OriginalFileLocations> getOriginalFileLocations()
{
return originalFileLocations;
}

public Optional<Integer> getBucketId()
{
return bucketId;
}
Member

remove

Member Author

Done.

Comment on lines 194 to 197
public Builder addDeleteDeltaLocations(Optional<DeleteDeltaLocations> deleteDeltaLocations)
{
this.deleteDeltaLocations = deleteDeltaLocations;
return this;
Member

this doesn't "add", so setDeleteDeltaLocations() would be better

Member Author

done.

Comment on lines +60 to +62
// Row ID relative to all the original files of the same bucket ID before this file in lexicographic order
private Optional<Long> originalFileRowId = Optional.empty();
Member

Thanks for commenting here.

However, I do not fully understand. The original files do not have row IDs, do they?

Member Author

Yes, original files don't have row IDs, but we need to calculate them to filter out the deleted rows, as mentioned at line L121.

@@ -238,7 +240,7 @@ private static OrcPageSource createOrcPageSource(
List<OrcColumn> fileReadColumns = new ArrayList<>(columns.size() + (isFullAcid ? 3 : 0));
List<Type> fileReadTypes = new ArrayList<>(columns.size() + (isFullAcid ? 3 : 0));
List<OrcReader.ProjectedLayout> fileReadLayouts = new ArrayList<>(columns.size() + (isFullAcid ? 3 : 0));
if (isFullAcid) {
if (isFullAcid && !originalFilesPresent) {
Member

we should verify expected schema also when handling original files

Member Author

fixed.

@@ -257,7 +259,7 @@ private static OrcPageSource createOrcPageSource(
}

Map<String, OrcColumn> fileColumnsByName = ImmutableMap.of();
if (useOrcColumnNames || isFullAcid) {
if (useOrcColumnNames || (isFullAcid && !originalFilesPresent)) {
Member

we should verify expected schema also when handling original files

Member Author

fixed.

Path path = new Path(splitPath.getParent() + "/" + originalFileInfo.getName());
try {
// Check if the file belongs to the same bucket and comes before 'reqPath' in lexicographic order.
if (isSameBucket(splitPath, path, configuration) && path.compareTo(splitPath) < 0) {
Member

since BHSL divides the information per bucket, do we need isSameBucket here?

Member Author

yeah, it is redundant. removed.

@@ -75,7 +75,8 @@ public OrcDeleteDeltaPageSource(
String sessionUser,
Configuration configuration,
HdfsEnvironment hdfsEnvironment,
FileFormatDataSourceStats stats)
FileFormatDataSourceStats stats,
boolean originalFilesPresent)
Member

This is not the right name. (And I am not sure about the code structure either.)

This is because, when originalFilesPresent is set, it changes the behavior of this class in an incompatible manner.
Data processing when there are no original files works in some way A, and when you create one original file, this class starts to behave in a way B, which is not compatible with A.
For example, it discards the transaction id from delta files.

Delete deltas for original files are different. I don't think we should discard data. We should verify that the information we would be discarding isn't there.

If my understanding is correct, I suggest you rename originalFilesPresent -> originalFiles
and, when set, validate that the file does not have the transactionId etc. information we would be ignoring.

Member Author

The delete delta structure is the same for deltas, base files, and original files. For delete deltas, we don't need to read the bucket ID because we already restrict to the delete deltas of the same bucket as the original file split (OrcDeletedRows.java -> getDeletedRows). We calculate the original file row ID synthetically by reading the ORC footers of all the original files that come before it in lexicographic order, plus the position within the current ORC file.

So, there is no need to read the original transaction ID and bucket ID from delta files, which saves read time.
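
As an illustration of the bucket filtering described above (not this PR's code; the bucket_%05d file naming and the deleteDeltaDirectories input are assumptions here):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: from each delete_delta directory, pick only the bucket file that
    // matches the bucket of the original file split.
    class DeleteDeltaBucketFilter
    {
        static List<String> deleteDeltaFilesForBucket(List<String> deleteDeltaDirectories, int bucketId)
        {
            String bucketFileName = String.format("bucket_%05d", bucketId);
            List<String> paths = new ArrayList<>();
            for (String directory : deleteDeltaDirectories) {
                // whether the file actually exists in a given delete_delta directory
                // still has to be checked before opening it
                paths.add(directory + "/" + bucketFileName);
            }
            return paths;
        }
    }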

Member

So, there is no need to read the original transaction ID and bucket ID from delta files, which saves read time.

I do not think it matters, and it makes the code more complex.
I view the original files as something we do not need to optimize for.
I think you think the same, otherwise we would perhaps be a bit smarter
than reading footer information from each of the files over and over again.

For this reason, please remove the originalFilesPresent from here, i.e.
let's back out changes from this class.

@harmandeeps
Member Author

@harmandeeps
I did a first pass.

I still need to fully understand how it works.
Is there any documentation covering delete delta with respect to original files?

@findepi Thank you for the review. I will address the comments. There is no specific documentation that I found about original files. I read some Hive code and the ORC structure to understand the original files case. I have created a doc, please take a look: https://docs.google.com/document/d/1FeVu0kaunW3sg97Kr-be9WIWTgKKpybLo8ZvA5JBZvg/edit?usp=sharing

@harmandeeps harmandeeps requested a review from findepi June 22, 2020 06:56
@@ -50,7 +50,7 @@
private final TableToPartitionMapping tableToPartitionMapping;
private final Optional<BucketConversion> bucketConversion;
private final boolean s3SelectPushdownEnabled;
private final Optional<DeleteDeltaLocations> deleteDeltaLocations;
private final Optional<AcidInfo> acidInfo;
Member

Note: it would be nice to introduce the AcidInfo class in a separate commit from the other changes.
You can do this on your own, or reuse the first commit from #4049.

Member Author

Yeah, I will create a separate commit introducing AcidInfo class.

Member

it seems @wendigo took care of this in #4049. I think he picked the AcidInfo name to avoid causing conflicts with your PR, so you should just be able to rebase.

Member Author

I have rebased PR on the master branch, thank you.

@harmandeeps harmandeeps force-pushed the read_support_acid_original_files branch 2 times, most recently from ebc0c9c to a05d92b Compare June 29, 2020 13:55
@harmandeeps
Member Author

@findepi : Thank you for the feedback. I have addressed your comments. Please have a look.

Member

@findepi findepi left a comment

@harmandeeps thanks for your work on this.

public AcidInfo(
@JsonProperty("partitionLocation") String partitionLocation,
@JsonProperty("deleteDeltas") List<DeleteDeltaInfo> deleteDeltas)
public AcidInfo(@JsonProperty("partitionLocation") String partitionLocation,
Member

move argument to the next line

deleteDeltas.equals(that.deleteDeltas);
deleteDeltas.equals(that.deleteDeltas) &&
originalFiles.equals(that.originalFiles) &&
bucketId == (that.bucketId);
Member

please use intellij to generate equals

return bucketId == that.bucketId &&
                Objects.equals(partitionLocation, that.partitionLocation) &&
                Objects.equals(deleteDeltas, that.deleteDeltas) &&
                Objects.equals(originalFiles, that.originalFiles);

Comment on lines 182 to 185
/**
* Stores original files related information.
* To calculate the correct starting row ID of an original file, OriginalFilesUtils needs OriginalFileInfo list.
*/
Member

Remove, this does not add more information than visible in class name and in OriginalFilesUtils signatures

Comment on lines 192 to 190
public OriginalFileInfo(@JsonProperty("name") String name,
@JsonProperty("fileSize") long fileSize)
Member

Place all parameters on the same line, or each on a separate line.

@JsonProperty("fileSize") long fileSize)
{
this.name = requireNonNull(name, "name is null");
checkArgument(fileSize > 0, "fileSize should be > 0");
Member

Empty files are legal ORC files. Please add a test covering that.

}
}

private void generateOriginalFilesSplits(Configuration configuration, FileSystem fs, InternalHiveSplitFactory splitFactory,
Member

please put each parameter on a separate line

private void generateOriginalFilesSplits(Configuration configuration, FileSystem fs, InternalHiveSplitFactory splitFactory,
List<HadoopShims.HdfsFileStatusWithId> originalFileLocations, Optional<AcidInfo.Builder> acidInfoBuilder)
{
if (originalFileLocations == null || originalFileLocations.isEmpty()) {
Member

originalFileLocations cannot be null or empty here

Comment on lines 561 to 570
// INSERT ONLY Tables case
if (!acidInfoBuilder.isPresent()) {
addOriginalFilesUtil(
configuration,
fs,
originalFileLocations,
splitFactory,
Optional.empty());
return;
}
Member

this is dead code, right?

}

private void generateOriginalFilesSplits(Configuration configuration, FileSystem fs, InternalHiveSplitFactory splitFactory,
List<HadoopShims.HdfsFileStatusWithId> originalFileLocations, Optional<AcidInfo.Builder> acidInfoBuilder)
Member

acidInfoBuilder is always present, so it does not need to be Optional

@harmandeeps harmandeeps force-pushed the read_support_acid_original_files branch 2 times, most recently from c01b7d3 to 7546748 Compare July 6, 2020 18:23
Member Author

@harmandeeps harmandeeps left a comment

@findepi : Thank you for the review, I have addressed the comments. Please take a look.

Comment on lines 287 to 292
/* Path partitionPathFromOriginalPath = originalFilePath.getParent();
checkArgument(
partitionLocation.equals(partitionPathFromOriginalPath),
"Partition location in OriginalFile '%s' does not match stored location '%s'",
originalFilePath.getParent().toString(),
partitionLocation);*/
Member Author

The product tests were failing because of error:

2020-06-29T14:25:20.2138260Z tests               | Caused by: java.lang.IllegalArgumentException: Partition location in OriginalFile '/user/hive/warehouse/reading_full_acid_converted_table' does not match stored location 'hdfs://hadoop-master:9000/user/hive/warehouse/reading_full_acid_converted_table'
2020-06-29T14:25:20.2138423Z tests               | 	at com.google.common.base.Preconditions.checkArgument(Preconditions.java:441)
2020-06-29T14:25:20.2138556Z tests               | 	at io.prestosql.plugin.hive.AcidInfo$Builder.addOriginalFile(AcidInfo.java:288)

@@ -193,7 +206,12 @@ private void loadValidPositions()
Page page = pageSource.getNextPage();
if (page != null) {
for (int i = 0; i < page.getPositionCount(); i++) {
deletedRowsBuilder.add(new RowId(page, i));
long originalTransaction = -1;
Member Author

@findepi : Here, we filter the delete deltas on bucketId first, so there is no need to read bucketId from the ACID columns. If this approach looks fine, I can push changes to remove the 'bucket' variable from the RowId class.

Member

Let's not complicate this PR. To be considered later, OK?

@harmandeeps harmandeeps force-pushed the read_support_acid_original_files branch 3 times, most recently from 97d243e to ee88d97 Compare July 8, 2020 13:52
Comment on lines 287 to 292
/* Path partitionPathFromOriginalPath = originalFilePath.getParent();
checkArgument(
partitionLocation.equals(partitionPathFromOriginalPath),
"Partition location in OriginalFile '%s' does not match stored location '%s'",
originalFilePath.getParent().toString(),
partitionLocation);*/
Member

Is it related to a05d92b#r448353960?

in the worst case, you can compare the path's URI path only, and add a comment explaining why you are doing so
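
For illustration, a hedged sketch of comparing only the URI paths with Hadoop's Path, so that 'hdfs://hadoop-master:9000/user/hive/warehouse/t' and '/user/hive/warehouse/t' compare equal (names are illustrative, not the PR's final code):

    import org.apache.hadoop.fs.Path;

    // Compare only the path component of the URIs, ignoring scheme and authority.
    class PartitionLocationCheck
    {
        static boolean sameLocation(Path partitionLocation, Path originalFileParent)
        {
            return partitionLocation.toUri().getPath()
                    .equals(originalFileParent.toUri().getPath());
        }
    }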

@@ -193,7 +206,12 @@ private void loadValidPositions()
Page page = pageSource.getNextPage();
if (page != null) {
for (int i = 0; i < page.getPositionCount(); i++) {
deletedRowsBuilder.add(new RowId(page, i));
long originalTransaction = -1;
Member

Let's not complicate this PR. To be considered later, OK?

@findepi
Member

findepi commented Jul 17, 2020

@electrum please double review BackgroundHiveSplitLoader changes in the case when original files are not present.

"entry for requested bucket Id");
List<DeleteDeltaInfo> deleteDeltas = deleteDeltaInfoBuilder.build();
if (deleteDeltas.isEmpty()) {
return Optional.empty();
Member

With the current shape of this PR, Presto fails to read table data if there are original files but no delete_delta files present.
If deleteDeltas.isEmpty() resolves to true, we are losing all the original files information. Yet the ORC reader, later on, expects the ACID columns to be present in the data files.
This results in the following exception on read:

Query 20200720_104400_00016_v7svz failed: ORC ACID file should have 6 columns: hdfs://hadoop-master:9000/user/hive/warehouse/t/000000_0
io.prestosql.spi.PrestoException: ORC ACID file should have 6 columns: hdfs://hadoop-master:9000/user/hive/warehouse/t/000000_0
        at io.prestosql.plugin.hive.orc.OrcPageSourceFactory.verifyAcidSchema(OrcPageSourceFactory.java:418)
        at io.prestosql.plugin.hive.orc.OrcPageSourceFactory.createOrcPageSource(OrcPageSourceFactory.java:244)
        at io.prestosql.plugin.hive.orc.OrcPageSourceFactory.createPageSource(OrcPageSourceFactory.java:160)
        at io.prestosql.plugin.hive.HivePageSourceProvider.createHivePageSource(HivePageSourceProvider.java:178)
        at io.prestosql.plugin.hive.HivePageSourceProvider.createPageSource(HivePageSourceProvider.java:105)
        at io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider.createPageSource(ClassLoaderSafeConnectorPageSourceProvider.java:57)
        at io.prestosql.split.PageSourceManager.createPageSource(PageSourceManager.java:64)
        at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:298)
        at io.prestosql.operator.Driver.processInternal(Driver.java:379)
        at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
        at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
        at io.prestosql.operator.Driver.processFor(Driver.java:276)
        at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
        at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
        at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
        at io.prestosql.$gen.Presto_unknown____20200720_103058_2.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

Repro steps:
Create a non-transactional table in Hive and add some data:

  CREATE TABLE t (a integer) stored as orc tblproperties ('transactional'='false');
  insert into t values (1);

Make the table transactional:

ALTER TABLE t SET TBLPROPERTIES ('transactional'='true');

At this point reads from Presto fail. To make them work, we currently need to delete something from table t, so that a delete_delta file is present.

Member Author

@losipiuk : Thank you for pointing this out, will fix this.

Member

diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/AcidInfo.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/AcidInfo.java
index 15838b165b..14f8d8d580 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/AcidInfo.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/AcidInfo.java
@@ -48,7 +48,6 @@ public class AcidInfo
     {
         this.partitionLocation = requireNonNull(partitionLocation, "partitionLocation is null");
         this.deleteDeltas = ImmutableList.copyOf(requireNonNull(deleteDeltas, "deleteDeltas is null"));
-        checkArgument(!deleteDeltas.isEmpty(), "deleteDeltas is empty");
         this.originalFiles = ImmutableList.copyOf(requireNonNull(originalFiles, "originalFiles is null"));
         this.bucketId = bucketId;
     }
@@ -296,9 +295,6 @@ public class AcidInfo
             checkState(bucketId > -1 && bucketIdToOriginalFileInfoMap.containsKey(bucketId), "Bucket Id to OriginalFileInfo map should have " +
                     "entry for requested bucket Id");
             List<DeleteDeltaInfo> deleteDeltas = deleteDeltaInfoBuilder.build();
-            if (deleteDeltas.isEmpty()) {
-                return Optional.empty();
-            }
             return Optional.of(new AcidInfo(partitionLocation.toString(), deleteDeltas, bucketIdToOriginalFileInfoMap.get(bucketId), bucketId));
         }

This seems to improve the situation, yet I am not sure whether it breaks some other flow.

Member Author

@harmandeeps harmandeeps left a comment

Thank you for the review @findepi . I have addressed the comments, please take a look.

}

if (!fileStatusOriginalFiles.isEmpty()) {
generateOriginalFilesSplits(fs, splitFactory, fileStatusOriginalFiles, acidInfoBuilder);
Member Author

we return at L531 in that case, so it won't be executed.

@@ -55,6 +55,10 @@ public void testJsonRoundTrip()
AcidInfo.Builder acidInfoBuilder = AcidInfo.builder(new Path("file:///data/fullacid"));
acidInfoBuilder.addDeleteDelta(new Path("file:///data/fullacid/delete_delta_0000004_0000004_0000"), 4L, 4L, 0);
acidInfoBuilder.addDeleteDelta(new Path("file:///data/fullacid/delete_delta_0000007_0000007_0000"), 7L, 7L, 0);

acidInfoBuilder.addOriginalFile(new Path("file:///data/fullacid/000000_0"), 120, 0);
acidInfoBuilder.addOriginalFile(new Path("file:///data/fullacid/000001_0"), 125, 0);
Member Author

We need this to compare the acidInfo.

acidColumns.get(ACID_COLUMN_ORIGINAL_TRANSACTION.toLowerCase(ENGLISH)),
acidColumns.get(ACID_COLUMN_BUCKET.toLowerCase(ENGLISH)),
Member Author

We need <originalTransaction, bucketId, rowId> to correctly figure out if a row is deleted.

In the case of original files, we don't have ACID columns. We calculate rowId as done in OriginalFilesUtils, and originalTransaction is always 0. But we don't know the bucketId. So, for faster reads and simplification, we can skip reading the bucketId entirely in this PR. That's why I have dropped this bucket column read code. Please let me know your take on this.
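
A simplified sketch of that membership check, keyed only on (originalTransaction, rowId) as described above; the RowId class below is a stand-in, not the PR's exact class:

    import java.util.Objects;
    import java.util.Set;

    // Sketch: deleted-rows lookup without the bucket id, since the delete deltas
    // are already restricted to the split's bucket.
    class DeletedRowsSketch
    {
        static final class RowId
        {
            final long originalTransaction;
            final long rowId;

            RowId(long originalTransaction, long rowId)
            {
                this.originalTransaction = originalTransaction;
                this.rowId = rowId;
            }

            @Override
            public boolean equals(Object o)
            {
                if (!(o instanceof RowId)) {
                    return false;
                }
                RowId other = (RowId) o;
                return originalTransaction == other.originalTransaction && rowId == other.rowId;
            }

            @Override
            public int hashCode()
            {
                return Objects.hash(originalTransaction, rowId);
            }
        }

        // For a row read from an original file, originalTransaction is 0 and the row id
        // is the synthetic start row id of the file plus the position within the file.
        static boolean isDeleted(Set<RowId> deletedRows, long startRowId, int positionInFile)
        {
            return deletedRows.contains(new RowId(0, startRowId + positionInFile));
        }
    }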

acidColumns.get(ACID_COLUMN_ROW_ID.toLowerCase(ENGLISH)));

recordReader = reader.createRecordReader(
rowIdColumns,
ImmutableList.of(BIGINT, INTEGER, BIGINT),
Member Author

as explained above, to avoid reading the bucket id column.

int bucket = -1;
long row;
originalTransaction = BIGINT.getLong(page.getBlock(ORIGINAL_TRANSACTION_INDEX), i);
row = BIGINT.getLong(page.getBlock(BUCKET_ID_INDEX), i);
Member Author

Now we are reading only originalTxnId and rowId, so we should read indexes 0 and 1. Since BUCKET_ID_INDEX = 1, I used it. I will change the variable if this approach looks fine.

@@ -55,10 +55,6 @@ public void testJsonRoundTrip()
AcidInfo.Builder acidInfoBuilder = AcidInfo.builder(new Path("file:///data/fullacid"));
acidInfoBuilder.addDeleteDelta(new Path("file:///data/fullacid/delete_delta_0000004_0000004_0000"), 4L, 4L, 0);
Member Author

If we are keeping the delete delta, why are we removing the original file? Am I missing something?

Member

I backed out a change in the file, which seemed unnecessary (the test passed with and without it).
However, I would be happy to understand why we want it here.

Member Author

I guess the notion was to check that deserialization of HiveSplit works fine. It may help to catch errors when there is some issue with JSON serialization, and we do have JSON params in OriginalFileLocations.

Member

I like the idea. It needs to be reflected in the test assertions too. Can you please address this as a follow up PR?

Member Author

yeah, will take this up as a follow up PR.

@harmandeeps harmandeeps force-pushed the read_support_acid_original_files branch 2 times, most recently from c09de52 to a2f81c5 Compare July 22, 2020 20:28
@harmandeeps harmandeeps force-pushed the read_support_acid_original_files branch from a2f81c5 to ee1c88f Compare July 23, 2020 04:13
Member Author

@harmandeeps harmandeeps left a comment

Thank you for the review @losipiuk . I have addressed the comments.

@@ -108,8 +114,11 @@ public Page getNextPage()
return null;
}

OptionalLong startRowId = originalFileRowId.isPresent() ?
Member Author

I guess the present one is more readable. Otherwise, it was getting messy converting Optional<> to OptionalLong when it is present.

AcidInfo.Builder acidInfoBuilder = AcidInfo.builder(path);
addDeleteDelta(acidInfoBuilder, 10000001L, 10000001L, 0, path);

acidInfoBuilder.addOriginalFile(new Path(path, "000000_0"), 743, 0);
Member Author

fixed.

@@ -188,6 +188,83 @@ public void testReadInsertOnly(boolean isPartitioned, BucketingType bucketingTyp
}
}

@Test(groups = {STORAGE_FORMATS, HIVE_TRANSACTIONAL}, dataProvider = "partitioningAndBucketingTypeDataProvider", timeOut = TEST_TIMEOUT)
Member Author

added

FileFormatDataSourceStats stats)
{
long rowCount = 0;
for (OriginalFileInfo originalFileInfo : originalFileInfos) {
Member Author

If we can come up with a cache size and eviction time, then we can do this too.
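
For example, a sketch of such a cache using Guava; the maximum size, expiry, and readRowCountFromFooter below are placeholders, not values from this PR:

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;

    // Sketch: cache per-file row counts so the ORC footer of an original file is not
    // re-read for every split of the same bucket.
    class RowCountCacheSketch
    {
        private final Cache<String, Long> rowCountCache = CacheBuilder.newBuilder()
                .maximumSize(10_000)                      // placeholder size
                .expireAfterWrite(Duration.ofMinutes(10)) // placeholder eviction time
                .build();

        long rowCount(String filePath)
                throws ExecutionException
        {
            return rowCountCache.get(filePath, () -> readRowCountFromFooter(filePath));
        }

        private long readRowCountFromFooter(String filePath)
        {
            // placeholder for reading the number-of-rows field from the ORC footer
            throw new UnsupportedOperationException("sketch only");
        }
    }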

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.hive.ql.io.AcidUtils.deleteDeltaSubdir;

@NotThreadSafe
public class OrcDeletedRows
{
private static final int ORIGINAL_TRANSACTION_INDEX = 0;
private static final int ROW_ID_INDEX_DELETE_DELTA = 1; // delete delta page has columns: <originalId,rowId>
private static final int ROW_ID_INDEX = 2; // base/delta page has columns: <originalId,bucketId,rowId>
Member

@losipiuk losipiuk Jul 23, 2020

If we do not care about the bucket id from base/delta pages, can't we just refrain from setting up the PageSource to read it?

I.e. drop this

                fileReadColumns.add(acidColumnsByName.get(ACID_COLUMN_BUCKET.toLowerCase(ENGLISH)));
                fileReadTypes.add(INTEGER);
                fileReadLayouts.add(fullyProjectedLayout());

from https://github.com/prestosql/presto/blob/c0ead8f19be0930a5ad820033de77a34dd9fe125/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java#L192 ?

Then we could use the same index, right?

Member Author

yeah, I have pushed the changes.

@@ -97,18 +97,6 @@ public void testReadFullAcidBucketedV2()
doTestReadFullAcid(false, BucketingType.BUCKETED_V2);
}

@Test(groups = HIVE_TRANSACTIONAL, timeOut = TEST_TIMEOUT)
Member Author

@harmandeeps harmandeeps Jul 23, 2020

@findepi : @losipiuk suggested that I add product tests for bucketing versions v1/v2.

Member

I am aware, but they were not correct.

@findepi
Member

findepi commented Jul 24, 2020

CI failed -- #4395

@findepi
Member

findepi commented Jul 24, 2020

Merged as 62e1622, thanks!

@findepi findepi closed this Jul 24, 2020
@findepi findepi added this to the 340 milestone Jul 24, 2020
@findepi findepi mentioned this pull request Jul 24, 2020
8 tasks
Development

Successfully merging this pull request may close these issues.

Support reading ACID/Transactional tables with "original files"
5 participants