Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
yuqi1129 committed Oct 14, 2024
1 parent 608081b commit 9edfe82
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.gravitino.catalog.hadoop;

import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -749,62 +747,42 @@ private boolean checkSingleFile(Fileset fileset) {
}

FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
// Set by catalog properties 'default-filesystem' explicitly.
String defaultFSSetByUsers =
String defaultFilesystemProvider =
(String)
propertiesMetadata
.catalogPropertiesMetadata()
.getOrDefault(config, HadoopCatalogPropertiesMetadata.DEFAULT_FS);

// Set by properties 'gravitino.bypass.fs.defaultFS'.
String defaultFSfromByPass = config.get(CATALOG_BYPASS_PREFIX + DEFAULT_FS);
String schema;
Path fsPath;

Map<String, String> newConfig = Maps.newHashMap(config);
if (path != null && path.toUri().getScheme() != null) {
schema = path.toUri().getScheme();
fsPath = path;
} else {
if (defaultFSSetByUsers == null && defaultFSfromByPass == null) {
throw new IllegalArgumentException(
String.format(
"Can't get the schema from the path: %s, and the `defaultFS` and"
+ " `gravitino.bypass.fs.defaultFS` is not set.",
path));
}

if (defaultFSSetByUsers != null) {
fsPath = new Path(defaultFSSetByUsers);
schema = fsPath.toUri().getScheme();
if (schema == null) {
throw new IllegalArgumentException(
String.format(
"Can't get the schema from the path: %s, and can't get schema from `defaultFS`.",
path));
}
if (path == null) {
if (defaultFilesystemProvider != null) {
return getByFileSystemByScheme(defaultFilesystemProvider, newConfig);
} else {
fsPath = new Path(defaultFSfromByPass);
schema = fsPath.toUri().getScheme();
if (schema == null) {
throw new IllegalArgumentException(
String.format(
"Can't get the schema from the path: %s, and can't get schema from `gravitino.bypass.fs.defaultFS`.",
path));
}
LOG.warn("The path and default filesystem provider are both null, using local file system");
return getByFileSystemByScheme(LOCAL_FILE_SCHEMA, newConfig);
}
}

// For any non-local file system, we need to explicitly set the default FS.
if (!newConfig.containsKey(DEFAULT_FS) && !LOCAL_FILE_SCHEMA.equals(schema)) {
newConfig.put(DEFAULT_FS, fsPath.toString());
// Path is not null;
if (path.toUri().getScheme() == null) {
LOG.warn(
"Can't get schema from path: {} and default filesystem provider are both null, using"
+ " local file system",
path);
return getByFileSystemByScheme(LOCAL_FILE_SCHEMA, newConfig);
} else {
newConfig.put(DEFAULT_FS, path.toUri().toString());
return getByFileSystemByScheme(path.toUri().getScheme(), newConfig);
}
}

FileSystemProvider provider = fileSystemProvidersMap.get(schema);
private FileSystem getByFileSystemByScheme(String scheme, Map<String, String> config)
throws IOException {
FileSystemProvider provider = fileSystemProvidersMap.get(scheme);
if (provider == null) {
throw new IllegalArgumentException("Unsupported scheme: " + schema);
throw new IllegalArgumentException("Unsupported scheme: " + scheme);
}

return provider.getFileSystem(newConfig);
return provider.getFileSystem(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada
public static final String FILESYSTEM_PROVIDERS = "filesystem-providers";

/**
* The default file system URI. It is used to create the default file system instance; if not
* specified, the default file system instance will be created with the schema prefix in the file
* path.
* The default file system provider. It is used to create the default file system instance; if
* not specified, the file system instance will be created from the scheme prefix in the file
* path, like 'file:/tmp/'. If there is no scheme prefix, the local file system provider is
* used by default.
*/
public static final String DEFAULT_FS = "default-filesystem";
public static final String DEFAULT_FS = "default-filesystem-provider";

private static final Map<String, PropertyEntry<?>> HADOOP_CATALOG_PROPERTY_ENTRIES =
ImmutableMap.<String, PropertyEntry<?>>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,6 @@ public void testCreateLoadAndDeleteFilesetWithLocations(
if (catalogPath != null) {
catalogProps.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
}
catalogProps.put(HadoopCatalogPropertiesMetadata.DEFAULT_FS, "file:///");

NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1", schemaName);
try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) {
Expand Down Expand Up @@ -1160,8 +1159,6 @@ private Schema createSchema(String name, String comment, String catalogPath, Str
props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
}

props.put(HadoopCatalogPropertiesMetadata.DEFAULT_FS, "file:///");

try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) {
ops.initialize(props, randomCatalogInfo("m1", "c1"), HADOOP_PROPERTIES_METADATA);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.gravitino.client.integration.test.authorization;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.Arrays;
Expand Down Expand Up @@ -72,11 +71,7 @@ public static void startIntegrationTest() throws Exception {

Catalog filesetCatalog =
metalake.createCatalog(
"fileset_catalog",
Catalog.Type.FILESET,
"hadoop",
"comment",
ImmutableMap.of("default-filesystem", "file:///"));
"fileset_catalog", Catalog.Type.FILESET, "hadoop", "comment", Collections.emptyMap());
NameIdentifier fileIdent = NameIdentifier.of("fileset_schema", "fileset");
filesetCatalog.asSchemas().createSchema("fileset_schema", "comment", Collections.emptyMap());
filesetCatalog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static org.apache.gravitino.server.GravitinoServer.WEBSERVER_CONF_PREFIX;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.Collections;
Expand Down Expand Up @@ -162,11 +161,7 @@ public void testCreateFileset() {

Catalog catalog =
metalake.createCatalog(
catalogName,
Catalog.Type.FILESET,
"hadoop",
"comment",
ImmutableMap.of("default-filesystem", "file:///"));
catalogName, Catalog.Type.FILESET, "hadoop", "comment", Collections.emptyMap());

// Test to create a schema with a not-existed user
Catalog anotherCatalog = anotherMetalake.loadCatalog(catalogName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.gravitino.client.integration.test.authorization;

import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.Collections;
Expand Down Expand Up @@ -106,11 +105,7 @@ public void testCreateFileset() {
String catalogNameA = RandomNameUtils.genRandomName("catalogA");
Catalog catalog =
metalake.createCatalog(
catalogNameA,
Catalog.Type.FILESET,
"hadoop",
"comment",
ImmutableBiMap.of("default-filesystem", "file:///"));
catalogNameA, Catalog.Type.FILESET, "hadoop", "comment", Collections.emptyMap());
NameIdentifier fileIdent = NameIdentifier.of("schema_owner", "fileset_owner");
catalog.asSchemas().createSchema("schema_owner", "comment", Collections.emptyMap());
catalog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class TestCommonAuth:
fileset_name: str = "test_client_fileset"
fileset_comment: str = "fileset_comment"

fileset_location: str = "file:///tmp/TestFilesetCatalog"
fileset_location: str = "/tmp/TestFilesetCatalog"
fileset_properties_key1: str = "fileset_properties_key1"
fileset_properties_value1: str = "fileset_properties_value1"
fileset_properties_key2: str = "fileset_properties_key2"
Expand Down Expand Up @@ -115,7 +115,7 @@ def init_test_env(self):
catalog_type=Catalog.Type.FILESET,
provider=self.catalog_provider,
comment="",
properties={self.catalog_location_prop: "file:/tmp/test1"},
properties={self.catalog_location_prop: "/tmp/test1"},
)
catalog.as_schemas().create_schema(
schema_name=self.schema_name, comment="", properties={}
Expand Down
6 changes: 3 additions & 3 deletions clients/client-python/tests/integration/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def create_catalog(self, catalog_name) -> Catalog:
catalog_type=Catalog.Type.FILESET,
provider=self.catalog_provider,
comment=self.catalog_comment,
properties={self.catalog_location_prop: "file:/tmp/test_schema"},
properties={self.catalog_location_prop: "/tmp/test_schema"},
)

def clean_test_data(self):
Expand Down Expand Up @@ -105,7 +105,7 @@ def test_create_catalog(self):
catalog = self.create_catalog(self.catalog_name)
self.assertEqual(catalog.name(), self.catalog_name)
self.assertEqual(
catalog.properties(), {self.catalog_location_prop: "file:/tmp/test_schema"}
catalog.properties(), {self.catalog_location_prop: "/tmp/test_schema"}
)

def test_failed_create_catalog(self):
Expand Down Expand Up @@ -155,7 +155,7 @@ def test_load_catalog(self):
self.assertEqual(catalog.comment(), self.catalog_comment)
self.assertEqual(
catalog.properties(),
{self.catalog_location_prop: "file:/tmp/test_schema"},
{self.catalog_location_prop: "/tmp/test_schema"},
)
self.assertEqual(catalog.audit_info().creator(), "anonymous")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class TestFilesetCatalog(IntegrationTestEnv):
fileset_alter_name: str = fileset_name + "Alter"
fileset_comment: str = "fileset_comment"

fileset_location: str = "file:///tmp/TestFilesetCatalog"
fileset_location: str = "/tmp/TestFilesetCatalog"
fileset_properties_key1: str = "fileset_properties_key1"
fileset_properties_value1: str = "fileset_properties_value1"
fileset_properties_key2: str = "fileset_properties_key2"
Expand Down Expand Up @@ -145,7 +145,7 @@ def init_test_env(self):
catalog_type=Catalog.Type.FILESET,
provider=self.catalog_provider,
comment="",
properties={self.catalog_location_prop: "file:///tmp/test1"},
properties={self.catalog_location_prop: "/tmp/test1"},
)
catalog.as_schemas().create_schema(
schema_name=self.schema_name, comment="", properties={}
Expand Down Expand Up @@ -246,7 +246,7 @@ def test_get_file_location(self):
fileset_ident: NameIdentifier = NameIdentifier.of(
self.schema_name, "test_get_file_location"
)
fileset_location = "file:/tmp/test_get_file_location"
fileset_location = "/tmp/test_get_file_location"
self.create_custom_fileset(fileset_ident, fileset_location)
actual_file_location = (
self.gravitino_client.load_catalog(name=self.catalog_name)
Expand All @@ -255,7 +255,7 @@ def test_get_file_location(self):
)

self.assertEqual(
actual_file_location, "file:/tmp/test_get_file_location/test/test.txt"
actual_file_location, "/tmp/test_get_file_location/test/test.txt"
)

# test rename without sub path should throw an exception
Expand Down
2 changes: 1 addition & 1 deletion clients/client-python/tests/integration/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def init_test_env(self):
catalog_type=Catalog.Type.FILESET,
provider=self.catalog_provider,
comment="",
properties={self.catalog_location_prop: "file:/tmp/test_schema"},
properties={self.catalog_location_prop: "/tmp/test_schema"},
)

def clean_test_data(self):
Expand Down
2 changes: 1 addition & 1 deletion docs/hadoop-catalog.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Besides the [common catalog properties](./gravitino-server-config.md#gravitino-c
|----------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|-------------------------------------------------------------|---------------|
| `location` | The storage location managed by Hadoop catalog. | (none) | No | 0.5.0 |
| `filesystem-providers` | The filesystem providers for the Hadoop catalog. Gravitino already support `local file` and `hdfs`, if you want to support other file system, you can implement `FileSystemProvider` and set this value | (none) | No | 0.7.0 |
| `default-filesystem` | The default file system of this Hadoop catalog. This configuration is equivalent to Hadoop `fs.defaultFS` | (none) | No | 0.7.0 |
| `default-filesystem-provider` | The default file system provider of this Hadoop catalog. Currently the value can be `file` or `hdfs`. | (none) | No | 0.7.0 |
| `authentication.impersonation-enable` | Whether to enable impersonation for the Hadoop catalog. | `false` | No | 0.5.1 |
| `authentication.type` | The type of authentication for Hadoop catalog, currently we only support `kerberos`, `simple`. | `simple` | No | 0.5.1 |
| `authentication.kerberos.principal` | The principal of the Kerberos authentication | (none) | required if the value of `authentication.type` is Kerberos. | 0.5.1 |
Expand Down

0 comments on commit 9edfe82

Please sign in to comment.