Skip to content

Commit

Permalink
[feature](hive)support insert overwrite (#32610)
Browse files Browse the repository at this point in the history
Support insert overwrite for both unpartitioned and partitioned tables.

issue: #31442
  • Loading branch information
wuwenchi authored Mar 22, 2024
1 parent df56ab3 commit 4078ceb
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class HMSCommitter {
private final Queue<DirectoryCleanUpTask> directoryCleanUpTasksForAbort = new ConcurrentLinkedQueue<>();
// When the commit is aborted, these renames are replayed to restore the original directories.
private final List<RenameDirectoryTask> renameDirectoryTasksForAbort = new ArrayList<>();
// When the commit has finished, these directories are deleted by runClearPathsForFinish().
private final List<String> clearDirsForFinish = new ArrayList<>();
Executor fileSystemExecutor = Executors.newFixedThreadPool(16);

public HMSCommitter(HiveMetadataOps hiveOps, RemoteFileSystem fs, Table table) {
Expand All @@ -105,6 +107,8 @@ public void commit(List<THivePartitionUpdate> hivePUs) {
t.addSuppressed(new Exception("Failed to roll back after commit failure", e));
}
throw t;
} finally {
runClearPathsForFinish();
}
}

Expand Down Expand Up @@ -250,7 +254,38 @@ public void prepareAppendTable(THivePartitionUpdate pu, HivePartitionStatistics
}

/**
 * Prepares an INSERT OVERWRITE of an unpartitioned table.
 *
 * <p>If the data was written somewhere other than the target path, the current target directory
 * is renamed to a sibling {@code _temp_<name>} directory and the written directory is renamed to
 * the target path. The {@code _temp_} directory is remembered in {@code clearDirsForFinish} so
 * it is deleted when the commit finishes. The callbacks handed to {@code fs.renameDir} register
 * the matching abort tasks (rename back / clean up the target).
 *
 * <p>In all cases a table-level {@link UpdateStatisticsTask} is queued.
 *
 * @param pu partition update carrying the write path and target path
 * @param ps statistics to record for the table
 * @throws RuntimeException if either directory rename reports a non-ok status
 */
public void prepareOverwriteTable(THivePartitionUpdate pu, HivePartitionStatistics ps) {
    String targetPath = pu.getLocation().getTargetPath();
    String writePath = pu.getLocation().getWritePath();
    if (!targetPath.equals(writePath)) {
        Path path = new Path(targetPath);
        String oldTablePath = new Path(path.getParent(), "_temp_" + path.getName()).toString();
        // Step 1: move the existing table data out of the way.
        Status status = fs.renameDir(
                targetPath,
                oldTablePath,
                () -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldTablePath, targetPath)));
        if (!status.ok()) {
            // Keep the ":" separator so the message matches the other rename errors in this class.
            throw new RuntimeException(
                    "Error to rename dir from " + targetPath + " to " + oldTablePath + ":" + status.getErrMsg());
        }
        clearDirsForFinish.add(oldTablePath);

        // Step 2: move the freshly written data into the table location.
        status = fs.renameDir(
                writePath,
                targetPath,
                () -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true)));
        if (!status.ok()) {
            throw new RuntimeException(
                    "Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg());
        }
    }
    updateStatisticsTasks.add(
            new UpdateStatisticsTask(
                    table.getDbName(),
                    table.getTableName(),
                    Optional.empty(),
                    ps,
                    false
            ));
}

