diff --git a/pom.xml b/pom.xml
index af22a66..248b5e6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.qcloud.cos
hadoop-cos
- 8.3.12
+ 8.3.14
jar
Apache Hadoop Tencent Cloud COS Support
diff --git a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java
index d47e657..fb81dfc 100644
--- a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java
+++ b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java
@@ -6,6 +6,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.cosn.Constants;
+import org.apache.hadoop.fs.cosn.OperationCancellingStatusProvider;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
@@ -717,4 +718,20 @@ public void close() throws IOException {
}
}
}
+
+ public void setOperationCancellingStatusProvider(OperationCancellingStatusProvider operationCancellingStatusProvider) {
+ if (this.actualImplFS instanceof CosNFileSystem) {
+ ((CosNFileSystem) this.actualImplFS).setOperationCancellingStatusProvider(operationCancellingStatusProvider);
+ } else {
+ throw new UnsupportedOperationException("Not supported currently");
+ }
+ }
+
+ public void removeOperationCancelingStatusProvider() {
+ if (this.actualImplFS instanceof CosNFileSystem) {
+ ((CosNFileSystem) this.actualImplFS).removeOperationCancelingStatusProvider();
+ } else {
+ throw new UnsupportedOperationException("Not supported currently");
+ }
+ }
}
diff --git a/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java
index f27b03c..c0cc062 100644
--- a/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java
+++ b/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java
@@ -10,6 +10,7 @@
import org.apache.hadoop.fs.cosn.CRC32CCheckSum;
import org.apache.hadoop.fs.cosn.CRC64Checksum;
import org.apache.hadoop.fs.cosn.LocalRandomAccessMappedBufferPool;
+import org.apache.hadoop.fs.cosn.OperationCancellingStatusProvider;
import org.apache.hadoop.fs.cosn.ReadBufferHolder;
import org.apache.hadoop.fs.cosn.Unit;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -48,6 +49,8 @@
public class CosNFileSystem extends FileSystem {
private static final Logger LOG = LoggerFactory.getLogger(CosNFileSystem.class);
+ private ThreadLocal operationCancellingStatusProviderThreadLocal = new ThreadLocal<>();
+
static final String SCHEME = "cosn";
static final String PATH_DELIMITER = Path.SEPARATOR;
static final Charset METADATA_ENCODING = StandardCharsets.UTF_8;
@@ -584,6 +587,13 @@ private void internalRecursiveDelete(String key, int listMaxLength) throws IOExc
}
priorLastKey = listing.getPriorLastKey();
+
+ if (this.operationCancellingStatusProviderThreadLocal.get() != null
+ && this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
+ LOG.warn("The delete operation is cancelled. key: {}.", key);
+ throw new IOException("The delete operation is cancelled. key: " + key);
+ }
+
} while (priorLastKey != null && !Thread.currentThread().isInterrupted());
deleteFileContext.lock();
@@ -790,7 +800,13 @@ public FileStatus[] listStatus(Path f) throws IOException {
status.add(directory);
}
priorLastKey = listing.getPriorLastKey();
- } while (priorLastKey != null);
+
+ if (this.operationCancellingStatusProviderThreadLocal.get() != null
+ && this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
+ LOG.warn("The list operation is cancelled. key: {}.", key);
+ throw new IOException("The list operation is cancelled. key: " + key);
+ }
+ } while (priorLastKey != null && !Thread.currentThread().isInterrupted());
return status.toArray(new FileStatus[status.size()]);
}
@@ -963,6 +979,7 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
@Override
public boolean rename(Path src, Path dst) throws IOException {
+
// Renaming the root directory is not allowed
if (src.isRoot()) {
LOG.debug("Cannot rename the root directory of a filesystem.");
@@ -1131,6 +1148,13 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
}
priorLastKey = objectList.getPriorLastKey();
+ if (this.operationCancellingStatusProviderThreadLocal.get() != null
+ && this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
+ LOG.warn("The copy operation is cancelled. Stop copying the directory. srcKey: {}, dstKey: {}",
+ srcKey, dstKey);
+ throw new IOException(String.format("The copy operation is cancelled. srcKey: %s, dstKey: %s",
+ srcKey, dstKey));
+ }
} while (null != priorLastKey && !Thread.currentThread().isInterrupted());
copyFileContext.lock();
@@ -1655,4 +1679,13 @@ public String pathToKey(Path path) {
public Path keyToPath(String key) {
return CosNUtils.keyToPath(key, PATH_DELIMITER);
}
+
+ public void setOperationCancellingStatusProvider(OperationCancellingStatusProvider operationCancellingStatusProvider) {
+ this.operationCancellingStatusProviderThreadLocal.set(operationCancellingStatusProvider);
+ }
+
+ // 如果设置了 OperationCancellingStatusProvider,需要记得调用这个方法做 remove 处理,防止上层线程池中内存泄漏的情况。
+ public void removeOperationCancelingStatusProvider() {
+ this.operationCancellingStatusProviderThreadLocal.remove();
+ }
}
diff --git a/src/main/java/org/apache/hadoop/fs/cosn/OperationCancellingStatusProvider.java b/src/main/java/org/apache/hadoop/fs/cosn/OperationCancellingStatusProvider.java
new file mode 100644
index 0000000..d6285c0
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/fs/cosn/OperationCancellingStatusProvider.java
@@ -0,0 +1,5 @@
+package org.apache.hadoop.fs.cosn;
+
+public interface OperationCancellingStatusProvider {
+ boolean isCancelled();
+}