Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fs close check throw io exception #77

Merged
merged 1 commit into from
Sep 9, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/main/java/org/apache/hadoop/fs/CosFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class CosFileSystem extends FileSystem {
private NativeFileSystemStore nativeStore;
private boolean isPosixFSStore;
private boolean isDefaultNativeStore;
private volatile boolean healthyFlag = false;
private boolean isPosixUseOFSRanger;
private boolean isPosixImpl = false;
private FileSystem actualImplFS = null;
Expand Down Expand Up @@ -143,6 +144,8 @@ public void initialize(URI uri, Configuration conf) throws IOException {


this.actualImplFS.initialize(uri, conf);
// init status
this.healthyFlag = true;
}

// load class to get relate file system
Expand Down Expand Up @@ -171,13 +174,15 @@ public Path getHomeDirectory() {
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
LOG.debug("append file [{}] in COS.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
return this.actualImplFS.append(f, bufferSize, progress);
}

@Override
public boolean truncate(Path f, long newLength) throws IOException {
LOG.debug("truncate file [{}] in COS.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
return this.actualImplFS.truncate(f, newLength);
}
Expand All @@ -189,6 +194,7 @@ public FSDataOutputStream create(Path f, FsPermission permission,
long blockSize, Progressable progress)
throws IOException {
LOG.debug("Creating a new file [{}] in COS.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
return this.actualImplFS.create(f, permission, overwrite, bufferSize,
replication, blockSize, progress);
Expand All @@ -198,13 +204,15 @@ public FSDataOutputStream create(Path f, FsPermission permission,
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
LOG.debug("Ready to delete path: {}. recursive: {}.", f, recursive);
healthyCheck();
checkPermission(f, RangerAccessType.DELETE);
return this.actualImplFS.delete(f, recursive);
}

@Override
public FileStatus getFileStatus(Path f) throws IOException {
LOG.debug("Get file status: {}.", f);
healthyCheck();
// keep same not change ranger permission here
return this.actualImplFS.getFileStatus(f);
}
Expand All @@ -227,6 +235,7 @@ public URI getUri() {
@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
LOG.debug("list status:" + f);
healthyCheck();
checkPermission(f, RangerAccessType.LIST);
return this.actualImplFS.listStatus(f);
}
Expand All @@ -235,20 +244,23 @@ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException
public boolean mkdirs(Path f, FsPermission permission)
throws IOException {
LOG.debug("mkdirs path: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
return this.actualImplFS.mkdirs(f, permission);
}

@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
LOG.debug("Open file [{}] to read, buffer [{}]", f, bufferSize);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.open(f, bufferSize);
}

@Override
public boolean rename(Path src, Path dst) throws IOException {
LOG.debug("Rename the source path [{}] to the dest path [{}].", src, dst);
healthyCheck();
checkPermission(src, RangerAccessType.DELETE);
checkPermission(dst, RangerAccessType.WRITE);
return this.actualImplFS.rename(src, dst);
Expand Down Expand Up @@ -276,6 +288,7 @@ public Path getWorkingDirectory() {
@Override
public FileChecksum getFileChecksum(Path f, long length) throws IOException {
LOG.debug("call the checksum for the path: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
Preconditions.checkArgument(length >= 0);
return this.actualImplFS.getFileChecksum(f, length);
Expand All @@ -294,6 +307,7 @@ public FileChecksum getFileChecksum(Path f, long length) throws IOException {
@Override
public void setXAttr(Path f, String name, byte[] value, EnumSet<XAttrSetFlag> flag) throws IOException {
LOG.debug("set XAttr: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
this.actualImplFS.setXAttr(f, name, value, flag);
}
Expand All @@ -309,6 +323,7 @@ public void setXAttr(Path f, String name, byte[] value, EnumSet<XAttrSetFlag> fl
@Override
public byte[] getXAttr(Path f, String name) throws IOException {
LOG.debug("get XAttr: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.getXAttr(f, name);
}
Expand All @@ -324,13 +339,15 @@ public byte[] getXAttr(Path f, String name) throws IOException {
@Override
public Map<String, byte[]> getXAttrs(Path f, List<String> names) throws IOException {
LOG.debug("get XAttrs: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.getXAttrs(f, names);
}

@Override
public Map<String, byte[]> getXAttrs(Path f) throws IOException {
LOG.debug("get XAttrs: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.getXAttrs(f);
}
Expand All @@ -345,13 +362,15 @@ public Map<String, byte[]> getXAttrs(Path f) throws IOException {
@Override
public void removeXAttr(Path f, String name) throws IOException {
LOG.debug("remove XAttr: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
this.actualImplFS.removeXAttr(f, name);
}

@Override
public List<String> listXAttrs(Path f) throws IOException {
LOG.debug("list XAttrs: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.listXAttrs(f);
}
Expand Down Expand Up @@ -429,6 +448,7 @@ private void transferOfsConfig() {
// CHDFS Support Only
public void releaseFileLock(Path f) throws IOException {
LOG.debug("Release the file lock: {}.", f);
healthyCheck();
if (this.actualImplFS instanceof CHDFSHadoopFileSystemAdapter) {
((CHDFSHadoopFileSystemAdapter) this.actualImplFS).releaseFileLock(f);
} else {
Expand Down Expand Up @@ -464,6 +484,7 @@ private boolean useOFSRanger() {
*/
private void checkCustomAuth(Configuration conf) throws IOException {
// todo: need get token first
healthyCheck();
this.rangerCredentialsClient.doCheckCustomAuth(conf);
}

Expand All @@ -488,6 +509,12 @@ private String getOwnerId() {
return shortUserName;
}

private void healthyCheck() throws IOException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

方法名字改为 isInitialized()

if (!this.healthyFlag) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个变量改为 initialized

throw new IOException("fileSystem has been closed or not init");
}
}

@Override
public void close() throws IOException {
LOG.info("begin to close cos file system");
Expand All @@ -496,5 +523,6 @@ public void close() throws IOException {
// close range client later, inner native store
this.nativeStore.close();
}
this.healthyFlag = false;
}
}