Skip to content

Commit

Permalink
improve(filesystem-hadoop): support path without scheme for gvfs api (a…
Browse files Browse the repository at this point in the history
…pache#2779)

### What changes were proposed in this pull request?

TensorFlow calls the Hadoop FileSystem API with paths that have no scheme (see the links
below). This PR adds support for paths without the gvfs scheme.

https://github.com/tensorflow/io/blob/master/tensorflow_io/core/filesystems/hdfs/hadoop_filesystem.cc#L618

https://github.com/tensorflow/io/blob/master/tensorflow_io/core/filesystems/hdfs/hadoop_filesystem.cc#L116

### Why are the changes needed?

- Support paths without a scheme for the Hadoop API
- apache#2860

### Does this PR introduce _any_ user-facing change?

- no

### How was this patch tested?

- UTs pass
  • Loading branch information
coolderli authored Apr 15, 2024
1 parent 8fd412c commit 802e539
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
Expand All @@ -51,6 +52,13 @@ public class GravitinoVirtualFileSystem extends FileSystem {
private Cache<NameIdentifier, Pair<Fileset, FileSystem>> filesetCache;
private ScheduledThreadPoolExecutor scheduler;

// Matches a gvfs path and captures its catalog, schema and fileset names as groups 1-3.
// The scheme prefix (gvfs://fileset) is optional, so both of the following paths match:
// gvfs://fileset/fileset_catalog/fileset_schema/fileset1/file.txt
// /fileset_catalog/fileset_schema/fileset1/sub_dir/
// NOTE(review): the trailing "[/[^/]+]*" is one character class (a union of '/', "[^/]" and
// '+'), not a repeated "/segment" group, so it accepts any remainder after the fileset name.
// Matching still works because group 3 cannot contain '/', but confirm this is intended.
private static final Pattern IDENTIFIER_PATTERN =
Pattern.compile("^(?:gvfs://fileset)?/([^/]+)/([^/]+)/([^/]+)(?:[/[^/]+]*)$");

@Override
public void initialize(URI name, Configuration configuration) throws IOException {
if (!name.toString().startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX)) {
Expand Down Expand Up @@ -214,39 +222,33 @@ private void checkAuthConfig(String authType, String configKey, String configVal
authType);
}

private String concatVirtualPrefix(NameIdentifier identifier) {
return GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX
+ identifier.namespace().level(1)
+ "/"
+ identifier.namespace().level(2)
+ "/"
+ identifier.name();
/**
 * Builds the virtual location of a fileset in the form
 * {@code <prefix>/<namespace level 1>/<namespace level 2>/<name>}.
 *
 * @param identifier the fileset identifier; its namespace levels 1 and 2 and its name are used
 * @param withScheme whether the result starts with
 *     {@code GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX}; when false the result
 *     starts directly with "/"
 * @return the virtual location string
 */
private String getVirtualLocation(NameIdentifier identifier, boolean withScheme) {
  String schemePrefix = "";
  if (withScheme) {
    schemePrefix = GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX;
  }
  return schemePrefix
      + "/"
      + identifier.namespace().level(1)
      + "/"
      + identifier.namespace().level(2)
      + "/"
      + identifier.name();
}

private Path getActualPathByIdentifier(
NameIdentifier identifier, Pair<Fileset, FileSystem> filesetPair, Path path) {
String virtualPath = path.toString();
if (!virtualPath.startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX)) {
throw new InvalidPathException(
String.format(
"Path %s doesn't start with the scheme \"%s\".",
virtualPath, GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX));
}
boolean withScheme =
virtualPath.startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX);
String virtualLocation = getVirtualLocation(identifier, withScheme);
String storageLocation = filesetPair.getLeft().storageLocation();
try {
if (checkMountsSingleFile(filesetPair)) {
String virtualPrefix = concatVirtualPrefix(identifier);
Preconditions.checkArgument(
virtualPath.equals(virtualPrefix),
virtualPath.equals(virtualLocation),
"Path: %s should be same with the virtual prefix: %s, because the fileset only mounts a single file.",
virtualPath,
virtualPrefix);
virtualLocation);

return new Path(filesetPair.getLeft().storageLocation());
return new Path(storageLocation);
} else {
return new Path(
virtualPath.replaceFirst(
concatVirtualPrefix(identifier),
new Path(filesetPair.getLeft().storageLocation()).toString()));
return new Path(virtualPath.replaceFirst(virtualLocation, storageLocation));
}
} catch (Exception e) {
throw new RuntimeException(
Expand Down Expand Up @@ -275,37 +277,32 @@ private boolean checkMountsSingleFile(Pair<Fileset, FileSystem> filesetPair) {
/**
 * Rewrites the path of a file status so that it starts with the virtual prefix instead of the
 * actual (storage) prefix.
 *
 * <p>The given {@code fileStatus} is modified in place through {@code setPath} and the same
 * instance is returned.
 *
 * @param fileStatus the file status whose path is rewritten
 * @param actualPrefix the prefix the current path must start with
 * @param virtualPrefix the prefix that replaces {@code actualPrefix}
 * @return the same {@code fileStatus} instance carrying the rewritten path
 * @throws InvalidPathException if the current path does not start with {@code actualPrefix}
 */
private FileStatus convertFileStatusPathPrefix(
    FileStatus fileStatus, String actualPrefix, String virtualPrefix) {
  String filePath = fileStatus.getPath().toString();
  if (!filePath.startsWith(actualPrefix)) {
    throw new InvalidPathException(
        String.format("Path %s doesn't start with prefix \"%s\".", filePath, actualPrefix));
  }
  // Repeats the condition of the guard above, so it cannot fail at this point.
  Preconditions.checkArgument(
      filePath.startsWith(actualPrefix),
      "Path %s doesn't start with prefix \"%s\".",
      filePath,
      actualPrefix);
  // Swap the prefix by position rather than with String.replaceFirst: replaceFirst treats
  // actualPrefix as a regular expression and virtualPrefix as a replacement template, so
  // characters such as '+', '(', '$' or '\' in either value would corrupt the result.
  Path path = new Path(virtualPrefix + filePath.substring(actualPrefix.length()));
  fileStatus.setPath(path);

  return fileStatus;
}

private NameIdentifier extractIdentifier(URI virtualUri) {
/**
 * Builds the fileset {@link NameIdentifier} for a virtual URI.
 *
 * <p>The catalog, schema and fileset names are taken from the leading three path segments of the
 * URI and combined with {@code metalakeName}. The argument checks below reject a blank URI and a
 * URI from which three segments cannot be obtained.
 *
 * @param virtualUri the virtual URI to parse
 * @return the identifier created by {@code NameIdentifier.ofFileset}
 */
@VisibleForTesting
NameIdentifier extractIdentifier(URI virtualUri) {
String virtualPath = virtualUri.toString();
Preconditions.checkArgument(
virtualUri
.toString()
.startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX),
"Path %s doesn't start with scheme prefix \"%s\".",
virtualUri,
GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX);

if (StringUtils.isBlank(virtualUri.toString())) {
throw new InvalidPathException("Uri which need be extracted cannot be null or empty.");
}
StringUtils.isNotBlank(virtualPath),
"Uri which need be extracted cannot be null or empty.");

// remove first '/' symbol with empty string
String[] reservedDirs =
Arrays.stream(virtualUri.getPath().replaceFirst("/", "").split("/")).toArray(String[]::new);
// NOTE(review): Matcher.groupCount() reports the number of capturing groups declared by the
// pattern, not how many took part in the match, so the "== 3" test below is always true for
// IDENTIFIER_PATTERN; matches() is the check that actually validates the path.
Matcher matcher = IDENTIFIER_PATTERN.matcher(virtualPath);
Preconditions.checkArgument(
reservedDirs.length >= 3, "URI %s doesn't contains valid identifier", virtualUri);
matcher.matches() && matcher.groupCount() == 3,
"URI %s doesn't contains valid identifier",
virtualPath);

return NameIdentifier.ofFileset(
metalakeName, reservedDirs[0], reservedDirs[1], reservedDirs[2]);
metalakeName, matcher.group(1), matcher.group(2), matcher.group(3));
}

private FilesetContext getFilesetContext(Path virtualPath) {
Expand Down Expand Up @@ -449,7 +446,7 @@ public FileStatus getFileStatus(Path path) throws IOException {
return convertFileStatusPathPrefix(
fileStatus,
context.getFileset().storageLocation(),
concatVirtualPrefix(context.getIdentifier()));
getVirtualLocation(context.getIdentifier(), true));
}

@Override
Expand All @@ -462,7 +459,7 @@ public FileStatus[] listStatus(Path path) throws IOException {
convertFileStatusPathPrefix(
fileStatus,
new Path(context.getFileset().storageLocation()).toString(),
concatVirtualPrefix(context.getIdentifier())))
getVirtualLocation(context.getIdentifier(), true)))
.toArray(FileStatus[]::new);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

