diff --git a/presto-product-tests/src/main/java/io/prestosql/tests/hive/TestHiveTransactionalTable.java b/presto-product-tests/src/main/java/io/prestosql/tests/hive/TestHiveTransactionalTable.java
index 9b75d99429d3d..0665c930867fe 100644
--- a/presto-product-tests/src/main/java/io/prestosql/tests/hive/TestHiveTransactionalTable.java
+++ b/presto-product-tests/src/main/java/io/prestosql/tests/hive/TestHiveTransactionalTable.java
@@ -16,7 +16,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
-import com.google.inject.name.Named;
 import io.airlift.log.Logger;
 import io.airlift.units.Duration;
 import io.prestosql.plugin.hive.metastore.thrift.ThriftHiveMetastoreClient;
@@ -26,7 +25,6 @@
 import io.prestosql.tests.hive.util.TemporaryHiveTable;
 import net.jodah.failsafe.Failsafe;
 import net.jodah.failsafe.RetryPolicy;
-import org.apache.thrift.TException;
 import org.testng.SkipException;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -54,6 +52,7 @@
 import static io.prestosql.tests.hive.TestHiveTransactionalTable.CompactionMode.MINOR;
 import static io.prestosql.tests.hive.TransactionalTableType.ACID;
 import static io.prestosql.tests.hive.TransactionalTableType.INSERT_ONLY;
+import static io.prestosql.tests.hive.util.TableLocationUtils.getTablePath;
 import static io.prestosql.tests.hive.util.TemporaryHiveTable.randomTableSuffix;
 import static io.prestosql.tests.utils.QueryExecutors.onHive;
 import static java.lang.String.format;
@@ -71,10 +70,6 @@ public class TestHiveTransactionalTable
     @Inject
     private TestHiveMetastoreClientFactory testHiveMetastoreClientFactory;
 
-    @Inject
-    @Named("databases.hive.warehouse_directory_path")
-    private String warehouseDirectory;
-
     @Inject
     private HdfsClient hdfsClient;
 
@@ -378,7 +373,7 @@ public void testCreateAcidTable(boolean isPartitioned, BucketingType bucketingTy
     @Test(groups = HIVE_TRANSACTIONAL)
     public void testFilesForAbortedTransactionsIgnored()
-            throws TException
+            throws Exception
     {
         if (getHiveVersionMajor() < 3) {
             throw new SkipException("Hive transactional tables are supported with Hive version 3 or above");
         }
@@ -399,6 +394,8 @@ public void testFilesForAbortedTransactionsIgnored()
             QueryResult onePartitionQueryResult = query(selectFromOnePartitionsSql);
             assertThat(onePartitionQueryResult).containsExactly(row(1), row(2));
 
+            String tableLocation = getTablePath(tableName, 1); // pass 1 to trim /delta_... suffix
+
             // Insert data to create a valid delta, which creates `delta-B`
             onHive().executeQuery("INSERT INTO TABLE " + tableName + " SELECT 3");
 
@@ -407,26 +404,23 @@ public void testFilesForAbortedTransactionsIgnored()
             client.allocateTableWriteIds("default", tableName, Collections.singletonList(transaction)).get(0).getWriteId();
             client.abortTransaction(transaction);
 
-            String deltaA = warehouseDirectory + "/" + tableName + "/delta_0000001_0000001_0000/bucket_00000";
-            String deltaB = warehouseDirectory + "/" + tableName + "/delta_0000002_0000002_0000/bucket_00000";
-            String deltaC = warehouseDirectory + "/" + tableName + "/delta_0000003_0000003_0000/bucket_00000";
+            String deltaA = tableLocation + "/delta_0000001_0000001_0000";
+            String deltaB = tableLocation + "/delta_0000002_0000002_0000";
+            String deltaC = tableLocation + "/delta_0000003_0000003_0000";
 
             // Delete original `delta-B`, `delta-C`
-            hdfsClient.delete(deltaB);
-            hdfsClient.delete(deltaC);
-
-            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-            hdfsClient.loadFile(deltaA, byteArrayOutputStream);
+            hdfsDeleteAll(deltaB);
+            hdfsDeleteAll(deltaC);
 
             // Copy content of `delta-A` to `delta-B`
-            hdfsClient.saveFile(deltaB, new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+            hdfsCopyAll(deltaA, deltaB);
 
             // Verify that data from delta-A and delta-B is visible
             onePartitionQueryResult = query(selectFromOnePartitionsSql);
             assertThat(onePartitionQueryResult).containsOnly(row(1), row(1), row(2), row(2));
 
             // Copy content of `delta-A` to `delta-C` (which is an aborted transaction)
-            hdfsClient.saveFile(deltaC, new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+            hdfsCopyAll(deltaA, deltaC);
 
             // Verify that delta, corresponding to aborted transaction, is not getting read
             onePartitionQueryResult = query(selectFromOnePartitionsSql);
@@ -438,6 +432,28 @@
         }
     }
 
+    private void hdfsDeleteAll(String directory)
+    {
+        if (!hdfsClient.exist(directory)) {
+            return;
+        }
+        for (String file : hdfsClient.listDirectory(directory)) {
+            hdfsClient.delete(directory + "/" + file);
+        }
+    }
+
+    private void hdfsCopyAll(String source, String target)
+    {
+        if (!hdfsClient.exist(target)) {
+            hdfsClient.createDirectory(target);
+        }
+        for (String file : hdfsClient.listDirectory(source)) {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            hdfsClient.loadFile(source + "/" + file, bos);
+            hdfsClient.saveFile(target + "/" + file, new ByteArrayInputStream(bos.toByteArray()));
+        }
+    }
+
     @DataProvider
     public Object[][] testCreateAcidTableDataProvider()
     {