Skip to content

Commit

Permalink
feature: support the Operation Cancellable.
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Yu <[email protected]>
  • Loading branch information
yuyang733 committed Dec 13, 2024
1 parent b530ddd commit d6e18d8
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.qcloud.cos</groupId>
<artifactId>hadoop-cos</artifactId>
<version>8.3.12</version>
<version>8.3.14</version>
<packaging>jar</packaging>

<name>Apache Hadoop Tencent Cloud COS Support</name>
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/org/apache/hadoop/fs/CosFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.cosn.Constants;
import org.apache.hadoop.fs.cosn.OperationCancellingStatusProvider;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -717,4 +718,20 @@ public void close() throws IOException {
}
}
}

public void setOperationCancellingStatusProvider(OperationCancellingStatusProvider operationCancellingStatusProvider) {
if (this.actualImplFS instanceof CosNFileSystem) {
((CosNFileSystem) this.actualImplFS).setOperationCancellingStatusProvider(operationCancellingStatusProvider);
} else {
throw new UnsupportedOperationException("Not supported currently");
}
}

public void removeOperationCancelingStatusProvider() {
if (this.actualImplFS instanceof CosNFileSystem) {
((CosNFileSystem) this.actualImplFS).removeOperationCancelingStatusProvider();
} else {
throw new UnsupportedOperationException("Not supported currently");
}
}
}
35 changes: 34 additions & 1 deletion src/main/java/org/apache/hadoop/fs/CosNFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.hadoop.fs.cosn.CRC32CCheckSum;
import org.apache.hadoop.fs.cosn.CRC64Checksum;
import org.apache.hadoop.fs.cosn.LocalRandomAccessMappedBufferPool;
import org.apache.hadoop.fs.cosn.OperationCancellingStatusProvider;
import org.apache.hadoop.fs.cosn.ReadBufferHolder;
import org.apache.hadoop.fs.cosn.Unit;
import org.apache.hadoop.fs.permission.FsPermission;
Expand Down Expand Up @@ -48,6 +49,8 @@
public class CosNFileSystem extends FileSystem {
private static final Logger LOG = LoggerFactory.getLogger(CosNFileSystem.class);

private ThreadLocal<OperationCancellingStatusProvider> operationCancellingStatusProviderThreadLocal = new ThreadLocal<>();

static final String SCHEME = "cosn";
static final String PATH_DELIMITER = Path.SEPARATOR;
static final Charset METADATA_ENCODING = StandardCharsets.UTF_8;
Expand Down Expand Up @@ -584,6 +587,13 @@ private void internalRecursiveDelete(String key, int listMaxLength) throws IOExc
}

priorLastKey = listing.getPriorLastKey();

if (this.operationCancellingStatusProviderThreadLocal.get() != null
&& this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
LOG.warn("The delete operation is cancelled. key: {}.", key);
throw new IOException("The delete operation is cancelled. key: " + key);
}

} while (priorLastKey != null && !Thread.currentThread().isInterrupted());

deleteFileContext.lock();
Expand Down Expand Up @@ -790,7 +800,13 @@ public FileStatus[] listStatus(Path f) throws IOException {
status.add(directory);
}
priorLastKey = listing.getPriorLastKey();
} while (priorLastKey != null);

if (this.operationCancellingStatusProviderThreadLocal.get() != null
&& this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
LOG.warn("The list operation is cancelled. key: {}.", key);
throw new IOException("The list operation is cancelled. key: " + key);
}
} while (priorLastKey != null && !Thread.currentThread().isInterrupted());

return status.toArray(new FileStatus[status.size()]);
}
Expand Down Expand Up @@ -963,6 +979,7 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException {

@Override
public boolean rename(Path src, Path dst) throws IOException {

// Renaming the root directory is not allowed
if (src.isRoot()) {
LOG.debug("Cannot rename the root directory of a filesystem.");
Expand Down Expand Up @@ -1131,6 +1148,13 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
}

priorLastKey = objectList.getPriorLastKey();
if (this.operationCancellingStatusProviderThreadLocal.get() != null
&& this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
LOG.warn("The copy operation is cancelled. Stop copying the directory. srcKey: {}, dstKey: {}",
srcKey, dstKey);
throw new IOException(String.format("The copy operation is cancelled. srcKey: %s, dstKey: %s",
srcKey, dstKey));
}
} while (null != priorLastKey && !Thread.currentThread().isInterrupted());

copyFileContext.lock();
Expand Down Expand Up @@ -1655,4 +1679,13 @@ public String pathToKey(Path path) {
public Path keyToPath(String key) {
return CosNUtils.keyToPath(key, PATH_DELIMITER);
}

public void setOperationCancellingStatusProvider(OperationCancellingStatusProvider operationCancellingStatusProvider) {
this.operationCancellingStatusProviderThreadLocal.set(operationCancellingStatusProvider);
}

// 如果设置了 OperationCancellingStatusProvider,需要记得调用这个方法做 remove 处理,防止上层线程池中内存泄漏的情况。
public void removeOperationCancelingStatusProvider() {
this.operationCancellingStatusProviderThreadLocal.remove();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.apache.hadoop.fs.cosn;

public interface OperationCancellingStatusProvider {
boolean isCancelled();
}

0 comments on commit d6e18d8

Please sign in to comment.