public void prepareCreateNewPartition(THivePartitionUpdate pu, HivePartitionStatistics ps) {
Expand Down Expand Up @@ -335,7 +370,38 @@ public void prepareInsertExistPartition(List<Pair<THivePartitionUpdate, HivePart


/**
 * Prepares an INSERT OVERWRITE of a single existing partition.
 *
 * <p>When the write path differs from the target path, the current partition directory is renamed
 * to a sibling {@code _temp_<name>} directory (scheduled for deletion at finish) and the written
 * directory is renamed into the target path. A partition-level {@link UpdateStatisticsTask} is
 * queued in every case.
 *
 * @param pu partition update carrying the partition name, write path and target path
 * @param ps statistics to record for the partition
 * @throws RuntimeException if either directory rename reports a non-ok status
 */
public void prepareOverwritePartition(THivePartitionUpdate pu, HivePartitionStatistics ps) {
    final String finalLocation = pu.getLocation().getTargetPath();
    final String stagedLocation = pu.getLocation().getWritePath();
    final boolean writtenInPlace = finalLocation.equals(stagedLocation);

    if (!writtenInPlace) {
        final Path finalDir = new Path(finalLocation);
        final String backupLocation = new Path(finalDir.getParent(), "_temp_" + finalDir.getName()).toString();

        // Park the existing partition data next to its original location.
        Status backupStatus = fs.renameDir(
                finalLocation,
                backupLocation,
                () -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(backupLocation, finalLocation)));
        if (!backupStatus.ok()) {
            throw new RuntimeException(
                    "Error to rename dir from " + finalLocation + " to " + backupLocation + ":"
                            + backupStatus.getErrMsg());
        }
        clearDirsForFinish.add(backupLocation);

        // Promote the staged data to the partition location.
        Status promoteStatus = fs.renameDir(
                stagedLocation,
                finalLocation,
                () -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(finalLocation, true)));
        if (!promoteStatus.ok()) {
            throw new RuntimeException(
                    "Error to rename dir from " + stagedLocation + " to " + finalLocation + ":"
                            + promoteStatus.getErrMsg());
        }
    }

    UpdateStatisticsTask statisticsTask = new UpdateStatisticsTask(
            table.getDbName(),
            table.getTableName(),
            Optional.of(pu.getName()),
            ps,
            false);
    updateStatisticsTasks.add(statisticsTask);
}


Expand Down Expand Up @@ -481,7 +547,7 @@ public void run(HiveMetadataOps hiveOps) {
createdPartitionValues.add(partition.getPartition().getPartitionValues());
}
} catch (Throwable t) {
LOG.error("Failed to add partition", t);
LOG.warn("Failed to add partition", t);
throw t;
}
}
Expand Down Expand Up @@ -583,18 +649,38 @@ public String toString() {
}

/**
 * Replays the recorded rename tasks after an abort.
 *
 * <p>For every task whose source path {@code fs.exists} reports as ok, the directory is renamed
 * from the task's "from" path to its "to" path. A failed rename is only logged as a warning;
 * tasks whose source path is not reported as existing are skipped silently.
 */
private void runRenameDirTasksForAbort() {
    for (RenameDirectoryTask pending : renameDirectoryTasksForAbort) {
        if (!fs.exists(pending.getRenameFrom()).ok()) {
            continue;
        }
        // No extra bookkeeping is needed while rolling back, hence the empty callback.
        Status restoreStatus = fs.renameDir(pending.getRenameFrom(), pending.getRenameTo(), () -> {});
        if (restoreStatus.ok()) {
            continue;
        }
        LOG.warn("Failed to abort rename dir from {} to {}:{}",
                pending.getRenameFrom(), pending.getRenameTo(), restoreStatus.getErrMsg());
    }
}

/**
 * Deletes every path recorded in {@code clearDirsForFinish}.
 *
 * <p>Deletion is best effort: a failure is logged as a warning and the remaining paths are still
 * processed. The warning carries the status error message (not just the error code) so the reason
 * for the failure is visible, matching the logging in {@code runRenameDirTasksForAbort}.
 */
private void runClearPathsForFinish() {
    for (String path : clearDirsForFinish) {
        Status status = fs.delete(path);
        if (!status.ok()) {
            LOG.warn("Failed to recursively delete path {}:{}", path, status.getErrMsg());
        }
    }
}


private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir) {
DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, deleteEmptyDir);

if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
LOG.error("Failed to delete directory {}. Some eligible items can't be deleted: {}.",
LOG.warn("Failed to delete directory {}. Some eligible items can't be deleted: {}.",
directory.toString(), deleteResult.getNotDeletedEligibleItems());
} else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
LOG.error("Failed to delete directory {} due to dir isn't empty", directory.toString());
LOG.warn("Failed to delete directory {} due to dir isn't empty", directory.toString());
}
}

