Cleanups in testFilesForAbortedTransactionsIgnored
losipiuk committed Oct 7, 2020
1 parent 3a38606 commit 5de8628
Showing 1 changed file with 33 additions and 17 deletions.
@@ -16,7 +16,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
-import com.google.inject.name.Named;
 import io.airlift.log.Logger;
 import io.airlift.units.Duration;
 import io.prestosql.plugin.hive.metastore.thrift.ThriftHiveMetastoreClient;
@@ -26,7 +25,6 @@
 import io.prestosql.tests.hive.util.TemporaryHiveTable;
 import net.jodah.failsafe.Failsafe;
 import net.jodah.failsafe.RetryPolicy;
-import org.apache.thrift.TException;
 import org.testng.SkipException;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -54,6 +52,7 @@
 import static io.prestosql.tests.hive.TestHiveTransactionalTable.CompactionMode.MINOR;
 import static io.prestosql.tests.hive.TransactionalTableType.ACID;
 import static io.prestosql.tests.hive.TransactionalTableType.INSERT_ONLY;
+import static io.prestosql.tests.hive.util.TableLocationUtils.getTablePath;
 import static io.prestosql.tests.hive.util.TemporaryHiveTable.randomTableSuffix;
 import static io.prestosql.tests.utils.QueryExecutors.onHive;
 import static java.lang.String.format;
@@ -71,10 +70,6 @@ public class TestHiveTransactionalTable
     @Inject
     private TestHiveMetastoreClientFactory testHiveMetastoreClientFactory;
 
-    @Inject
-    @Named("databases.hive.warehouse_directory_path")
-    private String warehouseDirectory;
-
     @Inject
     private HdfsClient hdfsClient;
 
@@ -378,7 +373,7 @@ public void testCreateAcidTable(boolean isPartitioned, BucketingType bucketingTy
 
     @Test(groups = HIVE_TRANSACTIONAL)
     public void testFilesForAbortedTransactionsIgnored()
-            throws TException
+            throws Exception
     {
         if (getHiveVersionMajor() < 3) {
             throw new SkipException("Hive transactional tables are supported with Hive version 3 or above");
@@ -399,6 +394,8 @@ public void testFilesForAbortedTransactionsIgnored()
             QueryResult onePartitionQueryResult = query(selectFromOnePartitionsSql);
             assertThat(onePartitionQueryResult).containsExactly(row(1), row(2));
 
+            String tableLocation = getTablePath(tableName, 1); // pass 1 to trim /delta_... suffix
+
             // Insert data to create a valid delta, which creates `delta-B`
             onHive().executeQuery("INSERT INTO TABLE " + tableName + " SELECT 3");
 
@@ -407,26 +404,23 @@
             client.allocateTableWriteIds("default", tableName, Collections.singletonList(transaction)).get(0).getWriteId();
             client.abortTransaction(transaction);
 
-            String deltaA = warehouseDirectory + "/" + tableName + "/delta_0000001_0000001_0000/bucket_00000";
-            String deltaB = warehouseDirectory + "/" + tableName + "/delta_0000002_0000002_0000/bucket_00000";
-            String deltaC = warehouseDirectory + "/" + tableName + "/delta_0000003_0000003_0000/bucket_00000";
+            String deltaA = tableLocation + "/delta_0000001_0000001_0000";
+            String deltaB = tableLocation + "/delta_0000002_0000002_0000";
+            String deltaC = tableLocation + "/delta_0000003_0000003_0000";
 
             // Delete original `delta-B`, `delta-C`
-            hdfsClient.delete(deltaB);
-            hdfsClient.delete(deltaC);
-
-            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-            hdfsClient.loadFile(deltaA, byteArrayOutputStream);
+            hdfsDeleteAll(deltaB);
+            hdfsDeleteAll(deltaC);
 
             // Copy content of `delta-A` to `delta-B`
-            hdfsClient.saveFile(deltaB, new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+            hdfsCopyAll(deltaA, deltaB);
 
             // Verify that data from delta-A and delta-B is visible
             onePartitionQueryResult = query(selectFromOnePartitionsSql);
             assertThat(onePartitionQueryResult).containsOnly(row(1), row(1), row(2), row(2));
 
             // Copy content of `delta-A` to `delta-C` (which is an aborted transaction)
-            hdfsClient.saveFile(deltaC, new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+            hdfsCopyAll(deltaA, deltaC);
 
             // Verify that delta, corresponding to aborted transaction, is not getting read
             onePartitionQueryResult = query(selectFromOnePartitionsSql);
@@ -438,6 +432,28 @@
         }
     }
 
+    private void hdfsDeleteAll(String directory)
+    {
+        if (!hdfsClient.exist(directory)) {
+            return;
+        }
+        for (String file : hdfsClient.listDirectory(directory)) {
+            hdfsClient.delete(directory + "/" + file);
+        }
+    }
+
+    private void hdfsCopyAll(String source, String target)
+    {
+        if (!hdfsClient.exist(target)) {
+            hdfsClient.createDirectory(target);
+        }
+        for (String file : hdfsClient.listDirectory(source)) {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            hdfsClient.loadFile(source + "/" + file, bos);
+            hdfsClient.saveFile(target + "/" + file, new ByteArrayInputStream(bos.toByteArray()));
+        }
+    }
+
     @DataProvider
     public Object[][] testCreateAcidTableDataProvider()
     {