From b2c6112cdfee79f75dc7d024bbeb98eba55ecb61 Mon Sep 17 00:00:00 2001 From: Wine93 Date: Thu, 7 Mar 2024 10:31:43 +0800 Subject: [PATCH] [WIP] improve metadata perfomance for HDFS. Signed-off-by: Wine93 --- .../curve/fs/hadoop/CurveFSProto.java | 57 -- .../curve/fs/hadoop/CurveFSTalker.java | 185 ------ .../curve/fs/hadoop/CurveFileSystem.java | 584 ++++++++++-------- ...putStream.java => CurveFsInputStream.java} | 23 +- ...utStream.java => CurveFsOutputStream.java} | 28 +- .../curve/fs/libfs/CurveFSMount.java | 228 ------- .../opencurve/curve/fs/libfs/CurveFSStat.java | 50 -- ...veFSStatVFS.java => CurveFsException.java} | 24 +- .../curve/fs/libfs/CurveFsMount.java | 276 +++++++++ ...veLoader.java => CurveFsNativeLoader.java} | 29 +- .../curve/fs/libfs/CurveFsProto.java | 61 ++ 11 files changed, 718 insertions(+), 827 deletions(-) delete mode 100644 curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSProto.java delete mode 100644 curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSTalker.java rename curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/{CurveFSInputStream.java => CurveFsInputStream.java} (92%) rename curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/{CurveFSOutputStream.java => CurveFsOutputStream.java} (87%) delete mode 100644 curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFSMount.java delete mode 100644 curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFSStat.java rename curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/{CurveFSStatVFS.java => CurveFsException.java} (67%) create mode 100644 curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFsMount.java rename curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/{CurveFSNativeLoader.java => CurveFsNativeLoader.java} (86%) create mode 100644 curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFsProto.java diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSProto.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSProto.java deleted file mode 100644 index df8ab69b21..0000000000 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSProto.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2023 NetEase Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Project: Curve - * Created Date: 2023-08-01 - * Author: NetEase Media Bigdata - */ - -package io.opencurve.curve.fs.hadoop; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import io.opencurve.curve.fs.libfs.CurveFSStat; -import io.opencurve.curve.fs.libfs.CurveFSStatVFS; - -import java.io.IOException; -import java.net.URI; - -abstract class CurveFSProto { - // init* - abstract void initialize(URI uri, Configuration conf) throws IOException; - abstract void shutdown() throws IOException; - // directory* - abstract void mkdirs(Path path, int mode) throws IOException; - abstract void rmdir(Path path) throws IOException; - abstract String[] listdir(Path path) throws IOException; - // file* - abstract int open(Path path, int flags, int mode) throws IOException; - abstract long lseek(int fd, long offset, int whence) throws IOException; - abstract int write(int fd, byte[] buf, long size, long offset) throws IOException; - abstract int read(int fd, byte[] buf, long size, long offset) throws IOException; - abstract void fsync(int fd) throws IOException; - abstract void close(int fd) throws IOException; - abstract void unlink(Path path) throws IOException; - // others - abstract void statfs(Path path, CurveFSStatVFS stat) throws IOException; - abstract void lstat(Path path, CurveFSStat stat) throws IOException; - abstract void fstat(int fd, CurveFSStat stat) throws IOException; - abstract void setattr(Path path, CurveFSStat stat, int mask) throws IOException; - abstract void chmod(Path path, int mode) throws IOException; - abstract void chown(Path path, int uid, int gid) throws IOException; - abstract void rename(Path src, Path dst) throws IOException; -} diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSTalker.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSTalker.java deleted file mode 100644 index 26d0492142..0000000000 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSTalker.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Copyright (c) 2023 NetEase Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Project: Curve - * Created Date: 2023-08-01 - * Author: NetEase Media Bigdata - */ - -package io.opencurve.curve.fs.hadoop; - -import org.apache.commons.logging.Log; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import io.opencurve.curve.fs.libfs.CurveFSMount; -import io.opencurve.curve.fs.libfs.CurveFSStat; -import io.opencurve.curve.fs.libfs.CurveFSStatVFS; - -import java.util.UUID; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.util.Map; - -class CurveFSTalker extends CurveFSProto { - private CurveFSMount mount; - private String fsname = null; - private String mountpoint = null; - private boolean inited = false; - - private static final String PREFIX_KEY = "curvefs"; - - CurveFSTalker(Configuration conf, Log log) { - mount = null; - } - - private String tostr(Path path) { - if (null == path) { - return "/"; - } - return path.toUri().getPath(); - } - - private void loadCfg(Configuration conf) { - Map m = conf.getValByRegex("^" + PREFIX_KEY + "\\..*"); - for (Map.Entry entry : m.entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - if (key.equals(PREFIX_KEY + ".name")) { - fsname = value; - } else { - mount.confSet(key.substring(PREFIX_KEY.length() + 1), value); - } - } - } - - @Override - void initialize(URI uri, Configuration conf) throws IOException { - mount = new CurveFSMount(); - loadCfg(conf); - if (null == fsname || fsname.isEmpty()) { - throw new IOException("curvefs.name is not set"); - } - mountpoint = UUID.randomUUID().toString(); - mount.mount(fsname, mountpoint); - inited = true; - } - - @Override - void shutdown() throws IOException { - if (inited) { - mount.umount(fsname, mountpoint); - mount = null; - inited = false; - } - } - - @Override - void mkdirs(Path path, int mode) throws IOException { - mount.mkdirs(tostr(path), mode); - } - - @Override - void rmdir(Path path) throws IOException { - mount.rmdir(tostr(path)); - } - - @Override - String[] listdir(Path path) throws IOException { - CurveFSStat stat = new CurveFSStat(); - try { - mount.lstat(tostr(path), stat); - } catch (FileNotFoundException e) { - return null; - } - if (!stat.isDir()) { - return null; - } - - return mount.listdir(tostr(path)); - } - - @Override - int open(Path path, int flags, int mode) throws IOException { - return mount.open(tostr(path), flags, mode); - } - - @Override - long lseek(int fd, long offset, int whence) throws IOException { - return mount.lseek(fd, offset, whence); - } - - @Override - int write(int fd, byte[] buf, long size, long offset) throws IOException { - return mount.write(fd, buf, size, offset); - } - - @Override - int read(int fd, byte[] buf, long size, long offset) throws IOException { - return mount.read(fd, buf, size, offset); - } - - @Override - void fsync(int fd) throws IOException { - mount.fsync(fd); - } - - @Override - void close(int fd) throws IOException { - mount.close(fd); - } - - @Override - void unlink(Path path) throws IOException { - mount.unlink(tostr(path)); - } - - @Override - void statfs(Path path, CurveFSStatVFS stat) throws IOException { - mount.statfs(tostr(path), stat); - } - - @Override - void lstat(Path path, CurveFSStat stat) throws IOException { - mount.lstat(tostr(path), stat); - } - - @Override - void fstat(int fd, CurveFSStat stat) throws IOException { - mount.fstat(fd, stat); - } - - @Override - void setattr(Path path, CurveFSStat stat, int mask) throws IOException { - mount.setattr(tostr(path), stat, mask); - } - - @Override - void chmod(Path path, int mode) throws IOException { - mount.chmod(tostr(path), mode); - } - - @Override - void chown(Path path, int uid, int gid) throws IOException { - mount.chown(tostr(path), uid, gid); - } - - @Override - void rename(Path src, Path dst) throws IOException { - mount.rename(tostr(src), tostr(dst)); - } -} \ No newline at end of file diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFileSystem.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFileSystem.java index fc031d38d8..8f1239afcf 100644 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFileSystem.java +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFileSystem.java @@ -22,368 +22,446 @@ package io.opencurve.curve.fs.hadoop; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import java.net.URI; +import java.io.IOException; +import java.io.OutputStream; +import java.io.FileNotFoundException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Progressable; -import io.opencurve.curve.fs.libfs.CurveFSMount; -import io.opencurve.curve.fs.libfs.CurveFSStat; -import io.opencurve.curve.fs.libfs.CurveFSStatVFS; -import io.opencurve.curve.fs.hadoop.permission.Permission; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; +import io.opencurve.curve.fs.hadoop.permission.Permission; +import io.opencurve.curve.fs.libfs.CurveFsProto; +import io.opencurve.curve.fs.libfs.CurveFsMount; +import io.opencurve.curve.fs.libfs.CurveFsMount.StatVfs; +import io.opencurve.curve.fs.libfs.CurveFsMount.Stat; +import io.opencurve.curve.fs.libfs.CurveFsMount.File; +import io.opencurve.curve.fs.libfs.CurveFsMount.Dirent; +import io.opencurve.curve.fs.libfs.CurveFsException.NotADirectoryException; public class CurveFileSystem extends FileSystem { - private static final Log LOG = LogFactory.getLog(CurveFileSystem.class); + private static final int O_RDONLY = CurveFsMount.O_RDONLY; + private static final int O_WRONLY = CurveFsMount.O_WRONLY; + private static final int O_APPEND = CurveFsMount.O_APPEND; + private static final int SETATTR_MTIME = CurveFsMount.SETATTR_MTIME; + private static final int SETATTR_ATIME = CurveFsMount.SETATTR_ATIME; private URI uri; private Path workingDir; - private CurveFSProto curve = null; - private Permission perm = null; + private CurveFsProto curvefs = null; + private Permission permission = null; public CurveFileSystem() {} - public CurveFileSystem(Configuration conf) { - setConf(conf); - } + public CurveFileSystem(Configuration conf) { setConf(conf); } - private Path makeAbsolute(Path path) { - if (path.isAbsolute()) { - return path; - } - return new Path(workingDir, path); + // e.g. /my/dir1 + private String makeAbsolute(Path path) { + return makeQualified(path).toUri().getPath(); } + /** + * Get the current working directory for the given FileSystem + * @return the directory pathname + */ @Override - public URI getUri() { - return uri; + public Path getWorkingDirectory() { + return workingDir; } + /** + * Set the current working directory for the given FileSystem. All relative + * paths will be resolved relative to it. + * + * @param new_dir Path of new working directory + */ @Override - public String getScheme() { - return "hdfs"; + public void setWorkingDirectory(Path new_dir) { + workingDir = fixRelativePart(new_dir); + checkPath(workingDir); } + /** Called after a new FileSystem instance is constructed. + * @param name a uri whose authority section names the host, port, etc. + * for this FileSystem + * @param conf the configuration + */ @Override public void initialize(URI uri, Configuration conf) throws IOException { super.initialize(uri, conf); - if (curve == null) { - curve = new CurveFSTalker(conf, LOG); + if (curvefs == null) { + curvefs = new CurveFsMount(); } - if (perm == null) { - perm = new Permission(); + if (permission == null) { + permission = new Permission(); } - perm.initialize(conf); - curve.initialize(uri, conf); + curvefs.initialize(uri, conf); + permission.initialize(conf); setConf(conf); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.workingDir = getHomeDirectory(); } - @Override - public FSDataInputStream open(Path path, int bufferSize) throws IOException { - path = makeAbsolute(path); - - int fd = curve.open(path, CurveFSMount.O_RDONLY, 0); - - CurveFSStat stat = new CurveFSStat(); - curve.fstat(fd, stat); - - CurveFSInputStream istream = new CurveFSInputStream(getConf(), curve, fd, stat.size, bufferSize); - return new FSDataInputStream(istream); - } - + /** + * No more filesystem operations are needed. Will + * release any held locks. + */ @Override public void close() throws IOException { super.close(); - curve.shutdown(); + curvefs.shutdown(); } + /** + * Return the protocol scheme for the FileSystem. + *

+ * This implementation throws an UnsupportedOperationException. + * + * @return the protocol scheme for the FileSystem. + */ @Override - public FSDataOutputStream append(Path path, int bufferSize, Progressable progress) throws IOException { - path = makeAbsolute(path); - - if (progress != null) { - progress.progress(); - } - - int fd = curve.open(path, CurveFSMount.O_WRONLY | CurveFSMount.O_APPEND, 0); - if (progress != null) { - progress.progress(); - } - - CurveFSOutputStream ostream = new CurveFSOutputStream(getConf(), curve, fd, bufferSize); - return new FSDataOutputStream(ostream, statistics); + public String getScheme() { + return "hdfs"; } + /** Returns a URI whose scheme and authority identify this FileSystem.*/ @Override - public Path getWorkingDirectory() { - return workingDir; + public URI getUri() { + return uri; } + /** + * Call {@link #mkdirs(Path, FsPermission)} with default permission. + */ @Override - public void setWorkingDirectory(Path dir) { - workingDir = makeAbsolute(dir); + public boolean mkdirs(Path f) throws IOException { + FsPermission perms = FsPermission.getDirDefault().applyUMask(FsPermission.getUMask(getConf())); + return mkdirs(f, perms); } + /** + * Make the given file and all non-existent parents into + * directories. Has the semantics of Unix 'mkdir -p'. + * Existence of the directory hierarchy is not an error. + * @param f path to create + * @param permission to apply to f + */ @Override - public boolean mkdirs(Path path, FsPermission perms) throws IOException { - path = makeAbsolute(path); - curve.mkdirs(path, (int) perms.toShort()); + public boolean mkdirs(Path f, FsPermission perms) throws IOException { + try { + curvefs.mkdirs(makeAbsolute(f), perms.toShort()); + } catch (IOException e) { + return false; + } return true; } - @Override - public boolean mkdirs(Path f) throws IOException { - FsPermission perms = FsPermission.getDirDefault().applyUMask(FsPermission.getUMask(getConf()));; - return mkdirs(f, perms); + private FileStatus newFileStatus(Path f, Stat stat) { + return new FileStatus(stat.size, + stat.isDirectory, + 1, + stat.blksize, + stat.mtime, + stat.atime, + new FsPermission((short) stat.mode), + permission.getUsername(stat.uid), + permission.getGroupname(stat.gid), + makeQualified(f)); // e.g. curvefs://my/dir1 } + /** + * Return a file status object that represents the path. + * @param f The path we want information from + * @return a FileStatus object + * @throws FileNotFoundException when the path does not exist; + * IOException see specific implementation + */ @Override - public FileStatus getFileStatus(Path path) throws IOException { - path = makeAbsolute(path); - - CurveFSStat stat = new CurveFSStat(); - curve.lstat(path, stat); - String owner = perm.getUsername(stat.uid);; - String group = perm.getGroupname(stat.gid);; - - FileStatus status = new FileStatus( - stat.size, stat.isDir(), 1, stat.blksize, - stat.m_time, stat.a_time, - new FsPermission((short) stat.mode), owner, group, - path.makeQualified(this)); - return status; + public FileStatus getFileStatus(Path f) throws IOException { + Stat stat = new Stat(); + curvefs.lstat(makeAbsolute(f), stat); + return newFileStatus(f, stat); } + /** + * List the statuses of the files/directories in the given path if the path is + * a directory. + * + * @param f given path + * @return the statuses of the files/directories in the given patch + * @throws FileNotFoundException when the path does not exist; + * IOException see specific implementation + */ @Override - public FileStatus[] listStatus(Path path) throws IOException { - path = makeAbsolute(path); - - if (isFile(path)) { - return new FileStatus[]{getFileStatus(path)}; + public FileStatus[] listStatus(Path f) throws IOException { + Dirent[] dirents; + try { + dirents = curvefs.listdir(makeAbsolute(f)); + } catch(NotADirectoryException e) { + return new FileStatus[]{ getFileStatus(f) }; } - String[] dirlist = curve.listdir(path); - if (dirlist != null) { - FileStatus[] status = new FileStatus[dirlist.length]; - for (int i = 0; i < status.length; i++) { - status[i] = getFileStatus(new Path(path, dirlist[i])); - } - return status; - } else { - throw new FileNotFoundException("File " + path + " does not exist."); + FileStatus[] statuses = new FileStatus[dirents.length]; + for (int i = 0; i < dirents.length; i++) { + Path p = makeQualified(new Path(f, new String(dirents[i].name))); + statuses[i] = newFileStatus(p, dirents[i].stat); } + return statuses; } - @Override - public void setPermission(Path path, FsPermission permission) throws IOException { - path = makeAbsolute(path); - curve.chmod(path, permission.toShort()); + private FSDataInputStream createFsDataInputStream(File file, + int bufferSize) throws IOException { + CurveFsInputStream istream = + new CurveFsInputStream(getConf(), curvefs, file.fd, file.length, bufferSize); + return new FSDataInputStream(istream); } - @Override - public void setTimes(Path path, long mtime, long atime) throws IOException { - path = makeAbsolute(path); - - CurveFSStat stat = new CurveFSStat(); - - int mask = 0; - if (mtime != -1) { - stat.m_time = mtime; - mask |= CurveFSMount.SETATTR_MTIME; - } - - if (atime != -1) { - stat.a_time = atime; - mask |= CurveFSMount.SETATTR_ATIME; - } - - curve.setattr(path, stat, mask); + private FSDataOutputStream createFsDataOutputStream(File file, + int bufferSize) throws IOException { + OutputStream ostream = + new CurveFsOutputStream(getConf(), curvefs, file.fd, bufferSize); + return new FSDataOutputStream(ostream, statistics); } + /** + * Opens an FSDataInputStream at the indicated Path. + * @param f the file name to open + * @param bufferSize the size of the buffer to be used. + */ @Override - public FSDataOutputStream create(Path path, FsPermission permission, boolean overwrite, int bufferSize, - short replication, long blockSize, Progressable progress) throws IOException { - path = makeAbsolute(path); - - boolean exists = exists(path); - - if (progress != null) { - progress.progress(); - } - - int flags = CurveFSMount.O_WRONLY | CurveFSMount.O_CREAT; - - if (exists) { - if (overwrite) { - flags |= CurveFSMount.O_TRUNC; - } else { - throw new FileAlreadyExistsException(); - } - } else { - Path parent = path.getParent(); - if (parent != null) { - if (!mkdirs(parent)) { - throw new IOException("mkdirs failed for " + parent.toString()); - } - } - } - - if (progress != null) { - progress.progress(); - } - - int fd = curve.open(path, flags, (int) permission.toShort()); - - if (progress != null) { - progress.progress(); - } - - OutputStream ostream = new CurveFSOutputStream(getConf(), curve, fd, bufferSize); - return new FSDataOutputStream(ostream, statistics); + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + File file = new File(); + curvefs.open(makeAbsolute(f), O_RDONLY, file); + return createFsDataInputStream(file, bufferSize); } + /** + * Create an FSDataOutputStream at the indicated Path with write-progress + * reporting. + * @param f the file name to open + * @param permission + * @param overwrite if a file with this name already exists, then if true, + * the file will be overwritten, and if false an error will be thrown. + * @param bufferSize the size of the buffer to be used. + * @param replication required block replication for the file. + * @param blockSize + * @param progress + * @throws IOException + * @see #setPermission(Path, FsPermission) + */ @Override - public void setOwner(Path path, String username, String groupname) throws IOException { - CurveFSStat stat = new CurveFSStat(); - curve.lstat(path, stat); - - int uid = stat.uid; - int gid = stat.gid; - if (username != null) { - uid = perm.getUid(username); - } - if (groupname != null) { - gid = perm.getGid(groupname); + public FSDataOutputStream create(Path f, + FsPermission permission, + boolean overwrite, + int bufferSize, + short replication, + long blockSize, + Progressable progress) throws IOException { + File file = new File(); + for ( ; ; ) { + try { + curvefs.create(makeAbsolute(f), permission.toShort(), file); + } catch(FileNotFoundException e) { // parent directorty not exist + Path parent = makeQualified(f).getParent(); + try { + mkdirs(parent, FsPermission.getDirDefault()); + } catch (FileAlreadyExistsException ignored) {} + continue; // create file again + } catch(FileAlreadyExistsException e) { + if (!overwrite || isDirectory(f)) { + throw new FileAlreadyExistsException("File already exists: " + f); + } + delete(f, false); + continue; // create file again + } + break; } - - curve.chown(path, uid, gid); + return createFsDataOutputStream(file, bufferSize); } + /** + * Opens an FSDataOutputStream at the indicated Path with write-progress + * reporting. Same as create(), except fails if parent directory doesn't + * already exist. + * @param f the file name to open + * @param permission + * @param overwrite if a file with this name already exists, then if true, + * the file will be overwritten, and if false an error will be thrown. + * @param bufferSize the size of the buffer to be used. + * @param replication required block replication for the file. + * @param blockSize + * @param progress + * @throws IOException + * @see #setPermission(Path, FsPermission) + * @deprecated API only for 0.20-append + */ @Deprecated @Override - public FSDataOutputStream createNonRecursive(Path path, FsPermission permission, + public FSDataOutputStream createNonRecursive(Path f, + FsPermission permission, boolean overwrite, - int bufferSize, short replication, long blockSize, + int bufferSize, + short replication, + long blockSize, Progressable progress) throws IOException { - path = makeAbsolute(path); - - Path parent = path.getParent(); - - if (parent != null) { - CurveFSStat stat = new CurveFSStat(); - curve.lstat(parent, stat); - if (stat.isFile()) { - throw new FileAlreadyExistsException(parent.toString()); + File file = new File(); + for ( ; ; ) { + try { + curvefs.create(makeAbsolute(f), permission.toShort(), file); + } catch(FileAlreadyExistsException e) { + if (!overwrite || isDirectory(f)) { + throw new FileAlreadyExistsException("File already exists: " + f); + } + delete(f, false); + continue; // create file again } + break; } + return createFsDataOutputStream(file, bufferSize); + } - return this.create(path, permission, overwrite, - bufferSize, replication, blockSize, progress); + /** + * Append to an existing file (optional operation). + * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null) + * @param f the existing file to be appended. + * @throws IOException + */ + @Override + public FSDataOutputStream append(Path f, + int bufferSize, + Progressable progress) throws IOException { + File file = new File(); + curvefs.open(makeAbsolute(f), O_WRONLY | O_APPEND, file); + return createFsDataOutputStream(file, bufferSize); } + /** + * Renames Path src to Path dst. Can take place on local fs + * or remote DFS. + * @param src path to be renamed + * @param dst new path after rename + * @throws IOException on failure + * @return true if rename is successful + */ @Override public boolean rename(Path src, Path dst) throws IOException { - src = makeAbsolute(src); - dst = makeAbsolute(dst); - try { - CurveFSStat stat = new CurveFSStat(); - curve.lstat(dst, stat); - if (stat.isDir()) { - return rename(src, new Path(dst, src.getName())); - } + curvefs.rename(makeAbsolute(src), makeAbsolute(dst)); + } catch(FileNotFoundException e) { // src path not exist return false; - } catch (FileNotFoundException e) { + } catch (FileAlreadyExistsException e) { + FileStatus status = getFileStatus(dst); + if (!status.isDirectory()) { + return false; + } + // FIXME: } - try { - curve.rename(src, dst); - } catch (FileNotFoundException e) { - throw e; - } catch (Exception e) { - return false; - } return true; } - @Deprecated + /** Delete a file. + * + * @param f the path to delete. + * @param recursive if path is a directory and set to + * true, the directory is deleted else throws an exception. In + * case of a file the recursive can be set to either true or false. + * @return true if delete is successful else false. + * @throws IOException + */ @Override - public boolean delete(Path path) throws IOException { - return delete(path, false); - } - - @Override - public boolean delete(Path path, boolean recursive) throws IOException { - path = makeAbsolute(path); - - FileStatus status; + public boolean delete(Path f, boolean recursive) throws IOException { try { - status = getFileStatus(path); - } catch (FileNotFoundException e) { - return false; - } - - if (status.isFile()) { - curve.unlink(path); - return true; - } - - FileStatus[] dirlist = listStatus(path); - if (dirlist == null) { - return false; - } - - if (!recursive && dirlist.length > 0) { - throw new IOException("Directory " + path.toString() + "is not empty."); - } - - for (FileStatus fs : dirlist) { - if (!delete(fs.getPath(), recursive)) { - return false; + if (recursive) { + curvefs.removeall(makeAbsolute(f)); + } else { + curvefs.remove(makeAbsolute(f)); } + } catch (IOException e) { + return false; } - - curve.rmdir(path); return true; } + /** + * Returns a status object describing the use and capacity of the + * file system. If the file system has multiple partitions, the + * use and capacity of the partition pointed to by the specified + * path is reflected. + * @param p Path for which status should be obtained. null means + * the default partition. + * @return a FsStatus object + * @throws IOException + * see specific implementation + */ @Override public FsStatus getStatus(Path p) throws IOException { - CurveFSStatVFS stat = new CurveFSStatVFS(); - curve.statfs(p, stat); - - FsStatus status = new FsStatus(stat.bsize * stat.blocks, - stat.bsize * (stat.blocks - stat.bavail), - stat.bsize * stat.bavail); - return status; + StatVfs statvfs = new StatVfs(); + curvefs.statfs(makeAbsolute(p), statvfs); + return new FsStatus(statvfs.bsize * statvfs.blocks, // capacity + statvfs.bsize * (statvfs.blocks - statvfs.bavail), // used + statvfs.bsize * statvfs.bavail); // remaining } + /** + * Set permission of a path. + * @param p + * @param permission + */ @Override - public short getDefaultReplication() { - return 1; + public void setPermission(Path p, FsPermission permission) throws IOException { + curvefs.chmod(makeAbsolute(p), permission.toShort()); } + /** + * Set owner of a path (i.e. a file or a directory). + * The parameters username and groupname cannot both be null. + * @param p The path + * @param username If it is null, the original username remains unchanged. + * @param groupname If it is null, the original groupname remains unchanged. + */ @Override - public long getDefaultBlockSize() { - return super.getDefaultBlockSize(); - } + public void setOwner(Path p, String username, String groupname) throws IOException { + Stat stat = new Stat(); + curvefs.lstat(makeAbsolute(p), stat); // TODO(Wine93): remove this operation - @Override - protected int getDefaultPort() { - return super.getDefaultPort(); + int uid = stat.uid; + int gid = stat.gid; + if (username != null) { + uid = permission.getUid(username); + } + if (groupname != null) { + gid = permission.getGid(groupname); + } + curvefs.chown(makeAbsolute(p), uid, gid); } + /** + * Set access time of a file + * @param p The path + * @param mtime Set the modification time of this file. + * The number of milliseconds since Jan 1, 1970. + * A value of -1 means that this call should not set modification time. + * @param atime Set the access time of this file. + * The number of milliseconds since Jan 1, 1970. + * A value of -1 means that this call should not set access time. + */ @Override - public String getCanonicalServiceName() { - return null; + public void setTimes(Path p, long mtime, long atime) throws IOException { + Stat stat = new Stat(); + int mask = 0; + if (mtime != -1) { + stat.mtime = mtime; + mask |= SETATTR_MTIME; + } + if (atime != -1) { + stat.atime = atime; + mask |= SETATTR_ATIME; + } + curvefs.setattr(makeAbsolute(p), stat, mask); } -} +} \ No newline at end of file diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSInputStream.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFsInputStream.java similarity index 92% rename from curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSInputStream.java rename to curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFsInputStream.java index 25ad56564f..eb40992bf7 100644 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSInputStream.java +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFsInputStream.java @@ -26,7 +26,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSInputStream; -import io.opencurve.curve.fs.libfs.CurveFSMount; +import io.opencurve.curve.fs.libfs.CurveFsMount; +import io.opencurve.curve.fs.libfs.CurveFsProto; import java.io.IOException; @@ -35,15 +36,15 @@ * An {@link FSInputStream} for a CurveFileSystem and corresponding * Curve instance. */ -public class CurveFSInputStream extends FSInputStream { - private static final Log LOG = LogFactory.getLog(CurveFSInputStream.class); +public class CurveFsInputStream extends FSInputStream { + private static final Log LOG = LogFactory.getLog(CurveFsInputStream.class); private boolean closed; private int fileHandle; private long fileLength; - private CurveFSProto curve; + private CurveFsProto curvefs; private byte[] buffer; private int bufPos = 0; @@ -57,12 +58,12 @@ public class CurveFSInputStream extends FSInputStream { * @param flength The current length of the file. If the length changes * you will need to close and re-open it to access the new data. */ - public CurveFSInputStream(Configuration conf, CurveFSProto curvefs, + public CurveFsInputStream(Configuration conf, CurveFsProto curve, int fh, long flength, int bufferSize) { fileLength = flength; fileHandle = fh; closed = false; - curve = curvefs; + curvefs = curve; buffer = new byte[1<<21]; LOG.debug("CurveInputStream constructor: initializing stream with fh " + fh + " and file length " + flength); @@ -83,14 +84,14 @@ protected void finalize() throws Throwable { } private synchronized boolean fillBuffer() throws IOException { - bufValid = curve.read(fileHandle, buffer, buffer.length, -1); + bufValid = curvefs.read(fileHandle, -1, buffer, buffer.length); bufPos = 0; if (bufValid < 0) { int err = bufValid; bufValid = 0; - curve.lseek(fileHandle, curvePos, CurveFSMount.SEEK_SET); + curvefs.lseek(fileHandle, curvePos, CurveFsMount.SEEK_SET); throw new IOException("Failed to fill read buffer! Error code:" + err); } curvePos += bufValid; @@ -127,7 +128,7 @@ public synchronized void seek(long targetPos) throws IOException { } long oldPos = curvePos; - curvePos = curve.lseek(fileHandle, targetPos, CurveFSMount.SEEK_SET); + curvePos = curvefs.lseek(fileHandle, targetPos, CurveFsMount.SEEK_SET); bufValid = 0; bufPos = 0; if (curvePos < 0) { @@ -244,9 +245,9 @@ public synchronized int read(byte buf[], int off, int len) throws IOException { public void close() throws IOException { LOG.trace("CurveOutputStream.close:enter"); if (!closed) { - curve.close(fileHandle); + curvefs.close(fileHandle); closed = true; LOG.trace("CurveOutputStream.close:exit"); } } -} +} \ No newline at end of file diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSOutputStream.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFsOutputStream.java similarity index 87% rename from curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSOutputStream.java rename to curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFsOutputStream.java index 06855bcd7d..8f24b3c7ca 100644 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSOutputStream.java +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFsOutputStream.java @@ -25,7 +25,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import io.opencurve.curve.fs.libfs.CurveFSMount; +import io.opencurve.curve.fs.libfs.CurveFsMount; +import io.opencurve.curve.fs.libfs.CurveFsProto; import java.io.IOException; import java.io.OutputStream; @@ -41,13 +42,10 @@ * libcurvefs. Currently it might be useful to reduce JNI crossings, but not * much more. */ -public class CurveFSOutputStream extends OutputStream { +public class CurveFsOutputStream extends OutputStream { private boolean closed; - - private CurveFSProto curve; - + private CurveFsProto curvefs; private int fileHandle; - private byte[] buffer; private int bufUsed = 0; @@ -56,9 +54,11 @@ public class CurveFSOutputStream extends OutputStream { * @param conf The FileSystem configuration. * @param fh The Curve filehandle to connect to. */ - public CurveFSOutputStream(Configuration conf, CurveFSProto curvefs, - int fh, int bufferSize) { - curve = curvefs; + public CurveFsOutputStream(Configuration conf, + CurveFsProto curve, + int fh, + int bufferSize) { + curvefs = curve; fileHandle = fh; closed = false; buffer = new byte[1<<21]; @@ -92,7 +92,7 @@ private synchronized void checkOpen() throws IOException { */ public synchronized long getPos() throws IOException { checkOpen(); - return curve.lseek(fileHandle, 0, CurveFSMount.SEEK_CUR); + return curvefs.lseek(fileHandle, 0, CurveFsMount.SEEK_CUR); } @Override @@ -129,7 +129,7 @@ private synchronized void flushBuffer() throws IOException { } while (bufUsed > 0) { - int ret = curve.write(fileHandle, buffer, bufUsed, -1); + int ret = curvefs.write(fileHandle, -1, buffer, bufUsed); if (ret < 0) { throw new IOException("curve.write: ret=" + ret); } @@ -161,14 +161,14 @@ private synchronized void flushBuffer() throws IOException { public synchronized void flush() throws IOException { checkOpen(); flushBuffer(); // buffer -> libcurvefs - curve.fsync(fileHandle); // libcurvefs -> cluster + curvefs.fsync(fileHandle); // libcurvefs -> cluster } @Override public synchronized void close() throws IOException { checkOpen(); flush(); - curve.close(fileHandle); + curvefs.close(fileHandle); closed = true; } -} +} \ No newline at end of file diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFSMount.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFSMount.java deleted file mode 100644 index 7b423dcc0f..0000000000 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFSMount.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Copyright (c) 2023 NetEase Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Project: Curve - * Created Date: 2023-07-07 - * Author: Jingli Chen (Wine93) - */ -package io.opencurve.curve.fs.libfs; - -import java.io.IOException; - -public class CurveFSMount { - // init - private native long nativeCurveFSCreate(); - private native void nativeCurveFSRelease(long instancePtr); - private static native void nativeCurveFSConfSet(long instancePtr, String key, String value); - private static native int nativeCurveFSMount(long instancePtr, String fsname, String mountpoint); - private static native int nativeCurveFSUmount(long instancePtr, String fsname, String mountpoint); - // dir* - private static native int nativeCurveFSMkDirs(long instancePtr, String path, int mode); - private static native int nativeCurveFSRmDir(long instancePtr, String path); - private static native String[] nativeCurveFSListDir(long instancePtr, String path); - // file* - private static native int nativeCurveFSOpen(long instancePtr, String path, int flags, int mode); - private static native long nativeCurveFSLSeek(long instancePtr, int fd, long offset, int whence); - private static native long nativieCurveFSRead(long instancePtr, int fd, byte[] buffer, long size, long offset); - private static native long nativieCurveFSWrite(long instancePtr, int fd, byte[] buffer, long size, long offset); - private static native int nativeCurveFSFSync(long instancePtr, int fd); - private static native int nativeCurveFSClose(long instancePtr, int fd); - private static native int nativeCurveFSUnlink(long instancePtr, String path); - // others - private static native int nativeCurveFSStatFs(long instancePtr, CurveFSStatVFS statvfs); - private static native int nativeCurveFSLstat(long instancePtr, String path, CurveFSStat stat); - private static native int nativeCurveFSFStat(long instancePtr, int fd, CurveFSStat stat); - private static native int nativeCurveFSSetAttr(long instancePtr, String path, CurveFSStat stat, int mask); - private static native int nativeCurveFSChmod(long instancePtr, String path, int mode); - private static native int nativeCurveFSChown(long instancePtr, String path, int uid, int gid); - private static native int nativeCurveFSRename(long instancePtr, String src, String dst); - - /* - * Flags for open(). - * - * Must be synchronized with JNI if changed. - */ - public static final int O_RDONLY = 1; - public static final int O_RDWR = 2; - public static final int O_APPEND = 4; - public static final int O_CREAT = 8; - public static final int O_TRUNC = 16; - public static final int O_EXCL = 32; - public static final int O_WRONLY = 64; - public static final int O_DIRECTORY = 128; - - /* - * Whence flags for seek(). - * - * Must be synchronized with JNI if changed. - */ - public static final int SEEK_SET = 0; - public static final int SEEK_CUR = 1; - public static final int SEEK_END = 2; - - /* - * Attribute flags for setattr(). - * - * Must be synchronized with JNI if changed. - */ - public static final int SETATTR_MODE = 1; - public static final int SETATTR_UID = 2; - public static final int SETATTR_GID = 4; - public static final int SETATTR_MTIME = 8; - public static final int SETATTR_ATIME = 16; - - private static final String CURVEFS_DEBUG_ENV_VAR = "CURVEFS_DEBUG"; - private static final String CLASS_NAME = "io.opencurve.curve.fs.CurveFSMount"; - - private long instancePtr; - - private static void accessLog(String name, String... args) { - String value = System.getenv(CURVEFS_DEBUG_ENV_VAR); - if (!Boolean.valueOf(value)) { - return; - } - - String params = String.join(",", args); - String message = String.format("%s.%s(%s)", CLASS_NAME, name, params); - System.out.println(message); - } - - static { - accessLog("loadLibrary"); - try { - CurveFSNativeLoader.getInstance().loadLibrary(); - } catch(Exception e) {} - } - - protected void finalize() throws Throwable { - accessLog("finalize"); - } - - public CurveFSMount() { - accessLog("CurveMount"); - instancePtr = nativeCurveFSCreate(); - } - - public void confSet(String key, String value) { - accessLog("confSet", key, value); - nativeCurveFSConfSet(instancePtr, key, value); - } - - public void mount(String fsname, String mountpoint) throws IOException { - accessLog("mount"); - nativeCurveFSMount(instancePtr, fsname, mountpoint); - } - - public void umount(String fsname, String mountpoint) throws IOException { - accessLog("umount"); - nativeCurveFSUmount(instancePtr, fsname, mountpoint); - } - - public void shutdown() throws IOException { - accessLog("shutdown"); - } - - // directory* - public void mkdirs(String path, int mode) throws IOException { - accessLog("mkdirs", path.toString()); - nativeCurveFSMkDirs(instancePtr, path, mode); - } - - public void rmdir(String path) throws IOException { - accessLog("rmdir", path.toString()); - nativeCurveFSRmDir(instancePtr, path); - } - - public String[] listdir(String path) throws IOException { - accessLog("listdir", path.toString()); - return nativeCurveFSListDir(instancePtr, path); - } - - // file* - public int open(String path, int flags, int mode) throws IOException { - accessLog("open", path.toString()); - return nativeCurveFSOpen(instancePtr, path, flags, mode); - } - - public long lseek(int fd, long offset, int whence) throws IOException { - accessLog("lseek", String.valueOf(fd), String.valueOf(offset), String.valueOf(whence)); - return nativeCurveFSLSeek(instancePtr, fd, offset, whence); - } - - public int read(int fd, byte[] buf, long size, long offset) throws IOException { - accessLog("read", String.valueOf(fd), String.valueOf(size), String.valueOf(size)); - long rc = nativieCurveFSRead(instancePtr, fd, buf, size, offset); - return (int) rc; - } - - public int write(int fd, byte[] buf, long size, long offset) throws IOException { - accessLog("write", String.valueOf(fd), String.valueOf(size), String.valueOf(size)); - long rc = nativieCurveFSWrite(instancePtr, fd, buf, size, offset); - return (int) rc; - } - - public void fsync(int fd) throws IOException { - accessLog("fsync", String.valueOf(fd)); - nativeCurveFSFSync(instancePtr, fd); - } - - public void close(int fd) throws IOException { - accessLog("close", String.valueOf(fd)); - nativeCurveFSClose(instancePtr, fd); - } - - public void unlink(String path) throws IOException { - accessLog("unlink", path.toString()); - nativeCurveFSUnlink(instancePtr, path); - } - - // others - public void statfs(String path, CurveFSStatVFS statvfs) throws IOException { - accessLog("statfs", path.toString()); - nativeCurveFSStatFs(instancePtr, statvfs); - } - - public void lstat(String path, CurveFSStat stat) throws IOException { - accessLog("lstat", path.toString()); - nativeCurveFSLstat(instancePtr, path, stat); - } - - public void fstat(int fd, CurveFSStat stat) throws IOException { - accessLog("fstat", String.valueOf(fd)); - nativeCurveFSFStat(instancePtr, fd, stat); - } - - public void setattr(String path, CurveFSStat stat, int mask) throws IOException { - accessLog("setattr", path.toString()); - nativeCurveFSSetAttr(instancePtr, path, stat, mask); - } - - public void chmod(String path, int mode) throws IOException { - accessLog("chmod", path.toString()); - nativeCurveFSChmod(instancePtr, path, mode); - } - - public void chown(String path, int uid, int gid) throws IOException { - accessLog("chown", path.toString(), String.valueOf(uid), String.valueOf(gid)); - nativeCurveFSChown(instancePtr, path, uid, gid); - } - - public void rename(String src, String dst) throws IOException { - accessLog("rename", src.toString(), dst.toString()); - nativeCurveFSRename(instancePtr, src, dst); - } -} diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFSStat.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFSStat.java deleted file mode 100644 index d188ca93f5..0000000000 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFSStat.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (c) 2023 NetEase Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Project: Curve - * Created Date: 2023-07-07 - * Author: Jingli Chen (Wine93) - */ - -package io.opencurve.curve.fs.libfs; - -public class CurveFSStat { - public boolean is_file; /* S_ISREG */ - public boolean is_directory; /* S_ISDIR */ - public boolean is_symlink; /* S_ISLNK */ - - public int mode; - public int uid; - public int gid; - public long size; - public long blksize; - public long blocks; - public long a_time; - public long m_time; - - public boolean isFile() { - return is_file; - } - - public boolean isDir() { - return is_directory; - } - - public boolean isSymlink() { - return is_symlink; - } -} diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFSStatVFS.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFsException.java similarity index 67% rename from curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFSStatVFS.java rename to curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFsException.java index 5ee2c1b8b0..c37adc7cf7 100644 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFSStatVFS.java +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFsException.java @@ -16,18 +16,22 @@ /* * Project: Curve - * Created Date: 2023-07-07 + * Created Date: 2024-03-01 * Author: Jingli Chen (Wine93) */ package io.opencurve.curve.fs.libfs; -public class CurveFSStatVFS { - public long bsize; - public long frsize; - public long blocks; - public long bavail; - public long files; - public long fsid; - public long namemax; -} +import java.io.IOException; + +public class CurveFsException extends IOException { + public static class NotADirectoryException extends IOException { + public NotADirectoryException() { + super(); + } + + public NotADirectoryException(String s) { + super(s); + } + } +} \ No newline at end of file diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFsMount.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFsMount.java new file mode 100644 index 0000000000..e9efb28b7d --- /dev/null +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFsMount.java @@ -0,0 +1,276 @@ +/* + * Copyright (c) 2024 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: Curve + * Created Date: 2024-03-01 + * Author: Jingli Chen (Wine93) + */ + +package io.opencurve.curve.fs.libfs; + +import java.net.URI; +import java.util.Map; +import java.util.UUID; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; + +public class CurveFsMount extends CurveFsProto { + // init + private native long nativeCurveFsNew(); + private native void nativeCurveFsDelete(long instancePtr); + private static native void nativeCurveFsConfSet(long instancePtr, String key, String value); + private static native int nativeCurveFsMount(long instancePtr, String fsname, String mountpoint); + private static native int nativeCurveFsUmount(long instancePtr, String fsname, String mountpoint); + // dir* + private static native int nativeCurveFsMkDirs(long instancePtr, String path, int mode); + private static native int nativeCurveFsRmDir(long instancePtr, String path); + private static native Dirent[] nativeCurveFsListDir(long instancePtr, String path); + // file* + private static native int nativeCurveFsCreate(long instancePtr, String path, int mode, File file); + private static native int nativeCurveFsOpen(long instancePtr, String path, int flags, File file); + private static native long nativeCurveFsLSeek(long instancePtr, int fd, long offset, int whence); + private static native int nativieCurveFsRead(long instancePtr, int fd, long offset, byte[] buffer, long size); + private static native int nativieCurveFsWrite(long instancePtr, int fd, long offset, byte[] buffer, long size); + private static native int nativeCurveFsFSync(long instancePtr, int fd); + private static native int nativeCurveFsClose(long instancePtr, int fd); + private static native int nativeCurveFsUnlink(long instancePtr, String path); + // others + private static native int nativeCurveFsStatFs(long instancePtr, StatVfs statvfs); + private static native int nativeCurveFsLStat(long instancePtr, String path, Stat stat); + private static native int nativeCurveFsFStat(long instancePtr, int fd, Stat stat); + private static native int nativeCurveFsSetAttr(long instancePtr, String path, Stat stat, int mask); + private static native int nativeCurveFsChmod(long instancePtr, String path, int mode); + private static native int nativeCurveFsChown(long instancePtr, String path, int uid, int gid); + private static native int nativeCurveFsRename(long instancePtr, String src, String dst); + private static native int nativeCurveFsRemove(long instancePtr, String path); + private static native int nativeCurveFsRemoveAll(long instancePtr, String path); + + public static class StatVfs { + public long bsize; + public long frsize; + public long blocks; + public long bavail; + public long files; + public long fsid; + public long namemax; + } + + public static class Stat { + public int mode; + public int uid; + public int gid; + public long size; + public long blksize; + public long blocks; + public long atime; + public long mtime; + + public boolean isFile; + public boolean isDirectory; + public boolean isSymlink; + } + + public static class File { + public int fd; + public long length; + }; + + public static class Dirent { + public String name; + public Stat stat; + }; + + // Flags for open(): must be synchronized with JNI if changed. + public static final int O_RDONLY = 1; + public static final int O_RDWR = 2; + public static final int O_APPEND = 4; + public static final int O_CREAT = 8; + public static final int O_TRUNC = 16; + public static final int O_EXCL = 32; + public static final int O_WRONLY = 64; + public static final int O_DIRECTORY = 128; + + // Whence flags for seek(): must be synchronized with JNI if changed. + public static final int SEEK_SET = 0; + public static final int SEEK_CUR = 1; + public static final int SEEK_END = 2; + + // Attribute flags for setattr(): must be synchronized with JNI if changed. + public static final int SETATTR_MODE = 1; + public static final int SETATTR_UID = 2; + public static final int SETATTR_GID = 4; + public static final int SETATTR_MTIME = 8; + public static final int SETATTR_ATIME = 16; + + private static final String PREFIX_KEY = "curvefs"; + public static final String REGEX_CURVEFS_CONF = "^" + PREFIX_KEY + "\\..*"; + public static final String KEY_FSNAME = "curvefs.name"; + + private static long instancePtr; + private static String fsname; + private static String mountpoint; + private static boolean initialized = false; + + static { + try { + CurveFsNativeLoader.getInstance().loadLibrary(); + } catch(Exception e) { + System.out.println("Load curvefs native library failed: " + e.getMessage()); + } + } + + public CurveFsMount() {} + + private void setConf(Configuration conf) throws IOException { + Map m = conf.getValByRegex(REGEX_CURVEFS_CONF); + for (Map.Entry entry : m.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key.equals(KEY_FSNAME)) { + fsname = value; + continue; + } + nativeCurveFsConfSet(instancePtr, key.substring(PREFIX_KEY.length() + 1), value); + } + + if (null == fsname || fsname.isEmpty()) { + throw new IOException(KEY_FSNAME + " is not set"); + } + mountpoint = UUID.randomUUID().toString(); + } + + @Override + public void initialize(URI uri, Configuration conf) throws IOException { + if (initialized) { + return; + } + instancePtr = nativeCurveFsNew(); + setConf(conf); + nativeCurveFsMount(instancePtr, fsname, mountpoint); + initialized = true; + } + + @Override + public void shutdown() throws IOException { + if (!initialized) { + return; + } + nativeCurveFsUmount(instancePtr, fsname, mountpoint); + nativeCurveFsDelete(instancePtr); + initialized = false; + } + + @Override + public void mkdirs(String path, int mode) throws IOException { + nativeCurveFsMkDirs(instancePtr, path, mode); + } + + @Override + public void rmdir(String path) throws IOException { + nativeCurveFsRmDir(instancePtr, path); + } + + @Override + public Dirent[] listdir(String path) throws IOException { + return nativeCurveFsListDir(instancePtr, path); + } + + @Override + public void create(String path, int mode, File file) throws IOException { + nativeCurveFsCreate(instancePtr, path, mode, file); + } + + public void open(String path, int flags, File file) throws IOException { + nativeCurveFsOpen(instancePtr, path, flags, file); + } + + @Override + public long lseek(int fd, long offset, int whence) throws IOException { + return nativeCurveFsLSeek(instancePtr, fd, offset, whence); + } + + @Override + public int read(int fd, long offset, byte[] buffer, long size) throws IOException { + return nativieCurveFsRead(instancePtr, fd, offset, buffer, size); + } + + @Override + public int write(int fd, long offset, byte[] buffer, long size) throws IOException { + return nativieCurveFsWrite(instancePtr, fd, offset, buffer, size); + } + + @Override + public void fsync(int fd) throws IOException { + nativeCurveFsFSync(instancePtr, fd); + } + + @Override + public void close(int fd) throws IOException { + nativeCurveFsClose(instancePtr, fd); + } + + @Override + public void unlink(String path) throws IOException { + nativeCurveFsUnlink(instancePtr, path); + } + + @Override + public void statfs(String path, StatVfs statvfs) throws IOException { + nativeCurveFsStatFs(instancePtr, statvfs); + } + + @Override + public void lstat(String path, Stat stat) throws IOException { + nativeCurveFsLStat(instancePtr, path, stat); + } + + @Override + public void fstat(int fd, Stat stat) throws IOException { + nativeCurveFsFStat(instancePtr, fd, stat); + } + + @Override + public void setattr(String path, Stat stat, int mask) throws IOException { + nativeCurveFsSetAttr(instancePtr, path, stat, mask); + } + + @Override + public void chmod(String path, int mode) throws IOException { + nativeCurveFsChmod(instancePtr, path, mode); + } + + @Override + public void chown(String path, int uid, int gid) throws IOException { + nativeCurveFsChown(instancePtr, path, uid, gid); + } + + @Override + public void rename(String src, String dst) throws IOException { + nativeCurveFsRename(instancePtr, src, dst); + } + + @Override + public void remove(String path) throws IOException { + nativeCurveFsRemove(instancePtr, path); + } + + @Override + public void removeall(String path) throws IOException { + nativeCurveFsRemoveAll(instancePtr, path); + } +} \ No newline at end of file diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFSNativeLoader.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFsNativeLoader.java similarity index 86% rename from curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFSNativeLoader.java rename to curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFsNativeLoader.java index 7ba7a535e8..95df921e51 100644 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFSNativeLoader.java +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFsNativeLoader.java @@ -13,60 +13,53 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - /* * Project: Curve * Created Date: 2023-07-07 * Author: Jingli Chen (Wine93) */ - package io.opencurve.curve.fs.libfs; - import java.net.URL; import java.net.URLConnection; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; - import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.compress.archivers.ArchiveEntry; import org.apache.commons.compress.utils.IOUtils; -public class CurveFSNativeLoader { +public class CurveFsNativeLoader { boolean initialized = false; - private static final CurveFSNativeLoader instance = new CurveFSNativeLoader(); + private static final CurveFsNativeLoader instance = new CurveFsNativeLoader(); private static final String TMP_DIR = "/tmp"; private static final String CURVEFS_LIBRARY_PATH = "/tmp/libcurvefs"; private static final String RESOURCE_TAR_NAME = "libcurvefs.tar"; private static final String JNI_LIBRARY_NAME = "libcurvefs_jni.so"; - private CurveFSNativeLoader() {} + private CurveFsNativeLoader() {} - public static CurveFSNativeLoader getInstance() { + public static CurveFsNativeLoader getInstance() { return instance; } public long getJarModifiedTime() throws IOException { - URL location = CurveFSNativeLoader.class.getProtectionDomain().getCodeSource().getLocation(); + URL location = CurveFsNativeLoader.class.getProtectionDomain().getCodeSource().getLocation(); URLConnection conn = location.openConnection(); return conn.getLastModified(); } - public void descompress(InputStream in, String dest) throws IOException { File dir = new File(dest); if (!dir.exists()) { dir.mkdirs(); } - ArchiveEntry entry; TarArchiveInputStream reader = new TarArchiveInputStream(in); while ((entry = reader.getNextTarEntry()) != null) { if (entry.isDirectory()) { continue; } - String path = TMP_DIR + File.separator + entry.getName(); File file = new File(path); IOUtils.copy(reader, new FileOutputStream(file)); @@ -74,25 +67,23 @@ public void descompress(InputStream in, String dest) throws IOException { reader.close(); } - public void loadJniLibrary() throws IOException { + public void loadJNILibrary() throws IOException { File libFile = new File(CURVEFS_LIBRARY_PATH, JNI_LIBRARY_NAME); System.load(libFile.getAbsolutePath()); } - public synchronized void loadLibrary() throws IOException { if (initialized) { return; } - long jarModifiedTime = getJarModifiedTime(); File libDir = new File(CURVEFS_LIBRARY_PATH); if (libDir.exists() && libDir.lastModified() == jarModifiedTime) { - loadJniLibrary(); + loadJNILibrary(); initialized = true; return; } - InputStream reader = CurveFSNativeLoader.class.getResourceAsStream("/" + RESOURCE_TAR_NAME); + InputStream reader = CurveFsNativeLoader.class.getResourceAsStream("/" + RESOURCE_TAR_NAME); if (reader == null) { throw new IOException("Cannot get resource " + RESOURCE_TAR_NAME + " from Jar file."); } @@ -100,7 +91,7 @@ public synchronized void loadLibrary() throws IOException { reader.close(); libDir.setLastModified(jarModifiedTime); - loadJniLibrary(); + loadJNILibrary(); initialized = true; } -} +} \ No newline at end of file diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFsProto.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFsProto.java new file mode 100644 index 0000000000..427de2d811 --- /dev/null +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/libfs/CurveFsProto.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2024 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: Curve + * Created Date: 2024-03-01 + * Author: Jingli Chen (Wine93) + */ + +package io.opencurve.curve.fs.libfs; + +import java.net.URI; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; + +import io.opencurve.curve.fs.libfs.CurveFsMount.StatVfs; +import io.opencurve.curve.fs.libfs.CurveFsMount.Stat; +import io.opencurve.curve.fs.libfs.CurveFsMount.File; +import io.opencurve.curve.fs.libfs.CurveFsMount.Dirent; + +public abstract class CurveFsProto { + public abstract void initialize(URI uri, Configuration conf) throws IOException; + public abstract void shutdown() throws IOException; + + public abstract void mkdirs(String path, int mode) throws IOException; + public abstract void rmdir(String path) throws IOException; + public abstract Dirent[] listdir(String path) throws IOException; + + public abstract void create(String path, int mode, File file) throws IOException; + public abstract void open(String path, int flags, File file) throws IOException; + public abstract long lseek(int fd, long offset, int whence) throws IOException; + public abstract int write(int fd, long offset, byte[] buffer, long size) throws IOException; + public abstract int read(int fd, long offset, byte[] buffer, long size) throws IOException; + public abstract void fsync(int fd) throws IOException; + public abstract void close(int fd) throws IOException; + public abstract void unlink(String path) throws IOException; + + public abstract void statfs(String path, StatVfs statvfs) throws IOException; + public abstract void lstat(String path, Stat stat) throws IOException; + public abstract void fstat(int fd, Stat stat) throws IOException; + public abstract void setattr(String path, Stat stat, int mask) throws IOException; + public abstract void chmod(String path, int mode) throws IOException; + public abstract void chown(String path, int uid, int gid) throws IOException; + public abstract void remove(String path) throws IOException; + public abstract void removeall(String path) throws IOException; + public abstract void rename(String src, String dst) throws IOException; +}