Expand Down
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public interface FileSystem {

Status rename(String origFilePath, String destFilePath);

/**
 * Renames a directory. This default implementation always throws.
 *
 * @param origFilePath        source directory path
 * @param destFilePath        destination directory path
 * @param runWhenPathNotExist callback supplied by the caller; unused by this default
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
default Status renameDir(String origFilePath,
                         String destFilePath,
                         Runnable runWhenPathNotExist) {
    final String reason = "Unsupported operation rename dir on current file system.";
    throw new UnsupportedOperationException(reason);
}

default void asyncRename(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,28 @@ public void asyncRename(
}
}

/**
 * Renames a directory to a destination that must not exist yet.
 *
 * <p>Steps, in order: fail if the destination exists; create the destination's parent directory
 * when {@code exists} reports it as NOT_FOUND; fail if the parent check (or creation) is not ok;
 * run {@code runWhenPathNotExist}; finally perform the rename.
 *
 * @param origFilePath        directory to rename
 * @param destFilePath        new location of the directory
 * @param runWhenPathNotExist callback run after the checks pass and before the rename is issued
 * @return the status returned by {@code rename}
 * @throws RuntimeException if the destination already exists or its parent cannot be prepared
 */
public Status renameDir(String origFilePath,
                        String destFilePath,
                        Runnable runWhenPathNotExist) {
    if (exists(destFilePath).ok()) {
        throw new RuntimeException("Destination directory already exists: " + destFilePath);
    }

    final String parentDir = new Path(destFilePath).getParent().toString();
    Status parentStatus = exists(parentDir);
    final boolean parentMissing = Status.ErrCode.NOT_FOUND.equals(parentStatus.getErrCode());
    if (parentMissing) {
        // From here on parentStatus reflects the outcome of creating the parent.
        parentStatus = makeDir(parentDir);
    }
    if (!parentStatus.ok()) {
        throw new RuntimeException(parentStatus.getErrMsg());
    }

    runWhenPathNotExist.run();

    return rename(origFilePath, destFilePath);
}

@Override
public void asyncRenameDir(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
Expand All @@ -423,23 +445,7 @@ public void asyncRenameDir(Executor executor,
if (cancelled.get()) {
return;
}

Status status = exists(destFilePath);
if (status.ok()) {
throw new RuntimeException("Destination directory already exists: " + destFilePath);
}

String targetParent = new Path(destFilePath).getParent().toString();
status = exists(targetParent);
if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) {
makeDir(targetParent);
} else if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}

runWhenPathNotExist.run();

status = rename(origFilePath, destFilePath);
Status status = renameDir(origFilePath, destFilePath, runWhenPathNotExist);
if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,27 @@ public void testAppendPartitionForUnPartitionedTable() {
pus.add(createRandomAppend(""));
hmsOps.commit(dbName, tbWithoutPartition, pus);
Table table = hmsClient.getTable(dbName, tbWithoutPartition);
Assert.assertEquals(3, Long.parseLong(table.getParameters().get("numRows")));
assertNumRows(3, table);

List<THivePartitionUpdate> pus2 = new ArrayList<>();
pus2.add(createRandomAppend(""));
pus2.add(createRandomAppend(""));
pus2.add(createRandomAppend(""));
hmsOps.commit(dbName, tbWithoutPartition, pus2);
table = hmsClient.getTable(dbName, tbWithoutPartition);
Assert.assertEquals(6, Long.parseLong(table.getParameters().get("numRows")));
assertNumRows(6, table);
}

/**
 * Runs the append test first to populate the unpartitioned table, then commits three OVERWRITE
 * updates and checks that the table's numRows parameter is 3.
 */
@Test
public void testOverwritePartitionForUnPartitionedTable() {
    testAppendPartitionForUnPartitionedTable();

    List<THivePartitionUpdate> overwrites = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
        overwrites.add(createRandomOverwrite(""));
    }
    hmsOps.commit(dbName, tbWithoutPartition, overwrites);

    Table overwritten = hmsClient.getTable(dbName, tbWithoutPartition);
    assertNumRows(3, overwritten);
}

