diff --git a/curvefs/sdk/java/pom.xml b/curvefs/sdk/java/pom.xml index ecf8be0043..a3ed98aea9 100644 --- a/curvefs/sdk/java/pom.xml +++ b/curvefs/sdk/java/pom.xml @@ -34,6 +34,21 @@ src/main/resources + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + + + diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemFactory.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemFactory.java index ac42dfaf82..2a79fcc1c4 100644 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemFactory.java +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemFactory.java @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.opencurve.curve.fs.flink; import io.opencurve.curve.fs.hadoop.CurveFileSystem; @@ -10,28 +26,17 @@ import java.net.URI; public class CurveFileSystemFactory implements FileSystemFactory { - private org.apache.hadoop.conf.Configuration conf; - + private org.apache.hadoop.conf.Configuration conf = new Configuration(); private static final String CURVE_FS_CONFIG_PREFIXES = "curvefs."; private static final String FLINK_CONFIG_PREFIXES = "fs."; public static String SCHEME = "curvefs"; @Override public void configure(org.apache.flink.configuration.Configuration config) { - conf = new Configuration(); - if (config != null) { - for (String key : config.keySet()) { - if (key.startsWith(CURVE_FS_CONFIG_PREFIXES) || key.startsWith(FLINK_CONFIG_PREFIXES)) { - String value = config.getString(key, null); - if (value != null) { - if (CurveFileSystem.class.getCanonicalName().equals(value.trim())) { - SCHEME = key.split("\\.")[1]; - } - conf.set(key, value); - } - } - } - } + config.keySet() + .stream() + .filter(key -> key.startsWith(CURVE_FS_CONFIG_PREFIXES) || key.startsWith(FLINK_CONFIG_PREFIXES)) + .forEach(key -> conf.set(key, config.getString(key, ""))); } @Override @@ -45,4 +50,4 @@ public FileSystem create(URI uri) throws IOException { fs.initialize(uri, conf); return new HadoopFileSystem(fs); } -} +} \ No newline at end of file diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemTableFactory.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemTableFactory.java index da68151bbd..d065492109 100644 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemTableFactory.java +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemTableFactory.java @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.opencurve.curve.fs.flink; import org.apache.flink.connector.file.table.FileSystemTableFactory; diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSInputStream.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSInputStream.java index 2dd1be8d1d..25ad56564f 100644 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSInputStream.java +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSInputStream.java @@ -59,8 +59,6 @@ public class CurveFSInputStream extends FSInputStream { */ public CurveFSInputStream(Configuration conf, CurveFSProto curvefs, int fh, long flength, int bufferSize) { - // Whoever's calling the constructor is responsible for doing the actual curve_open - // call and providing the file handle. fileLength = flength; fileHandle = fh; closed = false; @@ -73,6 +71,7 @@ public CurveFSInputStream(Configuration conf, CurveFSProto curvefs, /** Curve likes things to be closed before it shuts down, * so closing the IOStream stuff voluntarily in a finalizer is good */ + @Override protected void finalize() throws Throwable { try { if (!closed) { @@ -91,7 +90,6 @@ private synchronized boolean fillBuffer() throws IOException { bufValid = 0; - // attempt to reset to old position. If it fails, too bad. curve.lseek(fileHandle, curvePos, CurveFSMount.SEEK_SET); throw new IOException("Failed to fill read buffer! Error code:" + err); } @@ -102,6 +100,7 @@ private synchronized boolean fillBuffer() throws IOException { /* * Get the current position of the stream. */ + @Override public synchronized long getPos() throws IOException { return curvePos - bufValid + bufPos; } @@ -117,6 +116,7 @@ public synchronized int available() throws IOException { return (int) (fileLength - getPos()); } + @Override public synchronized void seek(long targetPos) throws IOException { LOG.trace("CurveInputStream.seek: Seeking to position " + targetPos + " on fd " + fileHandle); @@ -142,6 +142,7 @@ public synchronized void seek(long targetPos) throws IOException { * they'll be dealt with before anybody even tries to call this method! * @return false. */ + @Override public synchronized boolean seekToNewSource(long targetPos) { return false; } diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSTalker.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSTalker.java index 90366cc210..26d0492142 100644 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSTalker.java +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSTalker.java @@ -29,6 +29,7 @@ import io.opencurve.curve.fs.libfs.CurveFSStat; import io.opencurve.curve.fs.libfs.CurveFSStatVFS; +import java.util.UUID; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; @@ -37,6 +38,7 @@ class CurveFSTalker extends CurveFSProto { private CurveFSMount mount; private String fsname = null; + private String mountpoint = null; private boolean inited = false; private static final String PREFIX_KEY = "curvefs"; @@ -72,14 +74,15 @@ void initialize(URI uri, Configuration conf) throws IOException { if (null == fsname || fsname.isEmpty()) { throw new IOException("curvefs.name is not set"); } - mount.mount(fsname, "/"); + mountpoint = UUID.randomUUID().toString(); + mount.mount(fsname, mountpoint); inited = true; } @Override void shutdown() throws IOException { if (inited) { - mount.umount(fsname, "/"); + mount.umount(fsname, mountpoint); mount = null; inited = false; } @@ -179,4 +182,4 @@ void chown(Path path, int uid, int gid) throws IOException { void rename(Path src, Path dst) throws IOException { mount.rename(tostr(src), tostr(dst)); } -} +} \ No newline at end of file diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFileSystem.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFileSystem.java index 09df042c1b..fc031d38d8 100644 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFileSystem.java +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFileSystem.java @@ -59,12 +59,14 @@ private Path makeAbsolute(Path path) { return new Path(workingDir, path); } + @Override public URI getUri() { return uri; } + @Override public String getScheme() { - return uri.getScheme(); + return "hdfs"; } @Override @@ -85,14 +87,12 @@ public void initialize(URI uri, Configuration conf) throws IOException { this.workingDir = getHomeDirectory(); } - + @Override public FSDataInputStream open(Path path, int bufferSize) throws IOException { path = makeAbsolute(path); - // throws filenotfoundexception if path is a directory int fd = curve.open(path, CurveFSMount.O_RDONLY, 0); - /* get file size */ CurveFSStat stat = new CurveFSStat(); curve.fstat(fd, stat); @@ -102,10 +102,11 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException { @Override public void close() throws IOException { - super.close(); // this method does stuff, make sure it's run! + super.close(); curve.shutdown(); } + @Override public FSDataOutputStream append(Path path, int bufferSize, Progressable progress) throws IOException { path = makeAbsolute(path); @@ -122,6 +123,7 @@ public FSDataOutputStream append(Path path, int bufferSize, Progressable progres return new FSDataOutputStream(ostream, statistics); } + @Override public Path getWorkingDirectory() { return workingDir; } @@ -144,6 +146,7 @@ public boolean mkdirs(Path f) throws IOException { return mkdirs(f, perms); } + @Override public FileStatus getFileStatus(Path path) throws IOException { path = makeAbsolute(path); @@ -160,7 +163,7 @@ public FileStatus getFileStatus(Path path) throws IOException { return status; } - + @Override public FileStatus[] listStatus(Path path) throws IOException { path = makeAbsolute(path); @@ -174,12 +177,10 @@ public FileStatus[] listStatus(Path path) throws IOException { for (int i = 0; i < status.length; i++) { status[i] = getFileStatus(new Path(path, dirlist[i])); } - curve.shutdown(); return status; } else { throw new FileNotFoundException("File " + path + " does not exist."); } - } @Override @@ -208,9 +209,9 @@ public void setTimes(Path path, long mtime, long atime) throws IOException { curve.setattr(path, stat, mask); } + @Override public FSDataOutputStream create(Path path, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - path = makeAbsolute(path); boolean exists = exists(path); @@ -268,6 +269,7 @@ public void setOwner(Path path, String username, String groupname) throws IOExce } @Deprecated + @Override public FSDataOutputStream createNonRecursive(Path path, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, @@ -278,7 +280,7 @@ public FSDataOutputStream createNonRecursive(Path path, FsPermission permission, if (parent != null) { CurveFSStat stat = new CurveFSStat(); - curve.lstat(parent, stat); // handles FileNotFoundException case + curve.lstat(parent, stat); if (stat.isFile()) { throw new FileAlreadyExistsException(parent.toString()); } @@ -314,14 +316,15 @@ public boolean rename(Path src, Path dst) throws IOException { } @Deprecated + @Override public boolean delete(Path path) throws IOException { return delete(path, false); } + @Override public boolean delete(Path path, boolean recursive) throws IOException { path = makeAbsolute(path); - /* path exists? */ FileStatus status; try { status = getFileStatus(path); @@ -329,13 +332,11 @@ public boolean delete(Path path, boolean recursive) throws IOException { return false; } - /* we're done if its a file */ if (status.isFile()) { curve.unlink(path); return true; } - /* get directory contents */ FileStatus[] dirlist = listStatus(path); if (dirlist == null) { return false; @@ -383,6 +384,6 @@ protected int getDefaultPort() { @Override public String getCanonicalServiceName() { - return null; // Does not support Token + return null; } } diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/Main.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/Main.java deleted file mode 100644 index d488e309dc..0000000000 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/Main.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.opencurve.curve.fs.hadoop; - -public class Main { - public static void main(String[] args) { - System.out.println("Hello world!"); - } -} diff --git a/curvefs/sdk/libcurvefs/libcurvefs.cpp b/curvefs/sdk/libcurvefs/libcurvefs.cpp index d53c0b51b8..fb54c3b509 100644 --- a/curvefs/sdk/libcurvefs/libcurvefs.cpp +++ b/curvefs/sdk/libcurvefs/libcurvefs.cpp @@ -143,7 +143,7 @@ int curvefs_open(uintptr_t instance_ptr, } } - uint64_t fd; + uint64_t fd = 0; rc = mount->vfs->Open(path, flags, mode, &fd); if (rc != CURVEFS_ERROR::OK) { return SysErr(rc); @@ -164,7 +164,7 @@ ssize_t curvefs_read(uintptr_t instance_ptr, int fd, char* buffer, size_t count) { - size_t nread; + size_t nread = 0; auto mount = get_instance(instance_ptr); auto rc = mount->vfs->Read(fd, buffer, count, &nread); if (rc != CURVEFS_ERROR::OK) {