Skip to content

Commit

Permalink
feature: support the Operation Cancellable.
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Yu <[email protected]>
  • Loading branch information
yuyang733 committed Dec 11, 2024
1 parent b530ddd commit be03fe4
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.qcloud.cos</groupId>
<artifactId>hadoop-cos</artifactId>
<version>8.3.12</version>
<version>8.3.14</version>
<packaging>jar</packaging>

<name>Apache Hadoop Tencent Cloud COS Support</name>
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/org/apache/hadoop/fs/CosFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.cosn.Constants;
import org.apache.hadoop.fs.cosn.OperationCancellingStatusProvider;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -717,4 +718,20 @@ public void close() throws IOException {
}
}
}

public void setOperationCancellingStatusProvider(OperationCancellingStatusProvider operationCancellingStatusProvider) {
if (this.actualImplFS instanceof CosNFileSystem) {
((CosNFileSystem) this.actualImplFS).setOperationCancellingStatusProvider(operationCancellingStatusProvider);
} else {
throw new UnsupportedOperationException("Not supported currently");
}
}

public void removeOperationCancelingStatusProvider() {
if (this.actualImplFS instanceof CosNFileSystem) {
((CosNFileSystem) this.actualImplFS).removeOperationCancelingStatusProvider();
} else {
throw new UnsupportedOperationException("Not supported currently");
}
}
}
33 changes: 32 additions & 1 deletion src/main/java/org/apache/hadoop/fs/CosNFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.hadoop.fs.cosn.CRC32CCheckSum;
import org.apache.hadoop.fs.cosn.CRC64Checksum;
import org.apache.hadoop.fs.cosn.LocalRandomAccessMappedBufferPool;
import org.apache.hadoop.fs.cosn.OperationCancellingStatusProvider;
import org.apache.hadoop.fs.cosn.ReadBufferHolder;
import org.apache.hadoop.fs.cosn.Unit;
import org.apache.hadoop.fs.permission.FsPermission;
Expand Down Expand Up @@ -48,6 +49,8 @@
public class CosNFileSystem extends FileSystem {
private static final Logger LOG = LoggerFactory.getLogger(CosNFileSystem.class);

private ThreadLocal<OperationCancellingStatusProvider> operationCancellingStatusProviderThreadLocal = new ThreadLocal<>();

static final String SCHEME = "cosn";
static final String PATH_DELIMITER = Path.SEPARATOR;
static final Charset METADATA_ENCODING = StandardCharsets.UTF_8;
Expand Down Expand Up @@ -584,6 +587,13 @@ private void internalRecursiveDelete(String key, int listMaxLength) throws IOExc
}

priorLastKey = listing.getPriorLastKey();

if (this.operationCancellingStatusProviderThreadLocal.get() != null
&& this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
LOG.warn("The delete operation is cancelled. key: {}.", key);
break;
}

} while (priorLastKey != null && !Thread.currentThread().isInterrupted());

deleteFileContext.lock();
Expand Down Expand Up @@ -790,7 +800,13 @@ public FileStatus[] listStatus(Path f) throws IOException {
status.add(directory);
}
priorLastKey = listing.getPriorLastKey();
} while (priorLastKey != null);

if (this.operationCancellingStatusProviderThreadLocal.get() != null
&& this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
LOG.warn("The listStatus operation is cancelled. key: {}.", key);
break;
}
} while (priorLastKey != null && !Thread.currentThread().isInterrupted());

return status.toArray(new FileStatus[status.size()]);
}
Expand Down Expand Up @@ -1131,6 +1147,12 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
}

priorLastKey = objectList.getPriorLastKey();
if (this.operationCancellingStatusProviderThreadLocal.get() != null
&& this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
LOG.warn("The operation is cancelled. Stop copying the directory. srcKey: {}, dstKey: {}",
srcKey, dstKey);
break;
}
} while (null != priorLastKey && !Thread.currentThread().isInterrupted());

copyFileContext.lock();
Expand Down Expand Up @@ -1655,4 +1677,13 @@ public String pathToKey(Path path) {
public Path keyToPath(String key) {
return CosNUtils.keyToPath(key, PATH_DELIMITER);
}

public void setOperationCancellingStatusProvider(OperationCancellingStatusProvider operationCancellingStatusProvider) {
this.operationCancellingStatusProviderThreadLocal.set(operationCancellingStatusProvider);
}

// 如果设置了 OperationCancellingStatusProvider,需要记得调用这个方法做 remove 处理,防止上层线程池中内存泄漏的情况。
public void removeOperationCancelingStatusProvider() {
this.operationCancellingStatusProviderThreadLocal.remove();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.apache.hadoop.fs.cosn;

public interface OperationCancellingStatusProvider {
boolean isCancelled();
}

0 comments on commit be03fe4

Please sign in to comment.