diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java index 1946464911f..da4d0e1a18e 100644 --- a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java @@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import java.io.FileNotFoundException; import java.io.IOException; import java.time.Instant; import java.util.Collections; @@ -40,12 +41,16 @@ import org.apache.gravitino.Schema; import org.apache.gravitino.SchemaChange; import org.apache.gravitino.StringIdentifier; +import org.apache.gravitino.audit.CallerContext; +import org.apache.gravitino.audit.FilesetAuditConstants; +import org.apache.gravitino.audit.FilesetDataOperation; import org.apache.gravitino.connector.CatalogInfo; import org.apache.gravitino.connector.CatalogOperations; import org.apache.gravitino.connector.HasPropertyMetadata; import org.apache.gravitino.connector.SupportsSchemas; import org.apache.gravitino.exceptions.AlreadyExistsException; import org.apache.gravitino.exceptions.FilesetAlreadyExistsException; +import org.apache.gravitino.exceptions.GravitinoRuntimeException; import org.apache.gravitino.exceptions.NoSuchCatalogException; import org.apache.gravitino.exceptions.NoSuchEntityException; import org.apache.gravitino.exceptions.NoSuchFilesetException; @@ -358,7 +363,6 @@ public boolean dropFileset(NameIdentifier ident) { @Override public String getFileLocation(NameIdentifier ident, String subPath) throws NoSuchFilesetException { - // TODO we need move some check logics in the Hadoop / Python GVFS to here. 
Preconditions.checkArgument(subPath != null, "subPath must not be null"); String processedSubPath; if (!subPath.trim().isEmpty() && !subPath.trim().startsWith(SLASH)) { @@ -369,11 +373,56 @@ public String getFileLocation(NameIdentifier ident, String subPath) Fileset fileset = loadFileset(ident); + boolean isSingleFile = checkSingleFile(fileset); + // if the storage location is a single file, it cannot have sub path to access. + if (isSingleFile && StringUtils.isNotBlank(processedSubPath)) { + throw new GravitinoRuntimeException( + "Sub path should always be blank, because the fileset only mounts a single file."); + } + + // do checks for some data operations. + if (hasCallerContext()) { + Map<String, String> contextMap = CallerContext.CallerContextHolder.get().context(); + String operation = + contextMap.getOrDefault( + FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, + FilesetDataOperation.UNKNOWN.name()); + if (!FilesetDataOperation.checkValid(operation)) { + LOG.warn( + "The data operation: {} is not valid, we cannot do some checks for this operation.", + operation); + } else { + FilesetDataOperation dataOperation = FilesetDataOperation.valueOf(operation); + switch (dataOperation) { + case RENAME: + // Fileset only mounts a single file, the storage location of the fileset cannot be + // renamed; Otherwise the metadata in the Gravitino server may be inconsistent. + if (isSingleFile) { + throw new GravitinoRuntimeException( + "Cannot rename the fileset: %s which only mounts to a single file.", ident); + } + // if the sub path is blank, it cannot be renamed, + // otherwise the metadata in the Gravitino server may be inconsistent.
+ if (StringUtils.isBlank(processedSubPath) + || (processedSubPath.startsWith(SLASH) && processedSubPath.length() == 1)) { + throw new GravitinoRuntimeException( + "subPath cannot be blank when need to rename a file or a directory."); + } + break; + default: + break; + } + } + } + String fileLocation; - // subPath cannot be null, so we only need check if it is blank - if (StringUtils.isBlank(processedSubPath)) { + // 1. if the storage location is a single file, we pass the storage location directly + // 2. if the processed sub path is blank, we pass the storage location directly + if (isSingleFile || StringUtils.isBlank(processedSubPath)) { fileLocation = fileset.storageLocation(); } else { + // the processed sub path always starts with "/" if it is not blank, + // so we can safely remove the tailing slash if storage location ends with "/". String storageLocation = fileset.storageLocation().endsWith(SLASH) ? fileset.storageLocation().substring(0, fileset.storageLocation().length() - 1) @@ -672,4 +721,25 @@ static Path formalizePath(Path path, Configuration configuration) throws IOExcep FileSystem defaultFs = FileSystem.get(configuration); return path.makeQualified(defaultFs.getUri(), defaultFs.getWorkingDirectory()); } + + private boolean hasCallerContext() { + return CallerContext.CallerContextHolder.get() != null + && CallerContext.CallerContextHolder.get().context() != null + && !CallerContext.CallerContextHolder.get().context().isEmpty(); + } + + private boolean checkSingleFile(Fileset fileset) { + try { + Path locationPath = new Path(fileset.storageLocation()); + return locationPath.getFileSystem(hadoopConf).getFileStatus(locationPath).isFile(); + } catch (FileNotFoundException e) { + // We should always return false here, same with the logic in `FileSystem.isFile(Path f)`. 
+ return false; + } catch (IOException e) { + throw new GravitinoRuntimeException( + e, + "Exception occurs when checking whether fileset: %s mounts a single file", + fileset.name()); + } + } } diff --git a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java index 9f08ed8b685..d3206972680 100644 --- a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java @@ -59,9 +59,13 @@ import org.apache.gravitino.Schema; import org.apache.gravitino.SchemaChange; import org.apache.gravitino.StringIdentifier; +import org.apache.gravitino.audit.CallerContext; +import org.apache.gravitino.audit.FilesetAuditConstants; +import org.apache.gravitino.audit.FilesetDataOperation; import org.apache.gravitino.connector.CatalogInfo; import org.apache.gravitino.connector.HasPropertyMetadata; import org.apache.gravitino.connector.PropertiesMetadata; +import org.apache.gravitino.exceptions.GravitinoRuntimeException; import org.apache.gravitino.exceptions.NoSuchFilesetException; import org.apache.gravitino.exceptions.NoSuchSchemaException; import org.apache.gravitino.exceptions.NonEmptySchemaException; @@ -800,6 +804,85 @@ public void testGetFileLocation() throws IOException { String fileLocation3 = ops.getFileLocation(filesetIdent, subPath4); Assertions.assertEquals(fileset.storageLocation(), fileLocation3); } + + // test mount a single file + String filesetName2 = "test_get_file_location_2"; + String filesetLocation2 = + TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + filesetName2; + Path filesetLocationPath2 = new Path(filesetLocation2); + createFileset(filesetName2, schemaName, comment, Fileset.Type.MANAGED, null, filesetLocation2); + try 
(HadoopCatalogOperations ops = new HadoopCatalogOperations(store); + FileSystem localFileSystem = filesetLocationPath2.getFileSystem(new Configuration())) { + ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA); + NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, filesetName2); + // replace fileset location to a single file + Assertions.assertTrue(localFileSystem.exists(filesetLocationPath2)); + Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath2).isDirectory()); + localFileSystem.delete(filesetLocationPath2, true); + localFileSystem.create(filesetLocationPath2); + Assertions.assertTrue(localFileSystem.exists(filesetLocationPath2)); + Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath2).isFile()); + + String subPath = "/year=2024/month=07/day=22/test.parquet"; + Map contextMap = Maps.newHashMap(); + contextMap.put( + FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, + FilesetDataOperation.RENAME.name()); + CallerContext callerContext = CallerContext.builder().withContext(contextMap).build(); + CallerContext.CallerContextHolder.set(callerContext); + + Assertions.assertThrows( + GravitinoRuntimeException.class, () -> ops.getFileLocation(filesetIdent, subPath)); + } finally { + CallerContext.CallerContextHolder.remove(); + } + + // test rename with an empty subPath + String filesetName3 = "test_get_file_location_3"; + String filesetLocation3 = + TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + filesetName3; + Path filesetLocationPath3 = new Path(filesetLocation3); + createFileset(filesetName3, schemaName, comment, Fileset.Type.MANAGED, null, filesetLocation3); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store); + FileSystem localFileSystem = filesetLocationPath3.getFileSystem(new Configuration())) { + ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA); + NameIdentifier filesetIdent = 
NameIdentifier.of("m1", "c1", schemaName, filesetName3); + // replace fileset location to a single file + Assertions.assertTrue(localFileSystem.exists(filesetLocationPath3)); + Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath3).isDirectory()); + localFileSystem.delete(filesetLocationPath3, true); + localFileSystem.create(filesetLocationPath3); + Assertions.assertTrue(localFileSystem.exists(filesetLocationPath3)); + Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath3).isFile()); + + Map contextMap = Maps.newHashMap(); + contextMap.put( + FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, + FilesetDataOperation.RENAME.name()); + CallerContext callerContext = CallerContext.builder().withContext(contextMap).build(); + CallerContext.CallerContextHolder.set(callerContext); + Assertions.assertThrows( + GravitinoRuntimeException.class, () -> ops.getFileLocation(filesetIdent, "")); + } + + // test storage location end with "/" + String filesetName4 = "test_get_file_location_4"; + String filesetLocation4 = + TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + filesetName4 + "/"; + NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name); + Fileset mockFileset = Mockito.mock(Fileset.class); + when(mockFileset.name()).thenReturn(filesetName4); + when(mockFileset.storageLocation()).thenReturn(filesetLocation4); + + try (HadoopCatalogOperations mockOps = Mockito.mock(HadoopCatalogOperations.class)) { + mockOps.hadoopConf = new Configuration(); + when(mockOps.loadFileset(filesetIdent)).thenReturn(mockFileset); + String subPath = "/test/test.parquet"; + when(mockOps.getFileLocation(filesetIdent, subPath)).thenCallRealMethod(); + String fileLocation = mockOps.getFileLocation(filesetIdent, subPath); + Assertions.assertEquals( + String.format("%s%s", mockFileset.storageLocation(), subPath.substring(1)), fileLocation); + } } private static Stream locationArguments() { diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java index aa4284eee36..20f9a1eeab8 100644 --- a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java +++ b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java @@ -664,16 +664,25 @@ public void testGetFileLocationWithInvalidAuditHeaders() { try { String filesetName = GravitinoITUtils.genRandomName("fileset"); NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName); + Assertions.assertFalse(catalog.asFilesetCatalog().filesetExists(filesetIdent)); + Fileset expectedFileset = + catalog + .asFilesetCatalog() + .createFileset( + filesetIdent, + "fileset comment", + Fileset.Type.MANAGED, + generateLocation(filesetName), + Maps.newHashMap()); Map context = new HashMap<>(); - // this is an invalid internal client type. 
+ // this is an invalid internal client type, but the server will return normally context.put(FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE, "test"); CallerContext callerContext = CallerContext.builder().withContext(context).build(); CallerContext.CallerContextHolder.set(callerContext); - Assertions.assertThrows( - IllegalArgumentException.class, - () -> catalog.asFilesetCatalog().getFileLocation(filesetIdent, "/test.par")); + String fileLocation = catalog.asFilesetCatalog().getFileLocation(filesetIdent, "/test.par"); + Assertions.assertEquals(expectedFileset.storageLocation() + "/test.par", fileLocation); } finally { CallerContext.CallerContextHolder.remove(); } diff --git a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/FilesetContext.java b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/FilesetContext.java deleted file mode 100644 index b56f39cd6ca..00000000000 --- a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/FilesetContext.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.gravitino.filesystem.hadoop; - -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import org.apache.gravitino.NameIdentifier; -import org.apache.gravitino.file.Fileset; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -/** - * A context object that holds the information about the fileset and the file system which used in - * the {@link GravitinoVirtualFileSystem}'s operations. - */ -class FilesetContext { - private NameIdentifier identifier; - private Fileset fileset; - private FileSystem fileSystem; - private Path actualPath; - - private FilesetContext() {} - - public NameIdentifier getIdentifier() { - return identifier; - } - - public Fileset getFileset() { - return fileset; - } - - public FileSystem getFileSystem() { - return fileSystem; - } - - public Path getActualPath() { - return actualPath; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof FilesetContext)) return false; - FilesetContext that = (FilesetContext) o; - return Objects.equal(getIdentifier(), that.getIdentifier()) - && Objects.equal(getFileset(), that.getFileset()) - && Objects.equal(getFileSystem(), that.getFileSystem()) - && Objects.equal(getActualPath(), that.getActualPath()); - } - - @Override - public int hashCode() { - return Objects.hashCode(getIdentifier(), getFileset(), getFileSystem(), getActualPath()); - } - - public static Builder builder() { - return new Builder(); - } - - /** A builder class for {@link FilesetContext}. 
*/ - public static class Builder { - - private final FilesetContext context; - - private Builder() { - this.context = new FilesetContext(); - } - - public Builder withIdentifier(NameIdentifier identifier) { - context.identifier = identifier; - return this; - } - - public Builder withFileSystem(FileSystem fileSystem) { - context.fileSystem = fileSystem; - return this; - } - - public Builder withFileset(Fileset fileset) { - context.fileset = fileset; - return this; - } - - public Builder withActualPath(Path actualPath) { - context.actualPath = actualPath; - return this; - } - - public FilesetContext build() { - Preconditions.checkArgument(context.identifier != null, "Identifier is required"); - Preconditions.checkArgument(context.fileset != null, "Fileset is required"); - Preconditions.checkArgument(context.fileSystem != null, "FileSystem is required"); - Preconditions.checkArgument(context.actualPath != null, "ActualPath is required"); - return context; - } - } -} diff --git a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java index 7ebeb441b7b..de0eb758edc 100644 --- a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java +++ b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java @@ -23,25 +23,29 @@ import com.github.benmanes.caffeine.cache.Scheduler; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.util.Arrays; +import java.util.Map; import java.util.concurrent.ScheduledThreadPoolExecutor; import 
java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.gravitino.Catalog; import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.audit.CallerContext; +import org.apache.gravitino.audit.FilesetAuditConstants; +import org.apache.gravitino.audit.FilesetDataOperation; +import org.apache.gravitino.audit.InternalClientType; import org.apache.gravitino.client.DefaultOAuth2TokenProvider; import org.apache.gravitino.client.GravitinoClient; import org.apache.gravitino.client.KerberosTokenProvider; -import org.apache.gravitino.file.Fileset; +import org.apache.gravitino.exceptions.GravitinoRuntimeException; +import org.apache.gravitino.file.FilesetCatalog; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -65,8 +69,10 @@ public class GravitinoVirtualFileSystem extends FileSystem { private URI uri; private GravitinoClient client; private String metalakeName; - private Cache> filesetCache; - private ScheduledThreadPoolExecutor scheduler; + private Cache catalogCache; + private ScheduledThreadPoolExecutor catalogCleanScheduler; + private Cache internalFileSystemCache; + private ScheduledThreadPoolExecutor internalFileSystemCleanScheduler; // The pattern is used to match gvfs path. The scheme prefix (gvfs://fileset) is optional. 
// The following path can be match: @@ -107,7 +113,8 @@ public void initialize(URI name, Configuration configuration) throws IOException GravitinoVirtualFileSystemConfiguration .FS_GRAVITINO_FILESET_CACHE_EVICTION_MILLS_AFTER_ACCESS_KEY); - initializeCache(maxCapacity, evictionMillsAfterAccess); + initializeFileSystemCache(maxCapacity, evictionMillsAfterAccess); + initializeCatalogCache(); this.metalakeName = configuration.get(GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_METALAKE_KEY); @@ -126,37 +133,52 @@ public void initialize(URI name, Configuration configuration) throws IOException } @VisibleForTesting - Cache> getFilesetCache() { - return filesetCache; + Cache internalFileSystemCache() { + return internalFileSystemCache; } - private void initializeCache(int maxCapacity, long expireAfterAccess) { + private void initializeFileSystemCache(int maxCapacity, long expireAfterAccess) { // Since Caffeine does not ensure that removalListener will be involved after expiration // We use a scheduler with one thread to clean up expired clients. 
- this.scheduler = new ScheduledThreadPoolExecutor(1, newDaemonThreadFactory()); - - this.filesetCache = + this.internalFileSystemCleanScheduler = + new ScheduledThreadPoolExecutor(1, newDaemonThreadFactory("gvfs-filesystem-cache-cleaner")); + Caffeine cacheBuilder = Caffeine.newBuilder() .maximumSize(maxCapacity) - .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS) - .scheduler(Scheduler.forScheduledExecutorService(scheduler)) + .scheduler(Scheduler.forScheduledExecutorService(internalFileSystemCleanScheduler)) .removalListener( (key, value, cause) -> { - try { - Pair pair = (Pair) value; - if (pair != null && pair.getRight() != null) pair.getRight().close(); - } catch (IOException e) { - Logger.error("Cannot close the file system for fileset: {}", key, e); + FileSystem fs = (FileSystem) value; + if (fs != null) { + try { + fs.close(); + } catch (IOException e) { + Logger.error("Cannot close the file system for fileset: {}", key, e); + } } - }) + }); + if (expireAfterAccess > 0) { + cacheBuilder.expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS); + } + this.internalFileSystemCache = cacheBuilder.build(); + } + + private void initializeCatalogCache() { + // Since Caffeine does not ensure that removalListener will be involved after expiration + // We use a scheduler with one thread to clean up expired clients. + this.catalogCleanScheduler = + new ScheduledThreadPoolExecutor(1, newDaemonThreadFactory("gvfs-catalog-cache-cleaner")); + // In most scenarios, it will not read so many catalog filesets at the same time, so we can just + // set a default value for this cache. 
+ this.catalogCache = + Caffeine.newBuilder() + .maximumSize(100) + .scheduler(Scheduler.forScheduledExecutorService(catalogCleanScheduler)) .build(); } - private ThreadFactory newDaemonThreadFactory() { - return new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("gvfs-cache-cleaner" + "-%d") - .build(); + private ThreadFactory newDaemonThreadFactory(String name) { + return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(name + "-%d").build(); } private void initializeClient(Configuration configuration) { @@ -278,70 +300,6 @@ private String getVirtualLocation(NameIdentifier identifier, boolean withScheme) identifier.name()); } - @VisibleForTesting - Path getActualPathByIdentifier( - NameIdentifier identifier, Pair filesetPair, Path path) { - String virtualPath = path.toString(); - boolean withScheme = - virtualPath.startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX); - String virtualLocation = getVirtualLocation(identifier, withScheme); - String storageLocation = filesetPair.getLeft().storageLocation(); - try { - if (checkMountsSingleFile(filesetPair)) { - Preconditions.checkArgument( - virtualPath.equals(virtualLocation), - "Path: %s should be same with the virtual prefix: %s, because the fileset only mounts a single file.", - virtualPath, - virtualLocation); - - return new Path(storageLocation); - } else { - // if the storage location ends with "/", - // we should handle the conversion specially - if (storageLocation.endsWith(SLASH)) { - String subPath = virtualPath.substring(virtualLocation.length()); - // For example, if the virtual path is `gvfs://fileset/catalog/schema/test_fileset/ttt`, - // and the storage location is `hdfs://cluster:8020/user/`, - // we should replace `gvfs://fileset/catalog/schema/test_fileset` with - // `hdfs://localhost:8020/user` which truncates the tailing slash. 
- // If the storage location is `hdfs://cluster:8020/user`, - // we can replace `gvfs://fileset/catalog/schema/test_fileset` with - // `hdfs://localhost:8020/user` directly. - if (subPath.startsWith(SLASH)) { - return new Path( - virtualPath.replaceFirst( - virtualLocation, storageLocation.substring(0, storageLocation.length() - 1))); - } else { - return new Path(virtualPath.replaceFirst(virtualLocation, storageLocation)); - } - } else { - return new Path(virtualPath.replaceFirst(virtualLocation, storageLocation)); - } - } - } catch (Exception e) { - throw new RuntimeException( - String.format("Cannot resolve path: %s to actual storage path, exception:", path), e); - } - } - - private boolean checkMountsSingleFile(Pair filesetPair) { - try { - return filesetPair - .getRight() - .getFileStatus(new Path(filesetPair.getLeft().storageLocation())) - .isFile(); - } catch (FileNotFoundException e) { - // We should always return false here, same with the logic in `FileSystem.isFile(Path f)`. - return false; - } catch (IOException e) { - throw new RuntimeException( - String.format( - "Cannot check whether the fileset: %s mounts a single file, exception: %s", - filesetPair.getLeft().name(), e.getMessage()), - e); - } - } - @VisibleForTesting FileStatus convertFileStatusPathPrefix( FileStatus fileStatus, String actualPrefix, String virtualPrefix) { @@ -381,51 +339,69 @@ NameIdentifier extractIdentifier(URI virtualUri) { return NameIdentifier.of(metalakeName, matcher.group(1), matcher.group(2), matcher.group(3)); } - private FilesetContext getFilesetContext(Path virtualPath) { + private FilesetContextPair getFilesetContext(Path virtualPath, FilesetDataOperation operation) { NameIdentifier identifier = extractIdentifier(virtualPath.toUri()); - Pair pair = filesetCache.get(identifier, this::constructNewFilesetPair); - Preconditions.checkState( - pair != null, - "Cannot get the pair of fileset instance and actual file system for %s", - identifier); - Path actualPath = 
getActualPathByIdentifier(identifier, pair, virtualPath); - return FilesetContext.builder() - .withIdentifier(identifier) - .withFileset(pair.getLeft()) - .withFileSystem(pair.getRight()) - .withActualPath(actualPath) - .build(); - } + String virtualPathString = virtualPath.toString(); + String subPath = getSubPathFromVirtualPath(identifier, virtualPathString); - private Pair constructNewFilesetPair(NameIdentifier identifier) { - // Always create a new file system instance for the fileset. - // Therefore, users cannot bypass gvfs and use `FileSystem.get()` to directly obtain the - // FileSystem - try { - Fileset fileset = loadFileset(identifier); - URI storageUri = URI.create(fileset.storageLocation()); - FileSystem actualFileSystem = FileSystem.newInstance(storageUri, getConf()); - Preconditions.checkState(actualFileSystem != null, "Cannot get the actual file system"); - return Pair.of(fileset, actualFileSystem); - } catch (IOException e) { - throw new RuntimeException( - String.format( - "Cannot create file system for fileset: %s, exception: %s", - identifier, e.getMessage()), - e); - } catch (RuntimeException e) { - throw new RuntimeException( - String.format( - "Cannot load fileset: %s from the server. 
exception: %s", - identifier, e.getMessage())); - } + NameIdentifier catalogIdent = NameIdentifier.of(metalakeName, identifier.namespace().level(1)); + FilesetCatalog filesetCatalog = + catalogCache.get( + catalogIdent, ident -> client.loadCatalog(catalogIdent.name()).asFilesetCatalog()); + Preconditions.checkArgument( + filesetCatalog != null, String.format("Loaded fileset catalog: %s is null.", catalogIdent)); + + // set the thread local audit info + Map<String, String> contextMap = Maps.newHashMap(); + contextMap.put( + FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE, + InternalClientType.HADOOP_GVFS.name()); + contextMap.put(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, operation.name()); + CallerContext callerContext = CallerContext.builder().withContext(contextMap).build(); + CallerContext.CallerContextHolder.set(callerContext); + + String actualFileLocation = + filesetCatalog.getFileLocation( + NameIdentifier.of(identifier.namespace().level(2), identifier.name()), subPath); + + URI uri = new Path(actualFileLocation).toUri(); + // we cache the fs for the same scheme, so we can reuse it + String scheme = uri.getScheme(); + Preconditions.checkArgument( + StringUtils.isNotBlank(scheme), "Scheme of the actual file location cannot be null."); + FileSystem fs = + internalFileSystemCache.get( + scheme, + str -> { + try { + return FileSystem.newInstance(uri, getConf()); + } catch (IOException ioe) { + throw new GravitinoRuntimeException( + ioe, "Exception occurs when create new FileSystem for actual uri: %s, msg: %s", + uri, ioe.getMessage()); + } + }); + + return new FilesetContextPair(new Path(actualFileLocation), fs); } - private Fileset loadFileset(NameIdentifier identifier) { - Catalog catalog = client.loadCatalog(identifier.namespace().level(1)); - return catalog - .asFilesetCatalog() - .loadFileset(NameIdentifier.of(identifier.namespace().level(2), identifier.name())); + private String getSubPathFromVirtualPath(NameIdentifier identifier, String virtualPathString) { + return
virtualPathString.startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX) + ? virtualPathString.substring( + String.format( + "%s/%s/%s/%s", + GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX, + identifier.namespace().level(1), + identifier.namespace().level(2), + identifier.name()) + .length()) + : virtualPathString.substring( + String.format( + "/%s/%s/%s", + identifier.namespace().level(1), + identifier.namespace().level(2), + identifier.name()) + .length()); } @Override @@ -440,15 +416,15 @@ public synchronized Path getWorkingDirectory() { @Override public synchronized void setWorkingDirectory(Path newDir) { - FilesetContext context = getFilesetContext(newDir); - context.getFileSystem().setWorkingDirectory(context.getActualPath()); + FilesetContextPair context = getFilesetContext(newDir, FilesetDataOperation.SET_WORKING_DIR); + context.getFileSystem().setWorkingDirectory(context.getActualFileLocation()); this.workingDirectory = newDir; } @Override public FSDataInputStream open(Path path, int bufferSize) throws IOException { - FilesetContext context = getFilesetContext(path); - return context.getFileSystem().open(context.getActualPath(), bufferSize); + FilesetContextPair context = getFilesetContext(path, FilesetDataOperation.OPEN); + return context.getFileSystem().open(context.getActualFileLocation(), bufferSize); } @Override @@ -461,11 +437,11 @@ public FSDataOutputStream create( long blockSize, Progressable progress) throws IOException { - FilesetContext context = getFilesetContext(path); + FilesetContextPair context = getFilesetContext(path, FilesetDataOperation.CREATE); return context .getFileSystem() .create( - context.getActualPath(), + context.getActualFileLocation(), permission, overwrite, bufferSize, @@ -477,17 +453,14 @@ public FSDataOutputStream create( @Override public FSDataOutputStream append(Path path, int bufferSize, Progressable progress) throws IOException { - FilesetContext context = getFilesetContext(path); - return 
context.getFileSystem().append(context.getActualPath(), bufferSize, progress); + FilesetContextPair context = getFilesetContext(path, FilesetDataOperation.APPEND); + return context.getFileSystem().append(context.getActualFileLocation(), bufferSize, progress); } @Override public boolean rename(Path src, Path dst) throws IOException { - // There are two cases that cannot be renamed: - // 1. Fileset identifier is not allowed to be renamed, only its subdirectories can be renamed + // Fileset identifier is not allowed to be renamed, only its subdirectories can be renamed // which not in the storage location of the fileset; - // 2. Fileset only mounts a single file, the storage location of the fileset cannot be renamed; - // Otherwise the metadata in the Gravitino server may be inconsistent. NameIdentifier srcIdentifier = extractIdentifier(src.toUri()); NameIdentifier dstIdentifier = extractIdentifier(dst.toUri()); Preconditions.checkArgument( @@ -496,79 +469,85 @@ public boolean rename(Path src, Path dst) throws IOException { srcIdentifier, dstIdentifier); - FilesetContext srcFileContext = getFilesetContext(src); - if (checkMountsSingleFile( - Pair.of(srcFileContext.getFileset(), srcFileContext.getFileSystem()))) { - throw new UnsupportedOperationException( - String.format( - "Cannot rename the fileset: %s which only mounts to a single file.", srcIdentifier)); - } + FilesetContextPair srcContext = getFilesetContext(src, FilesetDataOperation.RENAME); + FilesetContextPair dstContext = getFilesetContext(dst, FilesetDataOperation.RENAME); - FilesetContext dstFileContext = getFilesetContext(dst); - return srcFileContext + return srcContext .getFileSystem() - .rename(srcFileContext.getActualPath(), dstFileContext.getActualPath()); + .rename(srcContext.getActualFileLocation(), dstContext.getActualFileLocation()); } @Override public boolean delete(Path path, boolean recursive) throws IOException { - FilesetContext context = getFilesetContext(path); - return 
context.getFileSystem().delete(context.getActualPath(), recursive); + FilesetContextPair context = getFilesetContext(path, FilesetDataOperation.DELETE); + return context.getFileSystem().delete(context.getActualFileLocation(), recursive); } @Override public FileStatus getFileStatus(Path path) throws IOException { - FilesetContext context = getFilesetContext(path); - FileStatus fileStatus = context.getFileSystem().getFileStatus(context.getActualPath()); + FilesetContextPair context = getFilesetContext(path, FilesetDataOperation.GET_FILE_STATUS); + FileStatus fileStatus = context.getFileSystem().getFileStatus(context.getActualFileLocation()); + NameIdentifier identifier = extractIdentifier(path.toUri()); + String subPath = getSubPathFromVirtualPath(identifier, path.toString()); + String storageLocation = + context + .getActualFileLocation() + .toString() + .substring(0, context.getActualFileLocation().toString().length() - subPath.length()); return convertFileStatusPathPrefix( - fileStatus, - new Path(context.getFileset().storageLocation()).toString(), - getVirtualLocation(context.getIdentifier(), true)); + fileStatus, storageLocation, getVirtualLocation(identifier, true)); } @Override public FileStatus[] listStatus(Path path) throws IOException { - FilesetContext context = getFilesetContext(path); - FileStatus[] fileStatusResults = context.getFileSystem().listStatus(context.getActualPath()); + FilesetContextPair context = getFilesetContext(path, FilesetDataOperation.LIST_STATUS); + FileStatus[] fileStatusResults = + context.getFileSystem().listStatus(context.getActualFileLocation()); + NameIdentifier identifier = extractIdentifier(path.toUri()); + String subPath = getSubPathFromVirtualPath(identifier, path.toString()); + String storageLocation = + context + .getActualFileLocation() + .toString() + .substring(0, context.getActualFileLocation().toString().length() - subPath.length()); return Arrays.stream(fileStatusResults) .map( fileStatus -> 
convertFileStatusPathPrefix( - fileStatus, - new Path(context.getFileset().storageLocation()).toString(), - getVirtualLocation(context.getIdentifier(), true))) + fileStatus, storageLocation, getVirtualLocation(identifier, true))) .toArray(FileStatus[]::new); } @Override public boolean mkdirs(Path path, FsPermission permission) throws IOException { - FilesetContext context = getFilesetContext(path); - return context.getFileSystem().mkdirs(context.getActualPath(), permission); + FilesetContextPair context = getFilesetContext(path, FilesetDataOperation.MKDIRS); + return context.getFileSystem().mkdirs(context.getActualFileLocation(), permission); } @Override public short getDefaultReplication(Path f) { - FilesetContext context = getFilesetContext(f); - return context.getFileSystem().getDefaultReplication(context.getActualPath()); + FilesetContextPair context = getFilesetContext(f, FilesetDataOperation.GET_DEFAULT_REPLICATION); + return context.getFileSystem().getDefaultReplication(context.getActualFileLocation()); } @Override public long getDefaultBlockSize(Path f) { - FilesetContext context = getFilesetContext(f); - return context.getFileSystem().getDefaultBlockSize(context.getActualPath()); + FilesetContextPair context = getFilesetContext(f, FilesetDataOperation.GET_DEFAULT_BLOCK_SIZE); + return context.getFileSystem().getDefaultBlockSize(context.getActualFileLocation()); } @Override public synchronized void close() throws IOException { // close all actual FileSystems - for (Pair filesetPair : filesetCache.asMap().values()) { + for (FileSystem fileSystem : internalFileSystemCache.asMap().values()) { try { - filesetPair.getRight().close(); + fileSystem.close(); } catch (IOException e) { // ignore } } - filesetCache.invalidateAll(); + internalFileSystemCache.invalidateAll(); + catalogCache.invalidateAll(); // close the client try { if (client != null) { @@ -577,7 +556,26 @@ public synchronized void close() throws IOException { } catch (Exception e) { // ignore } - 
scheduler.shutdownNow(); + catalogCleanScheduler.shutdownNow(); + internalFileSystemCleanScheduler.shutdownNow(); super.close(); } + + private static class FilesetContextPair { + private final Path actualFileLocation; + private final FileSystem fileSystem; + + public FilesetContextPair(Path actualFileLocation, FileSystem fileSystem) { + this.actualFileLocation = actualFileLocation; + this.fileSystem = fileSystem; + } + + public Path getActualFileLocation() { + return actualFileLocation; + } + + public FileSystem getFileSystem() { + return fileSystem; + } + } } diff --git a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/GravitinoMockServerBase.java b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/GravitinoMockServerBase.java index d82f330b1e3..da2be0caefb 100644 --- a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/GravitinoMockServerBase.java +++ b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/GravitinoMockServerBase.java @@ -59,8 +59,6 @@ public abstract class GravitinoMockServerBase { protected static final String metalakeName = "metalake_1"; protected static final String catalogName = "fileset_catalog_1"; protected static final String schemaName = "schema_1"; - protected static final String managedFilesetName = "managed_fileset"; - protected static final String externalFilesetName = "external_fileset"; protected static final String provider = "test"; @BeforeAll diff --git a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java index d3c5d834b18..e7e3b7857f5 100644 --- a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java +++ b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java @@ -18,15 +18,15 @@ */ 
package org.apache.gravitino.filesystem.hadoop; +import static org.apache.hc.core5.http.HttpStatus.SC_OK; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import java.io.File; +import com.fasterxml.jackson.core.JsonProcessingException; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -34,36 +34,34 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.gravitino.NameIdentifier; -import org.apache.gravitino.file.Fileset; +import org.apache.gravitino.dto.responses.FileLocationResponse; +import org.apache.gravitino.rest.RESTUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hc.core5.http.Method; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.Mockito; public class TestGvfsBase extends GravitinoMockServerBase { protected static final String GVFS_IMPL_CLASS = GravitinoVirtualFileSystem.class.getName(); protected static final String 
GVFS_ABSTRACT_IMPL_CLASS = Gvfs.class.getName(); protected static Configuration conf = new Configuration(); - protected Path localDirPath = null; - protected Path localFilePath = null; - protected Path managedFilesetPath = null; - protected Path externalFilesetPath = null; + protected static final Path localCatalogPath = + FileSystemTestUtils.createLocalRootDir(catalogName); @BeforeAll public static void setup() { @@ -85,11 +83,12 @@ public static void setup() { @AfterAll public static void tearDown() { GravitinoMockServerBase.tearDown(); - String fileName = FileSystemTestUtils.localRootPrefix().replace("file:", ""); - try { - FileUtils.deleteDirectory(new File(fileName)); - } catch (Exception e) { - // Ignore + try (FileSystem localFileSystem = localCatalogPath.getFileSystem(conf)) { + if (localFileSystem.exists(localCatalogPath)) { + localFileSystem.delete(localCatalogPath, true); + } + } catch (IOException e) { + // ignore } } @@ -97,49 +96,20 @@ public static void tearDown() { public void init() { mockMetalakeDTO(metalakeName, "comment"); mockCatalogDTO(catalogName, provider, "comment"); - - localDirPath = - FileSystemTestUtils.createLocalDirPrefix(catalogName, schemaName, managedFilesetName); - mockFilesetDTO( - metalakeName, - catalogName, - schemaName, - managedFilesetName, - Fileset.Type.MANAGED, - localDirPath.toString()); - managedFilesetPath = - FileSystemTestUtils.createFilesetPath(catalogName, schemaName, managedFilesetName, true); - - localFilePath = - new Path( - FileSystemTestUtils.createLocalDirPrefix(catalogName, schemaName, externalFilesetName) - + "/test.txt"); - mockFilesetDTO( - metalakeName, - catalogName, - schemaName, - externalFilesetName, - Fileset.Type.EXTERNAL, - localFilePath.toString()); - externalFilesetPath = - FileSystemTestUtils.createFilesetPath(catalogName, schemaName, externalFilesetName, true); - } - - @AfterEach - public void destroy() throws IOException { - Path localRootPath = 
FileSystemTestUtils.createLocalRootDir(catalogName); - try (FileSystem localFileSystem = localDirPath.getFileSystem(conf)) { - if (localFileSystem.exists(localRootPath)) { - localFileSystem.delete(localRootPath, true); - } - } } @Test public void testFSCache() throws IOException { + String filesetName = "testFSCache"; + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, filesetName, true); + Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, schemaName, filesetName); + String locationPath = + String.format( + "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location", + metalakeName, catalogName, schemaName, filesetName); try (FileSystem gravitinoFileSystem = managedFilesetPath.getFileSystem(conf); - FileSystem localFileSystem = localDirPath.getFileSystem(conf)) { - FileSystemTestUtils.mkdirs(managedFilesetPath, gravitinoFileSystem); + FileSystem localFileSystem = localPath.getFileSystem(conf)) { Configuration conf1 = localFileSystem.getConf(); assertEquals( @@ -158,20 +128,28 @@ public void testFSCache() throws IOException { GravitinoVirtualFileSystemConfiguration.GVFS_SCHEME))); // test gvfs, should not get the same fs - try (FileSystem externalFs = externalFilesetPath.getFileSystem(conf)) { - assertNotEquals(externalFs, gravitinoFileSystem); + Path newGvfsPath = + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, "new_fileset", true); + try (FileSystem anotherFS = newGvfsPath.getFileSystem(conf)) { + assertNotEquals(anotherFS, gravitinoFileSystem); } // test proxied local fs, should not get the same fs + FileLocationResponse fileLocationResponse = new FileLocationResponse(localPath.toString()); + Map queryParams = new HashMap<>(); + queryParams.put("sub_path", RESTUtils.encodeString("")); + try { + buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + 
FileSystemTestUtils.mkdirs(managedFilesetPath, gravitinoFileSystem); FileSystem proxyLocalFs = Objects.requireNonNull( - ((GravitinoVirtualFileSystem) gravitinoFileSystem) - .getFilesetCache() - .getIfPresent( - NameIdentifier.of( - metalakeName, catalogName, schemaName, managedFilesetName))) - .getRight(); + ((GravitinoVirtualFileSystem) gravitinoFileSystem) + .internalFileSystemCache() + .getIfPresent("file")); String anotherFilesetName = "test_new_fs"; Path diffLocalPath = @@ -185,54 +163,32 @@ public void testFSCache() throws IOException { @Test public void testInternalCache() throws IOException { - Configuration configuration = new Configuration(conf); - configuration.set( + Path localPath1 = FileSystemTestUtils.createLocalDirPrefix(catalogName, schemaName, "fileset1"); + Path filesetPath1 = + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, "fileset1", true); + String locationPath1 = + String.format( + "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location", + metalakeName, catalogName, schemaName, "fileset1"); + Configuration configuration1 = new Configuration(conf); + configuration1.set( GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_FILESET_CACHE_MAX_CAPACITY_KEY, "1"); - configuration.set( + configuration1.set( GravitinoVirtualFileSystemConfiguration .FS_GRAVITINO_FILESET_CACHE_EVICTION_MILLS_AFTER_ACCESS_KEY, "1000"); - - Path filesetPath1 = - FileSystemTestUtils.createFilesetPath(catalogName, schemaName, "fileset1", true); - try (FileSystem fs = filesetPath1.getFileSystem(configuration)) { - Path localPath1 = - FileSystemTestUtils.createLocalDirPrefix(catalogName, schemaName, "fileset1"); - mockFilesetDTO( - metalakeName, - catalogName, - schemaName, - "fileset1", - Fileset.Type.MANAGED, - localPath1.toString()); + try (FileSystem fs = filesetPath1.getFileSystem(configuration1)) { + FileLocationResponse fileLocationResponse = new FileLocationResponse(localPath1.toString()); + Map queryParams = new HashMap<>(); + 
queryParams.put("sub_path", RESTUtils.encodeString("")); + try { + buildMockResource( + Method.GET, locationPath1, queryParams, null, fileLocationResponse, SC_OK); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } FileSystemTestUtils.mkdirs(filesetPath1, fs); - // expired by size - Path filesetPath2 = - FileSystemTestUtils.createFilesetPath(catalogName, schemaName, "fileset2", true); - Path localPath2 = - FileSystemTestUtils.createLocalDirPrefix(catalogName, schemaName, "fileset2"); - mockFilesetDTO( - metalakeName, - catalogName, - schemaName, - "fileset2", - Fileset.Type.MANAGED, - localPath2.toString()); - FileSystemTestUtils.mkdirs(filesetPath2, fs); - - Awaitility.await() - .atMost(5, TimeUnit.SECONDS) - .pollInterval(1, TimeUnit.SECONDS) - .untilAsserted( - () -> - assertNull( - ((GravitinoVirtualFileSystem) fs) - .getFilesetCache() - .getIfPresent( - NameIdentifier.of( - metalakeName, catalogName, schemaName, "fileset1")))); - // expired by time Awaitility.await() .atMost(5, TimeUnit.SECONDS) @@ -240,27 +196,44 @@ public void testInternalCache() throws IOException { .untilAsserted( () -> assertEquals( - 0, ((GravitinoVirtualFileSystem) fs).getFilesetCache().asMap().size())); + 0, + ((GravitinoVirtualFileSystem) fs).internalFileSystemCache().asMap().size())); - assertNull( - ((GravitinoVirtualFileSystem) fs) - .getFilesetCache() - .getIfPresent(NameIdentifier.of(metalakeName, catalogName, schemaName, "fileset2"))); + assertNull(((GravitinoVirtualFileSystem) fs).internalFileSystemCache().getIfPresent("file")); } } @ParameterizedTest @ValueSource(booleans = {true, false}) public void testCreate(boolean withScheme) throws IOException { + String filesetName = "testCreate"; + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, filesetName, true); + Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, schemaName, filesetName); + String locationPath = + String.format( + 
"/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location", + metalakeName, catalogName, schemaName, filesetName); try (FileSystem gravitinoFileSystem = managedFilesetPath.getFileSystem(conf); - FileSystem localFileSystem = localDirPath.getFileSystem(conf)) { - FileSystemTestUtils.mkdirs(managedFilesetPath, gravitinoFileSystem); + FileSystem localFileSystem = localPath.getFileSystem(conf)) { + FileSystemTestUtils.mkdirs(localPath, localFileSystem); + assertTrue(localFileSystem.exists(localPath)); + // test gvfs normal create + FileLocationResponse fileLocationResponse = new FileLocationResponse(localPath + "/test.txt"); + Map queryParams = new HashMap<>(); + queryParams.put("sub_path", RESTUtils.encodeString("/test.txt")); + try { + buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } - // test managed fileset create + Path localFilePath = new Path(localPath + "/test.txt"); + assertFalse(localFileSystem.exists(localFilePath)); Path filePath = new Path(managedFilesetPath + "/test.txt"); FileSystemTestUtils.create(filePath, gravitinoFileSystem); - assertTrue(gravitinoFileSystem.exists(filePath)); - gravitinoFileSystem.delete(filePath, true); + assertTrue(localFileSystem.exists(localFilePath)); + localFileSystem.delete(localFilePath, true); // mock the invalid fileset not in the server String invalidFilesetName = "invalid_fileset"; @@ -277,13 +250,6 @@ public void testCreate(boolean withScheme) throws IOException { assertThrows( RuntimeException.class, () -> FileSystemTestUtils.create(localPrefixPath, gravitinoFileSystem)); - - // test external fileset mounts a single file - FileSystemTestUtils.create(externalFilesetPath, gravitinoFileSystem); - assertTrue(gravitinoFileSystem.exists(externalFilesetPath)); - assertTrue(gravitinoFileSystem.getFileStatus(externalFilesetPath).isFile()); - gravitinoFileSystem.delete(externalFilesetPath, true); - 
assertFalse(localFileSystem.exists(localFilePath)); } } @@ -291,21 +257,39 @@ public void testCreate(boolean withScheme) throws IOException { @ValueSource(booleans = {true, false}) @Disabled("Append operation is not supported in LocalFileSystem. We can't test it now.") public void testAppend(boolean withScheme) throws IOException { + String filesetName = "testAppend"; + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, filesetName, true); + Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, schemaName, filesetName); + String locationPath = + String.format( + "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location", + metalakeName, catalogName, schemaName, filesetName); try (FileSystem gravitinoFileSystem = managedFilesetPath.getFileSystem(conf); - FileSystem localFileSystem = localDirPath.getFileSystem(conf)) { - FileSystemTestUtils.mkdirs(managedFilesetPath, gravitinoFileSystem); + FileSystem localFileSystem = localPath.getFileSystem(conf)) { + FileSystemTestUtils.mkdirs(localPath, localFileSystem); + assertTrue(localFileSystem.exists(localPath)); // test managed fileset append + FileLocationResponse fileLocationResponse = new FileLocationResponse(localPath + "/test.txt"); + Map queryParams = new HashMap<>(); + queryParams.put("sub_path", RESTUtils.encodeString("/test.txt")); + try { + buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + Path appendFile = new Path(managedFilesetPath + "/test.txt"); - FileSystemTestUtils.create(appendFile, gravitinoFileSystem); + Path localAppendFile = new Path(localPath + "/test.txt"); + FileSystemTestUtils.create(localAppendFile, localFileSystem); + assertTrue(localFileSystem.exists(localAppendFile)); FileSystemTestUtils.append(appendFile, gravitinoFileSystem); - assertTrue(gravitinoFileSystem.exists(appendFile)); - 
assertTrue(gravitinoFileSystem.getFileStatus(appendFile).isFile()); assertEquals( "Hello, World!", new String( - FileSystemTestUtils.read(appendFile, gravitinoFileSystem), StandardCharsets.UTF_8)); - gravitinoFileSystem.delete(appendFile, true); + FileSystemTestUtils.read(localAppendFile, localFileSystem), StandardCharsets.UTF_8)); + localFileSystem.delete(localAppendFile, true); // mock the invalid fileset not in server String invalidAppendFilesetName = "invalid_fileset"; @@ -322,49 +306,73 @@ public void testAppend(boolean withScheme) throws IOException { assertThrows( RuntimeException.class, () -> FileSystemTestUtils.append(localPrefixPath, gravitinoFileSystem)); - - // test external fileset mounts the single file - FileSystemTestUtils.create(externalFilesetPath, gravitinoFileSystem); - FileSystemTestUtils.append(externalFilesetPath, gravitinoFileSystem); - assertTrue(gravitinoFileSystem.exists(externalFilesetPath)); - assertTrue(gravitinoFileSystem.getFileStatus(externalFilesetPath).isFile()); - gravitinoFileSystem.delete(externalFilesetPath, true); - assertFalse(localFileSystem.exists(localFilePath)); } } @ParameterizedTest @ValueSource(booleans = {true, false}) public void testRename(boolean withScheme) throws IOException { + String filesetName = "testRename"; + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, filesetName, true); + Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, schemaName, filesetName); + String locationPath = + String.format( + "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location", + metalakeName, catalogName, schemaName, filesetName); try (FileSystem gravitinoFileSystem = managedFilesetPath.getFileSystem(conf); - FileSystem localFileSystem = localDirPath.getFileSystem(conf)) { - FileSystemTestUtils.mkdirs(managedFilesetPath, gravitinoFileSystem); + FileSystem localFileSystem = localPath.getFileSystem(conf)) { + FileSystemTestUtils.mkdirs(localPath, localFileSystem); 
+ assertTrue(localFileSystem.exists(localPath)); // test managed fileset rename - Path srcRenamePath = new Path(managedFilesetPath + "/rename_src"); - gravitinoFileSystem.mkdirs(srcRenamePath); - assertTrue(gravitinoFileSystem.getFileStatus(srcRenamePath).isDirectory()); - assertTrue(gravitinoFileSystem.exists(srcRenamePath)); + FileLocationResponse fileLocationResponse = + new FileLocationResponse(localPath + "/rename_src"); + Map queryParams = new HashMap<>(); + queryParams.put("sub_path", RESTUtils.encodeString("/rename_src")); + try { + buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + FileLocationResponse fileLocationResponse1 = + new FileLocationResponse(localPath + "/rename_dst2"); + Map queryParams1 = new HashMap<>(); + queryParams1.put("sub_path", RESTUtils.encodeString("/rename_dst2")); + try { + buildMockResource( + Method.GET, locationPath, queryParams1, null, fileLocationResponse1, SC_OK); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + Path srcLocalRenamePath = new Path(localPath + "/rename_src"); + localFileSystem.mkdirs(srcLocalRenamePath); + assertTrue(localFileSystem.getFileStatus(srcLocalRenamePath).isDirectory()); + assertTrue(localFileSystem.exists(srcLocalRenamePath)); // cannot rename the identifier + Path srcFilesetRenamePath = new Path(managedFilesetPath + "/rename_src"); Path dstRenamePath1 = FileSystemTestUtils.createFilesetPath(catalogName, schemaName, "rename_dst1", withScheme); assertThrows( - RuntimeException.class, () -> gravitinoFileSystem.rename(srcRenamePath, dstRenamePath1)); + RuntimeException.class, + () -> gravitinoFileSystem.rename(srcFilesetRenamePath, dstRenamePath1)); - Path dstRenamePath2 = new Path(managedFilesetPath + "/rename_dst2"); - gravitinoFileSystem.rename(srcRenamePath, dstRenamePath2); - assertFalse(gravitinoFileSystem.exists(srcRenamePath)); - 
assertTrue(gravitinoFileSystem.exists(dstRenamePath2)); - gravitinoFileSystem.delete(dstRenamePath2, true); + Path dstFilesetRenamePath2 = new Path(managedFilesetPath + "/rename_dst2"); + Path dstLocalRenamePath2 = new Path(localPath + "/rename_dst2"); + gravitinoFileSystem.rename(srcFilesetRenamePath, dstFilesetRenamePath2); + assertFalse(localFileSystem.exists(srcLocalRenamePath)); + assertTrue(localFileSystem.exists(dstLocalRenamePath2)); + localFileSystem.delete(dstLocalRenamePath2, true); // test invalid src path Path invalidSrcPath = FileSystemTestUtils.createFilesetPath( catalogName, schemaName, "invalid_src_name", withScheme); Path validDstPath = - FileSystemTestUtils.createFilesetPath( - catalogName, schemaName, managedFilesetName, withScheme); + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, filesetName, withScheme); assertThrows( RuntimeException.class, () -> gravitinoFileSystem.rename(invalidSrcPath, validDstPath)); @@ -375,33 +383,42 @@ public void testRename(boolean withScheme) throws IOException { assertThrows( RuntimeException.class, () -> gravitinoFileSystem.rename(managedFilesetPath, invalidDstPath)); - - // test external fileset mount the single file - FileSystemTestUtils.create(externalFilesetPath, gravitinoFileSystem); - assertTrue(gravitinoFileSystem.exists(externalFilesetPath)); - assertTrue(gravitinoFileSystem.getFileStatus(externalFilesetPath).isFile()); - - Path dstPath = - FileSystemTestUtils.createFilesetPath(catalogName, schemaName, "rename_dst", withScheme); - assertThrows( - RuntimeException.class, () -> gravitinoFileSystem.rename(externalFilesetPath, dstPath)); - localFileSystem.delete(localFilePath, true); - assertFalse(localFileSystem.exists(localFilePath)); } } @ParameterizedTest @ValueSource(booleans = {true, false}) public void testDelete(boolean withScheme) throws IOException { + String filesetName = "testDelete"; + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, 
filesetName, true); + Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, schemaName, filesetName); + String locationPath = + String.format( + "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location", + metalakeName, catalogName, schemaName, filesetName); try (FileSystem gravitinoFileSystem = managedFilesetPath.getFileSystem(conf); - FileSystem localFileSystem = localDirPath.getFileSystem(conf)) { - - FileSystemTestUtils.mkdirs(managedFilesetPath, gravitinoFileSystem); + FileSystem localFileSystem = localPath.getFileSystem(conf)) { + FileSystemTestUtils.mkdirs(localPath, localFileSystem); + assertTrue(localFileSystem.exists(localPath)); // test managed fileset delete - FileSystemTestUtils.mkdirs(managedFilesetPath, gravitinoFileSystem); - gravitinoFileSystem.delete(managedFilesetPath, true); - assertFalse(gravitinoFileSystem.exists(managedFilesetPath)); + FileLocationResponse fileLocationResponse = + new FileLocationResponse(localPath + "/test_delete"); + Map queryParams = new HashMap<>(); + queryParams.put("sub_path", RESTUtils.encodeString("/test_delete")); + try { + buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + Path dirPath = new Path(managedFilesetPath + "/test_delete"); + Path localDirPath = new Path(localPath + "/test_delete"); + localFileSystem.mkdirs(localDirPath); + assertTrue(localFileSystem.exists(localDirPath)); + gravitinoFileSystem.delete(dirPath, true); + assertFalse(localFileSystem.exists(localDirPath)); // mock the invalid fileset not in server String invalidFilesetName = "invalid_fileset"; @@ -415,32 +432,35 @@ public void testDelete(boolean withScheme) throws IOException { Path localPrefixPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, schemaName, "test"); assertThrows(RuntimeException.class, () -> gravitinoFileSystem.delete(localPrefixPath, true)); - - // test external fileset 
mounts the single file - FileSystemTestUtils.create(externalFilesetPath, gravitinoFileSystem); - assertTrue(gravitinoFileSystem.exists(externalFilesetPath)); - gravitinoFileSystem.delete(externalFilesetPath, true); - assertFalse(gravitinoFileSystem.exists(externalFilesetPath)); - assertFalse(localFileSystem.exists(localFilePath)); } } @Test public void testGetStatus() throws IOException { + String filesetName = "testGetStatus"; + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, filesetName, true); + Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, schemaName, filesetName); + String locationPath = + String.format( + "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location", + metalakeName, catalogName, schemaName, filesetName); try (FileSystem gravitinoFileSystem = managedFilesetPath.getFileSystem(conf); - FileSystem localFileSystem = localDirPath.getFileSystem(conf)) { - FileSystemTestUtils.mkdirs(managedFilesetPath, gravitinoFileSystem); - - FileSystemTestUtils.mkdirs(managedFilesetPath, gravitinoFileSystem); - assertTrue(gravitinoFileSystem.exists(managedFilesetPath)); - assertTrue(gravitinoFileSystem.getFileStatus(managedFilesetPath).isDirectory()); - assertTrue(localFileSystem.exists(localDirPath)); + FileSystem localFileSystem = localPath.getFileSystem(conf)) { + FileSystemTestUtils.mkdirs(localPath, localFileSystem); + assertTrue(localFileSystem.exists(localPath)); + + FileLocationResponse fileLocationResponse = new FileLocationResponse(localPath.toString()); + Map queryParams = new HashMap<>(); + queryParams.put("sub_path", RESTUtils.encodeString("")); + try { + buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } FileStatus gravitinoStatus = gravitinoFileSystem.getFileStatus(managedFilesetPath); - FileStatus localStatus = localFileSystem.getFileStatus(localDirPath); - 
gravitinoFileSystem.delete(managedFilesetPath, true); - - assertFalse(gravitinoFileSystem.exists(managedFilesetPath)); + FileStatus localStatus = localFileSystem.getFileStatus(localPath); assertEquals( localStatus.getPath().toString(), gravitinoStatus @@ -449,47 +469,38 @@ public void testGetStatus() throws IOException { .replaceFirst( GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX, FileSystemTestUtils.localRootPrefix())); - - // test get file status with storageLocation end with "/" - String fName = "test_location_with_slash"; - String tmpPathString = - String.format( - "%s/%s/%s/%s", - FileSystemTestUtils.localRootPrefix(), catalogName, schemaName, fName + "/"); - Path tempPath = new Path(tmpPathString); - - mockFilesetDTO( - metalakeName, catalogName, schemaName, fName, Fileset.Type.MANAGED, tmpPathString); - Path fPath = FileSystemTestUtils.createFilesetPath(catalogName, schemaName, fName, true); - localFileSystem.mkdirs(tempPath); - assertTrue(localFileSystem.exists(tempPath)); - Path subDir = new Path(tempPath + "/sub_dir"); - localFileSystem.mkdirs(subDir); - assertTrue(localFileSystem.exists(tempPath)); - - Path subDirVirtualPath = new Path(fPath + "/sub_dir"); - FileStatus subDirStatus = gravitinoFileSystem.getFileStatus(subDirVirtualPath); - assertEquals(fPath + "/sub_dir", subDirStatus.getPath().toString()); } } @Test public void testListStatus() throws IOException { + String filesetName = "testListStatus"; + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, filesetName, true); + Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, schemaName, filesetName); + String locationPath = + String.format( + "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location", + metalakeName, catalogName, schemaName, filesetName); try (FileSystem gravitinoFileSystem = managedFilesetPath.getFileSystem(conf); - FileSystem localFileSystem = localDirPath.getFileSystem(conf)) { - - 
FileSystemTestUtils.mkdirs(managedFilesetPath, gravitinoFileSystem); - - FileSystemTestUtils.mkdirs(managedFilesetPath, gravitinoFileSystem); - assertTrue(gravitinoFileSystem.exists(managedFilesetPath)); - assertTrue(gravitinoFileSystem.getFileStatus(managedFilesetPath).isDirectory()); - assertTrue(localFileSystem.exists(localDirPath)); + FileSystem localFileSystem = localPath.getFileSystem(conf)) { + FileSystemTestUtils.mkdirs(localPath, localFileSystem); + assertTrue(localFileSystem.exists(localPath)); for (int i = 0; i < 5; i++) { - Path subPath = new Path(managedFilesetPath + "/sub" + i); - FileSystemTestUtils.mkdirs(subPath, gravitinoFileSystem); - assertTrue(gravitinoFileSystem.exists(subPath)); - assertTrue(gravitinoFileSystem.getFileStatus(subPath).isDirectory()); + Path subLocalPath = new Path(localPath + "/sub" + i); + FileSystemTestUtils.mkdirs(subLocalPath, localFileSystem); + assertTrue(localFileSystem.exists(subLocalPath)); + assertTrue(localFileSystem.getFileStatus(subLocalPath).isDirectory()); + } + + FileLocationResponse fileLocationResponse = new FileLocationResponse(localPath.toString()); + Map queryParams = new HashMap<>(); + queryParams.put("sub_path", RESTUtils.encodeString("")); + try { + buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); } List gravitinoStatuses = @@ -498,7 +509,7 @@ public void testListStatus() throws IOException { assertEquals(5, gravitinoStatuses.size()); List localStatuses = - new ArrayList<>(Arrays.asList(localFileSystem.listStatus(localDirPath))); + new ArrayList<>(Arrays.asList(localFileSystem.listStatus(localPath))); localStatuses.sort(Comparator.comparing(FileStatus::getPath)); assertEquals(5, localStatuses.size()); @@ -512,46 +523,45 @@ public void testListStatus() throws IOException { .replaceFirst( GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX, FileSystemTestUtils.localRootPrefix())); - 
gravitinoFileSystem.delete(gravitinoStatuses.get(i).getPath(), true); } - - // test list file status with storageLocation end with "/" - String fName = "test_list_location_with_slash"; - String tmpPathString = - String.format( - "%s/%s/%s/%s", - FileSystemTestUtils.localRootPrefix(), catalogName, schemaName, fName + "/"); - Path tempPath = new Path(tmpPathString); - - mockFilesetDTO( - metalakeName, catalogName, schemaName, fName, Fileset.Type.MANAGED, tmpPathString); - Path fPath = FileSystemTestUtils.createFilesetPath(catalogName, schemaName, fName, true); - localFileSystem.mkdirs(tempPath); - assertTrue(localFileSystem.exists(tempPath)); - Path subDir = new Path(tempPath + "/sub_dir"); - localFileSystem.mkdirs(subDir); - assertTrue(localFileSystem.exists(tempPath)); - - FileStatus[] subDirStatus = gravitinoFileSystem.listStatus(fPath); - assertEquals(fPath + "/sub_dir", subDirStatus[0].getPath().toString()); } } @Test public void testMkdirs() throws IOException { + String filesetName = "testMkdirs"; + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, filesetName, true); + Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, schemaName, filesetName); + String locationPath = + String.format( + "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location", + metalakeName, catalogName, schemaName, filesetName); try (FileSystem gravitinoFileSystem = managedFilesetPath.getFileSystem(conf); - FileSystem localFileSystem = localDirPath.getFileSystem(conf)) { - - FileSystemTestUtils.mkdirs(managedFilesetPath, gravitinoFileSystem); + FileSystem localFileSystem = localPath.getFileSystem(conf)) { + FileSystemTestUtils.mkdirs(localPath, localFileSystem); + assertTrue(localFileSystem.exists(localPath)); + assertTrue(localFileSystem.getFileStatus(localPath).isDirectory()); + + FileLocationResponse fileLocationResponse = + new FileLocationResponse(localPath + "/test_mkdirs"); + Map queryParams = new HashMap<>(); + 
queryParams.put("sub_path", RESTUtils.encodeString("/test_mkdirs")); + try { + buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } - FileSystemTestUtils.mkdirs(managedFilesetPath, gravitinoFileSystem); - assertTrue(gravitinoFileSystem.exists(managedFilesetPath)); - assertTrue(gravitinoFileSystem.getFileStatus(managedFilesetPath).isDirectory()); + Path subDirPath = new Path(managedFilesetPath + "/test_mkdirs"); + Path localDirPath = new Path(localPath + "/test_mkdirs"); + FileSystemTestUtils.mkdirs(subDirPath, gravitinoFileSystem); + assertTrue(localFileSystem.exists(localDirPath)); + assertTrue(localFileSystem.getFileStatus(localDirPath).isDirectory()); - FileStatus gravitinoStatus = gravitinoFileSystem.getFileStatus(managedFilesetPath); FileStatus localStatus = localFileSystem.getFileStatus(localDirPath); - gravitinoFileSystem.delete(managedFilesetPath, true); - assertFalse(gravitinoFileSystem.exists(managedFilesetPath)); + + FileStatus gravitinoStatus = gravitinoFileSystem.getFileStatus(subDirPath); assertEquals( localStatus.getPath().toString(), @@ -566,6 +576,9 @@ public void testMkdirs() throws IOException { @Test public void testExtractIdentifier() throws IOException, URISyntaxException { + String filesetName = "testExtractIdentifier"; + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, filesetName, true); try (GravitinoVirtualFileSystem fs = (GravitinoVirtualFileSystem) managedFilesetPath.getFileSystem(conf)) { NameIdentifier identifier = @@ -638,22 +651,61 @@ public void testExtractIdentifier() throws IOException, URISyntaxException { @Test public void testGetDefaultReplications() throws IOException { + String filesetName = "testGetDefaultReplications"; + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, filesetName, true); + Path localPath = 
FileSystemTestUtils.createLocalDirPrefix(catalogName, schemaName, filesetName); + String locationPath = + String.format( + "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location", + metalakeName, catalogName, schemaName, filesetName); try (GravitinoVirtualFileSystem fs = (GravitinoVirtualFileSystem) managedFilesetPath.getFileSystem(conf)) { + + FileLocationResponse fileLocationResponse = new FileLocationResponse(localPath.toString()); + Map queryParams = new HashMap<>(); + queryParams.put("sub_path", RESTUtils.encodeString("")); + try { + buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + assertEquals(1, fs.getDefaultReplication(managedFilesetPath)); } } @Test public void testGetDefaultBlockSize() throws IOException { + String filesetName = "testGetDefaultBlockSize"; + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, filesetName, true); + Path localPath = FileSystemTestUtils.createLocalDirPrefix(catalogName, schemaName, filesetName); + String locationPath = + String.format( + "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s/location", + metalakeName, catalogName, schemaName, filesetName); try (GravitinoVirtualFileSystem fs = (GravitinoVirtualFileSystem) managedFilesetPath.getFileSystem(conf)) { + + FileLocationResponse fileLocationResponse = new FileLocationResponse(localPath.toString()); + Map queryParams = new HashMap<>(); + queryParams.put("sub_path", RESTUtils.encodeString("")); + try { + buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + assertEquals(32 * 1024 * 1024, fs.getDefaultBlockSize(managedFilesetPath)); } } @Test public void testConvertFileStatusPathPrefix() throws IOException { + String filesetName = "testConvertFileStatusPathPrefix"; + Path 
managedFilesetPath = + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, filesetName, true); try (GravitinoVirtualFileSystem fs = (GravitinoVirtualFileSystem) managedFilesetPath.getFileSystem(conf)) { FileStatus fileStatus = @@ -667,36 +719,4 @@ public void testConvertFileStatusPathPrefix() throws IOException { assertEquals(expectedPath, convertedStatus.getPath()); } } - - @Test - public void testGetActualPathByIdentifier() throws IOException { - try (GravitinoVirtualFileSystem fs = - (GravitinoVirtualFileSystem) managedFilesetPath.getFileSystem(conf)) { - // test storage location end with "/" - NameIdentifier ident1 = - NameIdentifier.of("test_metalake", "catalog", "schema", "testGetActualPath"); - Fileset mockFileset1 = Mockito.mock(Fileset.class); - Mockito.when(mockFileset1.storageLocation()).thenReturn("file:/tmp/test/123/"); - FileSystem mockFs1 = Mockito.mock(FileSystem.class); - FileStatus mockFileStatus1 = Mockito.mock(FileStatus.class); - Mockito.when(mockFileStatus1.isFile()).thenReturn(false); - Mockito.when(mockFs1.getFileStatus(any())).thenReturn(mockFileStatus1); - // test virtual path sub dir with "/" - Path virtualPath1 = - new Path("gvfs://fileset/catalog/schema/testGetActualPath/sub_dir/1.parquet"); - Path actualPath1 = - fs.getActualPathByIdentifier(ident1, Pair.of(mockFileset1, mockFs1), virtualPath1); - assertEquals( - String.format("%ssub_dir/1.parquet", mockFileset1.storageLocation()), - actualPath1.toString()); - - // test virtual path sub dir without "/" - Path virtualPath2 = new Path("gvfs://fileset/catalog/schema/testGetActualPath"); - Path actualPath2 = - fs.getActualPathByIdentifier(ident1, Pair.of(mockFileset1, mockFs1), virtualPath2); - assertEquals( - mockFileset1.storageLocation().substring(0, mockFileset1.storageLocation().length() - 1), - actualPath2.toString()); - } - } } diff --git a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestKerberosClient.java 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestKerberosClient.java index 723e7366d37..564b05cee72 100644 --- a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestKerberosClient.java +++ b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestKerberosClient.java @@ -71,6 +71,8 @@ public static void teardown() { @Test public void testAuthConfigs() { // init conf + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, "testAuthConfigs", true); Configuration configuration = new Configuration(); configuration.set( String.format( @@ -100,6 +102,9 @@ public void testAuthConfigs() { @Test public void testAuthWithPrincipalAndKeytabNormally() throws Exception { + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath( + catalogName, schemaName, "testAuthWithPrincipalAndKeytabNormally", true); KerberosAuthenticator kerberosAuthenticator = new KerberosAuthenticator(); Config config = new Config(false) {}; config.set(PRINCIPAL, KdcServerBase.getServerPrincipal()); @@ -141,6 +146,9 @@ public void testAuthWithPrincipalAndKeytabNormally() throws Exception { @Test public void testAuthWithInvalidInfo() throws Exception { + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath( + catalogName, schemaName, "testAuthWithInvalidInfo", true); KerberosAuthenticator kerberosAuthenticator = new KerberosAuthenticator(); Config config = new Config(false) {}; config.set(PRINCIPAL, KdcServerBase.getServerPrincipal()); diff --git a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestOauth2Client.java b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestOauth2Client.java index 2a7ac1f30d1..2186f530673 100644 --- a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestOauth2Client.java +++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestOauth2Client.java @@ -160,6 +160,9 @@ public void testAuthNormally() { @Test public void testFileSystemAuthConfigs() throws IOException { // init conf + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath( + catalogName, schemaName, "testFileSystemAuthConfigs", true); Configuration configuration = new Configuration(); configuration.set( String.format( @@ -207,6 +210,9 @@ public void testFileSystemAuthConfigs() throws IOException { @Test public void testFileSystemAuthUnauthorized() throws ParseException { + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath( + catalogName, schemaName, "testFileSystemAuthUnauthorized", true); // 1. test always throw UnauthorizedException HttpResponse mockResponse = response().withStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR); OAuth2ErrorResponse respBody = new OAuth2ErrorResponse("invalid_client", "invalid"); @@ -332,6 +338,9 @@ public void testFileSystemAuthUnauthorized() throws ParseException { @Test public void testFileSystemAuthBadRequest() { + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath( + catalogName, schemaName, "testFileSystemAuthBadRequest", true); HttpResponse mockResponse = response().withStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR); OAuth2ErrorResponse respBody = new OAuth2ErrorResponse("invalid_grant", "invalid"); try { diff --git a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestSimpleClient.java b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestSimpleClient.java index 280755813f4..b88fbba16b4 100644 --- a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestSimpleClient.java +++ b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestSimpleClient.java @@ -90,6 +90,8 @@ public void testSimpleAuthToken() throws IOException { // set the user for simple 
authentication String user = "test"; System.setProperty("user.name", user); + Path managedFilesetPath = + FileSystemTestUtils.createFilesetPath(catalogName, schemaName, "testSimpleAuthToken", true); Path newPath = new Path(managedFilesetPath.toString().replace(metalakeName, testMetalake)); diff --git a/common/src/main/java/org/apache/gravitino/audit/FilesetDataOperation.java b/common/src/main/java/org/apache/gravitino/audit/FilesetDataOperation.java index 0324d4594e1..35ab16cd433 100644 --- a/common/src/main/java/org/apache/gravitino/audit/FilesetDataOperation.java +++ b/common/src/main/java/org/apache/gravitino/audit/FilesetDataOperation.java @@ -57,7 +57,7 @@ public static boolean checkValid(String operation) { FilesetDataOperation.valueOf(operation); return true; } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Unknown fileset data operation: " + operation, e); + return false; } } } diff --git a/common/src/main/java/org/apache/gravitino/audit/InternalClientType.java b/common/src/main/java/org/apache/gravitino/audit/InternalClientType.java index 8c273f1d710..516f5e263af 100644 --- a/common/src/main/java/org/apache/gravitino/audit/InternalClientType.java +++ b/common/src/main/java/org/apache/gravitino/audit/InternalClientType.java @@ -45,7 +45,7 @@ public static boolean checkValid(String clientType) { InternalClientType.valueOf(clientType); return true; } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Unknown internal client type: " + clientType, e); + return false; } } } diff --git a/core/src/test/java/org/apache/gravitino/catalog/TestFilesetOperationDispatcher.java b/core/src/test/java/org/apache/gravitino/catalog/TestFilesetOperationDispatcher.java index 101ebab0788..7ceed9e2e17 100644 --- a/core/src/test/java/org/apache/gravitino/catalog/TestFilesetOperationDispatcher.java +++ b/core/src/test/java/org/apache/gravitino/catalog/TestFilesetOperationDispatcher.java @@ -21,12 +21,17 @@ import static 
org.apache.gravitino.StringIdentifier.ID_KEY; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import java.io.File; import java.io.IOException; import java.util.Map; import java.util.UUID; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; +import org.apache.gravitino.audit.CallerContext; +import org.apache.gravitino.audit.FilesetAuditConstants; +import org.apache.gravitino.audit.FilesetDataOperation; +import org.apache.gravitino.exceptions.GravitinoRuntimeException; import org.apache.gravitino.file.Fileset; import org.apache.gravitino.file.FilesetChange; import org.junit.jupiter.api.Assertions; @@ -213,6 +218,85 @@ public void testCreateAndGetFileLocation() { String subPath4 = ""; String fileLocation3 = filesetOperationDispatcher.getFileLocation(filesetIdent1, subPath4); Assertions.assertEquals(fileset1.storageLocation(), fileLocation3); + + // test mount a single file + String filesetName2 = "test_get_file_location_2"; + String filesetLocation2 = "/tmp/test_get_file_location_" + UUID.randomUUID(); + NameIdentifier filesetIdent2 = NameIdentifier.of(filesetNs, filesetName2); + filesetOperationDispatcher.createFileset( + filesetIdent2, "comment", Fileset.Type.MANAGED, filesetLocation2, props); + File localFile2 = new File(filesetLocation2); + try { + // replace dir to file + if (localFile2.exists()) { + localFile2.delete(); + } + localFile2.createNewFile(); + + String subPath = "/year=2024/month=07/day=22/test.parquet"; + Map contextMap = Maps.newHashMap(); + contextMap.put( + FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, + FilesetDataOperation.RENAME.name()); + CallerContext callerContext = CallerContext.builder().withContext(contextMap).build(); + CallerContext.CallerContextHolder.set(callerContext); + + Assertions.assertThrows( + GravitinoRuntimeException.class, + () -> filesetOperationDispatcher.getFileLocation(filesetIdent2, subPath)); + } catch (IOException e) { + // ignore + } 
finally { + CallerContext.CallerContextHolder.remove(); + if (localFile2.exists()) { + localFile2.delete(); + } + } + + // test rename with an empty subPath + String filesetName3 = "test_get_file_location_3"; + String filesetLocation3 = "/tmp/test_get_file_location_" + UUID.randomUUID(); + NameIdentifier filesetIdent3 = NameIdentifier.of(filesetNs, filesetName3); + filesetOperationDispatcher.createFileset( + filesetIdent3, "comment", Fileset.Type.MANAGED, filesetLocation3, props); + File localFile3 = new File(filesetLocation3); + try { + // replace dir to file + if (localFile3.exists()) { + localFile3.delete(); + } + localFile3.createNewFile(); + + String subPath = ""; + Map contextMap = Maps.newHashMap(); + contextMap.put( + FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, + FilesetDataOperation.RENAME.name()); + CallerContext callerContext = CallerContext.builder().withContext(contextMap).build(); + CallerContext.CallerContextHolder.set(callerContext); + + Assertions.assertThrows( + GravitinoRuntimeException.class, + () -> filesetOperationDispatcher.getFileLocation(filesetIdent3, subPath)); + } catch (IOException e) { + // ignore + } finally { + CallerContext.CallerContextHolder.remove(); + if (localFile3.exists()) { + localFile3.delete(); + } + } + + // test storage location end with "/" + String filesetName4 = "test_get_file_location_4"; + String filesetLocation4 = "/tmp/test_get_file_location_" + UUID.randomUUID() + "/"; + NameIdentifier filesetIdent = NameIdentifier.of(filesetNs, filesetName4); + filesetOperationDispatcher.createFileset( + filesetIdent, "comment", Fileset.Type.MANAGED, filesetLocation4, props); + String subPath = "/test/test.parquet"; + String fileLocation = filesetOperationDispatcher.getFileLocation(filesetIdent, subPath); + Assertions.assertEquals( + String.format("%s%s", filesetLocation4, subPath.substring(1)), fileLocation); } finally { File path = new File(tmpDir); if (path.exists()) { diff --git
a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java index 52ce63e234a..d6bbd81c344 100644 --- a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java +++ b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import java.io.File; import java.io.IOException; import java.time.Instant; import java.util.HashMap; @@ -34,8 +35,12 @@ import org.apache.gravitino.TestSchema; import org.apache.gravitino.TestTable; import org.apache.gravitino.TestTopic; +import org.apache.gravitino.audit.CallerContext; +import org.apache.gravitino.audit.FilesetAuditConstants; +import org.apache.gravitino.audit.FilesetDataOperation; import org.apache.gravitino.exceptions.ConnectionFailedException; import org.apache.gravitino.exceptions.FilesetAlreadyExistsException; +import org.apache.gravitino.exceptions.GravitinoRuntimeException; import org.apache.gravitino.exceptions.NoSuchCatalogException; import org.apache.gravitino.exceptions.NoSuchFilesetException; import org.apache.gravitino.exceptions.NoSuchSchemaException; @@ -61,9 +66,12 @@ import org.apache.gravitino.rel.expressions.sorts.SortOrder; import org.apache.gravitino.rel.expressions.transforms.Transform; import org.apache.gravitino.rel.indexes.Index; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestCatalogOperations implements CatalogOperations, TableCatalog, FilesetCatalog, TopicCatalog, SupportsSchemas { + private static final Logger LOG = LoggerFactory.getLogger(TestCatalogOperations.class); private final Map tables; @@ -446,11 +454,56 @@ public String getFileLocation(NameIdentifier ident, String subPath) { Fileset fileset = loadFileset(ident); + boolean isSingleFile = checkSingleFile(fileset); + // if the storage location is a single file, it cannot have 
sub path to access. + if (isSingleFile && StringUtils.isNotBlank(processedSubPath)) { + throw new GravitinoRuntimeException( + "Sub path should always be blank, because the fileset only mounts a single file."); + } + + // do checks for some data operations. + if (hasCallerContext()) { + Map contextMap = CallerContext.CallerContextHolder.get().context(); + String operation = + contextMap.getOrDefault( + FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, + FilesetDataOperation.UNKNOWN.name()); + if (!FilesetDataOperation.checkValid(operation)) { + LOG.warn( + "The data operation: {} is not valid, we cannot do some checks for this operation.", + operation); + } else { + FilesetDataOperation dataOperation = FilesetDataOperation.valueOf(operation); + switch (dataOperation) { + case RENAME: + // Fileset only mounts a single file, the storage location of the fileset cannot be + // renamed; Otherwise the metadata in the Gravitino server may be inconsistent. + if (isSingleFile) { + throw new GravitinoRuntimeException( + "Cannot rename the fileset: %s which only mounts to a single file.", ident); + } + // if the sub path is blank, it cannot be renamed, + // otherwise the metadata in the Gravitino server may be inconsistent. + if (StringUtils.isBlank(processedSubPath) + || (processedSubPath.startsWith(SLASH) && processedSubPath.length() == 1)) { + throw new GravitinoRuntimeException( + "subPath cannot be blank when need to rename a file or a directory."); + } + break; + default: + break; + } + } + } + String fileLocation; - // subPath cannot be null, so we only need check if it is blank - if (StringUtils.isBlank(processedSubPath)) { + // 1. if the storage location is a single file, we pass the storage location directly + // 2.
if the processed sub path is blank, we pass the storage location directly + if (isSingleFile || StringUtils.isBlank(processedSubPath)) { fileLocation = fileset.storageLocation(); } else { + // the processed sub path always starts with "/" if it is not blank, + // so we can safely remove the trailing slash if storage location ends with "/". String storageLocation = fileset.storageLocation().endsWith(SLASH) ? fileset.storageLocation().substring(0, fileset.storageLocation().length() - 1) @@ -566,4 +619,19 @@ public void testConnection( throw new ConnectionFailedException("Connection failed"); } } + + private boolean hasCallerContext() { + return CallerContext.CallerContextHolder.get() != null + && CallerContext.CallerContextHolder.get().context() != null + && !CallerContext.CallerContextHolder.get().context().isEmpty(); + } + + private boolean checkSingleFile(Fileset fileset) { + try { + File locationPath = new File(fileset.storageLocation()); + return locationPath.isFile(); + } catch (Exception e) { + return false; + } + } } diff --git a/server/src/main/java/org/apache/gravitino/server/web/Utils.java b/server/src/main/java/org/apache/gravitino/server/web/Utils.java index 3503f90dd9a..5a0ece3324d 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/Utils.java +++ b/server/src/main/java/org/apache/gravitino/server/web/Utils.java @@ -160,18 +160,23 @@ public static Map filterFilesetAuditHeaders(HttpServletRequest h String internalClientType = httpRequest.getHeader(FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE); - if (StringUtils.isNotBlank(internalClientType) - && InternalClientType.checkValid(internalClientType)) { + if (StringUtils.isNotBlank(internalClientType)) { filteredHeaders.put( - FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE, internalClientType); + FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE, + InternalClientType.checkValid(internalClientType) + ?
internalClientType + : InternalClientType.UNKNOWN.name()); } String dataOperation = httpRequest.getHeader(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION); if (StringUtils.isNotBlank( - httpRequest.getHeader(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION)) - && FilesetDataOperation.checkValid(dataOperation)) { - filteredHeaders.put(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, dataOperation); + httpRequest.getHeader(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION))) { + filteredHeaders.put( + FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, + FilesetDataOperation.checkValid(dataOperation) + ? dataOperation + : FilesetDataOperation.UNKNOWN.name()); } return filteredHeaders; } diff --git a/server/src/test/java/org/apache/gravitino/server/web/TestUtils.java b/server/src/test/java/org/apache/gravitino/server/web/TestUtils.java index b77fa75e4f7..c495a6275b5 100644 --- a/server/src/test/java/org/apache/gravitino/server/web/TestUtils.java +++ b/server/src/test/java/org/apache/gravitino/server/web/TestUtils.java @@ -180,15 +180,19 @@ public void testFilterFilesetAuditHeaders() { HttpServletRequest mockRequest1 = Mockito.mock(HttpServletRequest.class); when(mockRequest1.getHeader(FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE)) .thenReturn("test"); - Assertions.assertThrows( - IllegalArgumentException.class, () -> Utils.filterFilesetAuditHeaders(mockRequest1)); + Map auditMap1 = Utils.filterFilesetAuditHeaders(mockRequest1); + Assertions.assertEquals( + InternalClientType.UNKNOWN.name(), + auditMap1.get(FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE)); // test invalid fileset data operation HttpServletRequest mockRequest2 = Mockito.mock(HttpServletRequest.class); when(mockRequest2.getHeader(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION)) .thenReturn("test"); - Assertions.assertThrows( - IllegalArgumentException.class, () -> Utils.filterFilesetAuditHeaders(mockRequest2)); + Map auditMap2 = 
Utils.filterFilesetAuditHeaders(mockRequest2); + Assertions.assertEquals( + FilesetDataOperation.UNKNOWN.name(), + auditMap2.get(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION)); // test normal audit headers HttpServletRequest mockRequest3 = Mockito.mock(HttpServletRequest.class);