From 4078ceb131abfba983e7b348b5fef189bd1fd9c9 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Fri, 22 Mar 2024 21:32:20 +0800 Subject: [PATCH] [feature](hive)support insert overwrite (#32610) support insert overwrite for unpartitioned table and partitioned table. issue: #31442 --- .../doris/datasource/hive/HMSCommitter.java | 96 ++++++++++++++++++- .../java/org/apache/doris/fs/FileSystem.java | 6 ++ .../doris/fs/remote/dfs/DFSFileSystem.java | 40 ++++---- .../doris/datasource/hive/HmsCommitTest.java | 56 +++++++++-- 4 files changed, 166 insertions(+), 32 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java index 64abb985fcfd04..af26f36d6b9d39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java @@ -80,6 +80,8 @@ public class HMSCommitter { private final Queue directoryCleanUpTasksForAbort = new ConcurrentLinkedQueue<>(); // when aborted, we need restore directory private final List renameDirectoryTasksForAbort = new ArrayList<>(); + // when finished, we need clear some directories + private final List clearDirsForFinish = new ArrayList<>(); Executor fileSystemExecutor = Executors.newFixedThreadPool(16); public HMSCommitter(HiveMetadataOps hiveOps, RemoteFileSystem fs, Table table) { @@ -105,6 +107,8 @@ public void commit(List hivePUs) { t.addSuppressed(new Exception("Failed to roll back after commit failure", e)); } throw t; + } finally { + runClearPathsForFinish(); } } @@ -250,7 +254,38 @@ public void prepareAppendTable(THivePartitionUpdate pu, HivePartitionStatistics } public void prepareOverwriteTable(THivePartitionUpdate pu, HivePartitionStatistics ps) { - + String targetPath = pu.getLocation().getTargetPath(); + String writePath = pu.getLocation().getWritePath(); + if (!targetPath.equals(writePath)) { + Path path = new Path(targetPath); + String oldTablePath = new Path(path.getParent(), "_temp_" + path.getName()).toString(); + Status status = fs.renameDir( + targetPath, + oldTablePath, + () -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldTablePath, targetPath))); + if (!status.ok()) { + throw new RuntimeException( + "Error to rename dir from " + targetPath + " to " + oldTablePath + status.getErrMsg()); + } + clearDirsForFinish.add(oldTablePath); + + status = fs.renameDir( + writePath, + targetPath, + () -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true))); + if (!status.ok()) { + throw new RuntimeException( + "Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg()); + } + } + updateStatisticsTasks.add( + new UpdateStatisticsTask( + table.getDbName(), + table.getTableName(), + Optional.empty(), + ps, + false + )); } public void prepareCreateNewPartition(THivePartitionUpdate pu, HivePartitionStatistics ps) { @@ -335,7 +370,38 @@ public void prepareInsertExistPartition(List renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldPartitionPath, targetPath))); + if (!status.ok()) { + throw new RuntimeException( + "Error to rename dir from " + targetPath + " to " + oldPartitionPath + ":" + status.getErrMsg()); + } + clearDirsForFinish.add(oldPartitionPath); + status = fs.renameDir( + writePath, + targetPath, + () -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true))); + if (!status.ok()) { + throw new RuntimeException( + "Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg()); + } + } + updateStatisticsTasks.add( + new UpdateStatisticsTask( + table.getDbName(), + table.getTableName(), + Optional.of(pu.getName()), + ps, + false + )); } @@ -481,7 +547,7 @@ public void run(HiveMetadataOps hiveOps) { createdPartitionValues.add(partition.getPartition().getPartitionValues()); } } catch (Throwable t) { - LOG.error("Failed to add partition", t); + LOG.warn("Failed to add partition", t); throw t; } } @@ -583,7 +649,27 @@ public String toString() { } private void runRenameDirTasksForAbort() { - // TODO abort + Status status; + for (RenameDirectoryTask task : renameDirectoryTasksForAbort) { + status = fs.exists(task.getRenameFrom()); + if (status.ok()) { + status = fs.renameDir(task.getRenameFrom(), task.getRenameTo(), () -> {}); + if (!status.ok()) { + LOG.warn("Failed to abort rename dir from {} to {}:{}", + task.getRenameFrom(), task.getRenameTo(), status.getErrMsg()); + } + } + } + } + + private void runClearPathsForFinish() { + Status status; + for (String path : clearDirsForFinish) { + status = fs.delete(path); + if (!status.ok()) { + LOG.warn("Failed to recursively delete path {}:{}", path, status.getErrCode()); + } + } } @@ -591,10 +677,10 @@ private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir) { DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, deleteEmptyDir); if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) { - LOG.error("Failed to delete directory {}. Some eligible items can't be deleted: {}.", + LOG.warn("Failed to delete directory {}. Some eligible items can't be deleted: {}.", directory.toString(), deleteResult.getNotDeletedEligibleItems()); } else if (deleteEmptyDir && !deleteResult.dirNotExists()) { - LOG.error("Failed to delete directory {} due to dir isn't empty", directory.toString()); + LOG.warn("Failed to delete directory {} due to dir isn't empty", directory.toString()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java index 798f93a61c090b..0470d8b3714760 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java @@ -49,6 +49,12 @@ public interface FileSystem { Status rename(String origFilePath, String destFilePath); + default Status renameDir(String origFilePath, + String destFilePath, + Runnable runWhenPathNotExist) { + throw new UnsupportedOperationException("Unsupported operation rename dir on current file system."); + } + default void asyncRename(Executor executor, List> renameFileFutures, AtomicBoolean cancelled, diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index e8c645f3c9b13d..26008adf38c4f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -412,6 +412,28 @@ public void asyncRename( } } + public Status renameDir(String origFilePath, + String destFilePath, + Runnable runWhenPathNotExist) { + Status status = exists(destFilePath); + if (status.ok()) { + throw new RuntimeException("Destination directory already exists: " + destFilePath); + } + + String targetParent = new Path(destFilePath).getParent().toString(); + status = exists(targetParent); + if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) { + status = makeDir(targetParent); + } + if (!status.ok()) { + throw new RuntimeException(status.getErrMsg()); + } + + runWhenPathNotExist.run(); + + return rename(origFilePath, destFilePath); + } + @Override public void asyncRenameDir(Executor executor, List> renameFileFutures, @@ -423,23 +445,7 @@ public void asyncRenameDir(Executor executor, if (cancelled.get()) { return; } - - Status status = exists(destFilePath); - if (status.ok()) { - throw new RuntimeException("Destination directory already exists: " + destFilePath); - } - - String targetParent = new Path(destFilePath).getParent().toString(); - status = exists(targetParent); - if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) { - makeDir(targetParent); - } else if (!status.ok()) { - throw new RuntimeException(status.getErrMsg()); - } - - runWhenPathNotExist.run(); - - status = rename(origFilePath, destFilePath); + Status status = renameDir(origFilePath, destFilePath, runWhenPathNotExist); if (!status.ok()) { throw new RuntimeException(status.getErrMsg()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java index e5392fb11a8d8b..3098d65e952373 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java @@ -133,7 +133,7 @@ public void testAppendPartitionForUnPartitionedTable() { pus.add(createRandomAppend("")); hmsOps.commit(dbName, tbWithoutPartition, pus); Table table = hmsClient.getTable(dbName, tbWithoutPartition); - Assert.assertEquals(3, Long.parseLong(table.getParameters().get("numRows"))); + assertNumRows(3, table); List pus2 = new ArrayList<>(); pus2.add(createRandomAppend("")); @@ -141,12 +141,19 @@ public void testAppendPartitionForUnPartitionedTable() { pus2.add(createRandomAppend("")); hmsOps.commit(dbName, tbWithoutPartition, pus2); table = hmsClient.getTable(dbName, tbWithoutPartition); - Assert.assertEquals(6, Long.parseLong(table.getParameters().get("numRows"))); + assertNumRows(6, table); } @Test public void testOverwritePartitionForUnPartitionedTable() { - // TODO + testAppendPartitionForUnPartitionedTable(); + List pus = new ArrayList<>(); + pus.add(createRandomOverwrite("")); + pus.add(createRandomOverwrite("")); + pus.add(createRandomOverwrite("")); + hmsOps.commit(dbName, tbWithoutPartition, pus); + Table table = hmsClient.getTable(dbName, tbWithoutPartition); + assertNumRows(3, table); } @Test @@ -161,11 +168,11 @@ public void testNewPartitionForPartitionedTable() { hmsOps.commit(dbName, tbWithPartition, pus); Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a")); - Assert.assertEquals(3, Long.parseLong(pa.getParameters().get("numRows"))); + assertNumRows(3, pa); Partition pb = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("b")); - Assert.assertEquals(2, Long.parseLong(pb.getParameters().get("numRows"))); + assertNumRows(2, pb); Partition pc = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("c")); - Assert.assertEquals(1, Long.parseLong(pc.getParameters().get("numRows"))); + assertNumRows(1, pc); } @Test @@ -182,11 +189,28 @@ public void testAppendPartitionForPartitionedTable() { hmsOps.commit(dbName, tbWithPartition, pus); Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a")); - Assert.assertEquals(6, Long.parseLong(pa.getParameters().get("numRows"))); + assertNumRows(6, pa); + Partition pb = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("b")); + assertNumRows(4, pb); + Partition pc = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("c")); + assertNumRows(2, pc); + } + + @Test + public void testOverwritePartitionForPartitionedTable() { + testAppendPartitionForPartitionedTable(); + List pus = new ArrayList<>(); + pus.add(createRandomOverwrite("a")); + pus.add(createRandomOverwrite("b")); + pus.add(createRandomOverwrite("c")); + hmsOps.commit(dbName, tbWithPartition, pus); + + Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a")); + assertNumRows(1, pa); Partition pb = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("b")); - Assert.assertEquals(4, Long.parseLong(pb.getParameters().get("numRows"))); + assertNumRows(1, pb); Partition pc = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("c")); - Assert.assertEquals(2, Long.parseLong(pc.getParameters().get("numRows"))); + assertNumRows(1, pc); } @Test @@ -200,7 +224,7 @@ public void testNewManyPartitionForPartitionedTable() { hmsOps.commit(dbName, tbWithPartition, pus); for (int i = 0; i < nums; i++) { Partition p = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("" + i)); - Assert.assertEquals(1, Long.parseLong(p.getParameters().get("numRows"))); + assertNumRows(1, p); } try { @@ -210,6 +234,14 @@ public void testNewManyPartitionForPartitionedTable() { } } + public void assertNumRows(long expected, Partition p) { + Assert.assertEquals(expected, Long.parseLong(p.getParameters().get("numRows"))); + } + + public void assertNumRows(long expected, Table t) { + Assert.assertEquals(expected, Long.parseLong(t.getParameters().get("numRows"))); + } + public THivePartitionUpdate genOnePartitionUpdate(String partitionValue, TUpdateMode mode) { String uuid = UUID.randomUUID().toString(); @@ -241,4 +273,8 @@ public THivePartitionUpdate createRandomNew(String partition) { public THivePartitionUpdate createRandomAppend(String partition) { return genOnePartitionUpdate("c3=" + partition, TUpdateMode.APPEND); } + + public THivePartitionUpdate createRandomOverwrite(String partition) { + return genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE); + } }