@Test
Expand All @@ -161,11 +168,11 @@ public void testNewPartitionForPartitionedTable() {
hmsOps.commit(dbName, tbWithPartition, pus);

Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
Assert.assertEquals(3, Long.parseLong(pa.getParameters().get("numRows")));
assertNumRows(3, pa);
Partition pb = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("b"));
Assert.assertEquals(2, Long.parseLong(pb.getParameters().get("numRows")));
assertNumRows(2, pb);
Partition pc = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("c"));
Assert.assertEquals(1, Long.parseLong(pc.getParameters().get("numRows")));
assertNumRows(1, pc);
}

@Test
Expand All @@ -182,11 +189,28 @@ public void testAppendPartitionForPartitionedTable() {
hmsOps.commit(dbName, tbWithPartition, pus);

Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
Assert.assertEquals(6, Long.parseLong(pa.getParameters().get("numRows")));
assertNumRows(6, pa);
Partition pb = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("b"));
assertNumRows(4, pb);
Partition pc = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("c"));
assertNumRows(2, pc);
}

/**
 * Runs the append test first to populate partitions a, b and c, then commits one OVERWRITE
 * update per partition and checks that each partition's numRows parameter is 1.
 */
@Test
public void testOverwritePartitionForPartitionedTable() {
    testAppendPartitionForPartitionedTable();
    List<THivePartitionUpdate> pus = new ArrayList<>();
    pus.add(createRandomOverwrite("a"));
    pus.add(createRandomOverwrite("b"));
    pus.add(createRandomOverwrite("c"));
    hmsOps.commit(dbName, tbWithPartition, pus);

    // Only the post-overwrite expectations are asserted; the append-era checks (4 and 2 rows)
    // contradicted them and are removed.
    Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
    assertNumRows(1, pa);
    Partition pb = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("b"));
    assertNumRows(1, pb);
    Partition pc = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("c"));
    assertNumRows(1, pc);
}

@Test
Expand All @@ -200,7 +224,7 @@ public void testNewManyPartitionForPartitionedTable() {
hmsOps.commit(dbName, tbWithPartition, pus);
for (int i = 0; i < nums; i++) {
Partition p = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("" + i));
Assert.assertEquals(1, Long.parseLong(p.getParameters().get("numRows")));
assertNumRows(1, p);
}

try {
Expand All @@ -210,6 +234,14 @@ public void testNewManyPartitionForPartitionedTable() {
}
}

/**
 * Asserts that the partition's "numRows" parameter, parsed as a long, equals {@code expected}.
 *
 * @param expected expected row count
 * @param p        partition whose parameters are inspected
 */
public void assertNumRows(long expected, Partition p) {
    String recordedRows = p.getParameters().get("numRows");
    long actual = Long.parseLong(recordedRows);
    Assert.assertEquals(expected, actual);
}

/**
 * Asserts that the table's "numRows" parameter, parsed as a long, equals {@code expected}.
 *
 * @param expected expected row count
 * @param t        table whose parameters are inspected
 */
public void assertNumRows(long expected, Table t) {
    String recordedRows = t.getParameters().get("numRows");
    long actual = Long.parseLong(recordedRows);
    Assert.assertEquals(expected, actual);
}

public THivePartitionUpdate genOnePartitionUpdate(String partitionValue, TUpdateMode mode) {

String uuid = UUID.randomUUID().toString();
Expand Down Expand Up @@ -241,4 +273,8 @@ public THivePartitionUpdate createRandomNew(String partition) {
/**
 * Builds a partition update in APPEND mode for the partition value {@code "c3=" + partition}.
 *
 * @param partition value appended after the {@code c3=} prefix
 * @return the update produced by {@code genOnePartitionUpdate}
 */
public THivePartitionUpdate createRandomAppend(String partition) {
    String partitionValue = "c3=" + partition;
    return genOnePartitionUpdate(partitionValue, TUpdateMode.APPEND);
}

/**
 * Builds a partition update in OVERWRITE mode for the partition value {@code "c3=" + partition}.
 *
 * @param partition value appended after the {@code c3=} prefix
 * @return the update produced by {@code genOnePartitionUpdate}
 */
public THivePartitionUpdate createRandomOverwrite(String partition) {
    String partitionValue = "c3=" + partition;
    return genOnePartitionUpdate(partitionValue, TUpdateMode.OVERWRITE);
}
}

0 comments on commit 4078ceb

Please sign in to comment.