/** Configuration class for Gravitino Virtual File System. */
class GravitinoVirtualFileSystemConfiguration {
public static final String GVFS_FILESET_PREFIX = "gvfs://fileset/";
public static final String GVFS_FILESET_PREFIX = "gvfs://fileset";
public static final String GVFS_SCHEME = "gvfs";

/** The configuration key for the Gravitino server URI. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

public class FileSystemTestUtils {
// Root of the local test file system: a file: URI under /tmp whose directory name ends with a
// random UUID stripped of its dashes.
private static final String LOCAL_FS_PREFIX =
"file:/tmp/gravitino_test_fs_" + UUID.randomUUID().toString().replace("-", "") + "/";
"file:/tmp/gravitino_test_fs_" + UUID.randomUUID().toString().replace("-", "");

private static final int BUFFER_SIZE = 3;
private static final short REPLICATION = 1;
Expand All @@ -30,23 +30,24 @@ public static String localRootPrefix() {
return LOCAL_FS_PREFIX;
}

public static Path createFilesetPath(String filesetCatalog, String schema, String fileset) {
return new Path(
GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX
+ "/"
+ filesetCatalog
+ "/"
+ schema
+ "/"
+ fileset);
/**
 * Creates the virtual path of a fileset in the form {@code <prefix>/<catalog>/<schema>/<fileset>}.
 *
 * @param filesetCatalog the catalog segment of the path
 * @param schema the schema segment of the path
 * @param fileset the fileset segment of the path
 * @param withScheme whether the path starts with
 *     {@code GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX}; when false the path
 *     starts directly with "/"
 * @return the fileset path
 */
public static Path createFilesetPath(
    String filesetCatalog, String schema, String fileset, boolean withScheme) {
  StringBuilder location = new StringBuilder();
  if (withScheme) {
    location.append(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX);
  }
  location
      .append('/')
      .append(filesetCatalog)
      .append('/')
      .append(schema)
      .append('/')
      .append(fileset);
  return new Path(location.toString());
}

/** Returns the path of the given catalog's directory directly under {@code LOCAL_FS_PREFIX}. */
public static Path createLocalRootDir(String filesetCatalog) {
return new Path(LOCAL_FS_PREFIX + filesetCatalog);
return new Path(String.format("%s/%s", LOCAL_FS_PREFIX, filesetCatalog));
}

/**
 * Returns the path {@code <catalog>/<schema>/<fileset>} located under {@code LOCAL_FS_PREFIX}.
 */
public static Path createLocalDirPrefix(String filesetCatalog, String schema, String fileset) {
return new Path(LOCAL_FS_PREFIX + filesetCatalog + "/" + schema + "/" + fileset);
return new Path(String.format("%s/%s/%s/%s", LOCAL_FS_PREFIX, filesetCatalog, schema, fileset));
}

public static void create(Path path, FileSystem fileSystem) throws IOException {
Expand Down
Loading

0 comments on commit 802e539

Please sign in to comment.