From d6e18d80979cf73eb942ad4d9b9d4feea15475e2 Mon Sep 17 00:00:00 2001 From: Yang Yu Date: Tue, 10 Dec 2024 22:34:26 +0800 Subject: [PATCH] feature: support the Operation Cancellable. Signed-off-by: Yang Yu --- pom.xml | 2 +- .../org/apache/hadoop/fs/CosFileSystem.java | 17 +++++++++ .../org/apache/hadoop/fs/CosNFileSystem.java | 35 ++++++++++++++++++- .../OperationCancellingStatusProvider.java | 5 +++ 4 files changed, 57 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/apache/hadoop/fs/cosn/OperationCancellingStatusProvider.java diff --git a/pom.xml b/pom.xml index af22a66..248b5e6 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.qcloud.cos hadoop-cos - 8.3.12 + 8.3.14 jar Apache Hadoop Tencent Cloud COS Support diff --git a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java index d47e657..fb81dfc 100644 --- a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java +++ b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java @@ -6,6 +6,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.cosn.Constants; +import org.apache.hadoop.fs.cosn.OperationCancellingStatusProvider; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; @@ -717,4 +718,20 @@ public void close() throws IOException { } } } + + public void setOperationCancellingStatusProvider(OperationCancellingStatusProvider operationCancellingStatusProvider) { + if (this.actualImplFS instanceof CosNFileSystem) { + ((CosNFileSystem) this.actualImplFS).setOperationCancellingStatusProvider(operationCancellingStatusProvider); + } else { + throw new UnsupportedOperationException("Not supported currently"); + } + } + + public void removeOperationCancelingStatusProvider() { + if (this.actualImplFS instanceof CosNFileSystem) { + ((CosNFileSystem) this.actualImplFS).removeOperationCancelingStatusProvider(); + } else { + throw new UnsupportedOperationException("Not supported currently"); + } + } } diff --git a/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java index f27b03c..c0cc062 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java +++ b/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java @@ -10,6 +10,7 @@ import org.apache.hadoop.fs.cosn.CRC32CCheckSum; import org.apache.hadoop.fs.cosn.CRC64Checksum; import org.apache.hadoop.fs.cosn.LocalRandomAccessMappedBufferPool; +import org.apache.hadoop.fs.cosn.OperationCancellingStatusProvider; import org.apache.hadoop.fs.cosn.ReadBufferHolder; import org.apache.hadoop.fs.cosn.Unit; import org.apache.hadoop.fs.permission.FsPermission; @@ -48,6 +49,8 @@ public class CosNFileSystem extends FileSystem { private static final Logger LOG = LoggerFactory.getLogger(CosNFileSystem.class); + private ThreadLocal operationCancellingStatusProviderThreadLocal = new ThreadLocal<>(); + static final String SCHEME = "cosn"; static final String PATH_DELIMITER = Path.SEPARATOR; static final Charset METADATA_ENCODING = StandardCharsets.UTF_8; @@ -584,6 +587,13 @@ private void internalRecursiveDelete(String key, int listMaxLength) throws IOExc } priorLastKey = listing.getPriorLastKey(); + + if (this.operationCancellingStatusProviderThreadLocal.get() != null + && this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) { + LOG.warn("The delete operation is cancelled. key: {}.", key); + throw new IOException("The delete operation is cancelled. key: " + key); + } + } while (priorLastKey != null && !Thread.currentThread().isInterrupted()); deleteFileContext.lock(); @@ -790,7 +800,13 @@ public FileStatus[] listStatus(Path f) throws IOException { status.add(directory); } priorLastKey = listing.getPriorLastKey(); - } while (priorLastKey != null); + + if (this.operationCancellingStatusProviderThreadLocal.get() != null + && this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) { + LOG.warn("The list operation is cancelled. key: {}.", key); + throw new IOException("The list operation is cancelled. key: " + key); + } + } while (priorLastKey != null && !Thread.currentThread().isInterrupted()); return status.toArray(new FileStatus[status.size()]); } @@ -963,6 +979,7 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException { @Override public boolean rename(Path src, Path dst) throws IOException { + // Renaming the root directory is not allowed if (src.isRoot()) { LOG.debug("Cannot rename the root directory of a filesystem."); @@ -1131,6 +1148,13 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException { } priorLastKey = objectList.getPriorLastKey(); + if (this.operationCancellingStatusProviderThreadLocal.get() != null + && this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) { + LOG.warn("The copy operation is cancelled. Stop copying the directory. srcKey: {}, dstKey: {}", + srcKey, dstKey); + throw new IOException(String.format("The copy operation is cancelled. srcKey: %s, dstKey: %s", + srcKey, dstKey)); + } } while (null != priorLastKey && !Thread.currentThread().isInterrupted()); copyFileContext.lock(); @@ -1655,4 +1679,13 @@ public String pathToKey(Path path) { public Path keyToPath(String key) { return CosNUtils.keyToPath(key, PATH_DELIMITER); } + + public void setOperationCancellingStatusProvider(OperationCancellingStatusProvider operationCancellingStatusProvider) { + this.operationCancellingStatusProviderThreadLocal.set(operationCancellingStatusProvider); + } + + // 如果设置了 OperationCancellingStatusProvider,需要记得调用这个方法做 remove 处理,防止上层线程池中内存泄漏的情况。 + public void removeOperationCancelingStatusProvider() { + this.operationCancellingStatusProviderThreadLocal.remove(); + } } diff --git a/src/main/java/org/apache/hadoop/fs/cosn/OperationCancellingStatusProvider.java b/src/main/java/org/apache/hadoop/fs/cosn/OperationCancellingStatusProvider.java new file mode 100644 index 0000000..d6285c0 --- /dev/null +++ b/src/main/java/org/apache/hadoop/fs/cosn/OperationCancellingStatusProvider.java @@ -0,0 +1,5 @@ +package org.apache.hadoop.fs.cosn; + +public interface OperationCancellingStatusProvider { + boolean isCancelled(); +}