Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#4888] improvement(core): Add the implementations for getFileLocation interface in core module #4891

Merged
merged 15 commits into from
Sep 11, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public boolean dropFileset(NameIdentifier ident) {

/**
 * Gets the file location for {@code subPath} by delegating to the wrapped dispatcher, after
 * passing the identifier through {@code normalizeCaseSensitive}.
 *
 * @param ident The identifier of the fileset.
 * @param subPath The sub path handed unchanged to the wrapped dispatcher.
 * @return The file location returned by the wrapped dispatcher.
 */
@Override
public String getFileLocation(NameIdentifier ident, String subPath) {
  // The constraints of the name spec may be more strict than underlying catalog,
  // and for compatibility reasons, we only apply case-sensitive capabilities here.
  return dispatcher.getFileLocation(normalizeCaseSensitive(ident), subPath);
}

private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ public boolean dropFileset(NameIdentifier ident) {
/**
 * Gets the file location for {@code subPath} by running {@code getFileLocation} on the fileset
 * operations of the catalog identified by {@code getCatalogIdentifier(ident)}.
 *
 * @param ident The identifier of the fileset.
 * @param subPath The sub path handed unchanged to the catalog's fileset operations.
 * @return The file location returned by the catalog's fileset operations.
 * @throws NoSuchFilesetException Declared by the interface for a missing fileset.
 */
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
    throws NoSuchFilesetException {
  // NOTE(review): the exception class passed here is NonEmptyEntityException while the method
  // declares NoSuchFilesetException -- confirm against doWithCatalog whether this is intended.
  return doWithCatalog(
      getCatalogIdentifier(ident),
      c -> c.doWithFilesetOps(f -> f.getFileLocation(ident, subPath)),
      NonEmptyEntityException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,6 @@ public boolean filesetExists(NameIdentifier ident) {
/**
 * Gets the file location for {@code subPath} by delegating directly to the wrapped dispatcher.
 *
 * @param ident The identifier of the fileset.
 * @param subPath The sub path handed unchanged to the wrapped dispatcher.
 * @return The file location returned by the wrapped dispatcher.
 * @throws NoSuchFilesetException Declared by the interface for a missing fileset.
 */
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
    throws NoSuchFilesetException {
  return dispatcher.getFileLocation(ident, subPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.catalog.FilesetDispatcher;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
Expand All @@ -34,6 +35,8 @@
import org.apache.gravitino.listener.api.event.CreateFilesetFailureEvent;
import org.apache.gravitino.listener.api.event.DropFilesetEvent;
import org.apache.gravitino.listener.api.event.DropFilesetFailureEvent;
import org.apache.gravitino.listener.api.event.GetFileLocationEvent;
import org.apache.gravitino.listener.api.event.GetFileLocationFailureEvent;
import org.apache.gravitino.listener.api.event.ListFilesetEvent;
import org.apache.gravitino.listener.api.event.ListFilesetFailureEvent;
import org.apache.gravitino.listener.api.event.LoadFilesetEvent;
Expand Down Expand Up @@ -142,6 +145,18 @@ public boolean dropFileset(NameIdentifier ident) {
/**
 * Gets the file location from the wrapped dispatcher and publishes the outcome on the event bus:
 * a {@code GetFileLocationEvent} on success, a {@code GetFileLocationFailureEvent} when an
 * exception is caught. The caught exception is rethrown unchanged.
 *
 * @param ident The identifier of the fileset.
 * @param subPath The sub path handed unchanged to the wrapped dispatcher.
 * @return The file location returned by the wrapped dispatcher.
 * @throws NoSuchFilesetException Declared by the interface for a missing fileset.
 */
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
    throws NoSuchFilesetException {
  try {
    String actualFileLocation = dispatcher.getFileLocation(ident, subPath);
    // get the audit info from the thread local context
    CallerContext context = CallerContext.CallerContextHolder.get();
    eventBus.dispatchEvent(
        new GetFileLocationEvent(
            PrincipalUtils.getCurrentUserName(), ident, actualFileLocation, context));
    return actualFileLocation;
  } catch (Exception e) {
    // The failure event records the requested sub path, since no location was resolved.
    eventBus.dispatchEvent(
        new GetFileLocationFailureEvent(PrincipalUtils.getCurrentUserName(), ident, subPath, e));
    throw e;
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.listener.api.event;

import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.annotation.DeveloperApi;
import org.apache.gravitino.audit.CallerContext;

/** Represents an event that occurs when getting a actual file location. */
@DeveloperApi
public final class GetFileLocationEvent extends FilesetEvent {
private final String actualFileLocation;

private final CallerContext callerContext;

/**
* Constructs a new {@code GetFileLocationEvent}, recording the attempt to get a file location.
*
* @param user The user who initiated the get file location.
* @param identifier The identifier of the file location that was attempted to be got.
* @param callerContext The audit caller context, this param can be null.
* @param actualFileLocation The actual file location which want to get.
*/
public GetFileLocationEvent(
String user,
NameIdentifier identifier,
String actualFileLocation,
CallerContext callerContext) {
xloya marked this conversation as resolved.
Show resolved Hide resolved
super(user, identifier);
this.actualFileLocation = actualFileLocation;
this.callerContext = callerContext;
}

public String actualFileLocation() {
return actualFileLocation;
}

public CallerContext callerContext() {
return callerContext;
}
xloya marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.listener.api.event;

import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.annotation.DeveloperApi;

/**
* Represents an event that is generated when an attempt to get a file location from the system
* fails.
*/
@DeveloperApi
public final class GetFileLocationFailureEvent extends FilesetFailureEvent {
private final String subPath;
/**
xloya marked this conversation as resolved.
Show resolved Hide resolved
* Constructs a new {@code GetFileLocationFailureEvent}.
*
* @param user The user who initiated the get a file location.
* @param identifier The identifier of the file location that was attempted to be got.
* @param subPath The sub path of the actual file location which want to get.
* @param exception The exception that was thrown during the get a file location. This exception
* is key to diagnosing the failure, providing insights into what went wrong during the
* operation.
*/
public GetFileLocationFailureEvent(
String user, NameIdentifier identifier, String subPath, Exception exception) {
super(user, identifier, exception);
this.subPath = subPath;
}

public String subPath() {
xloya marked this conversation as resolved.
Show resolved Hide resolved
return subPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import static org.apache.gravitino.StringIdentifier.ID_KEY;

import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.file.Fileset;
Expand Down Expand Up @@ -169,4 +171,53 @@ public void testCreateAndDropFileset() {
Assertions.assertTrue(dropped);
Assertions.assertFalse(filesetOperationDispatcher.dropFileset(filesetIdent1));
}

/**
 * Creates a schema and a managed fileset, then checks {@code getFileLocation} for four kinds of
 * sub path: with a leading slash, without one, null, and empty.
 */
@Test
public void testCreateAndGetFileLocation() {
  String tmpDir = "/tmp/test_get_file_location_" + UUID.randomUUID();
  try {
    Namespace schemaNamespace = Namespace.of(metalake, catalog, "schema1024");
    Map<String, String> properties = ImmutableMap.of("k1", "v1", "location", "schema1024");
    schemaOperationDispatcher.createSchema(
        NameIdentifier.of(schemaNamespace.levels()), "comment", properties);

    NameIdentifier filesetIdent = NameIdentifier.of(schemaNamespace, "fileset1024");
    Fileset created =
        filesetOperationDispatcher.createFileset(
            filesetIdent, "comment", Fileset.Type.MANAGED, tmpDir, properties);
    Assertions.assertEquals("fileset1024", created.name());
    Assertions.assertEquals("comment", created.comment());
    testProperties(properties, created.properties());
    Assertions.assertEquals(Fileset.Type.MANAGED, created.type());
    Assertions.assertNotNull(created.storageLocation());

    // A sub path with a leading "/" is appended to the storage location as-is.
    String slashedSubPath = "/test/test.parquet";
    Assertions.assertEquals(
        created.storageLocation() + slashedSubPath,
        filesetOperationDispatcher.getFileLocation(filesetIdent, slashedSubPath));

    // A sub path without a leading "/" is joined to the storage location with one.
    String relativeSubPath = "test/test.parquet";
    Assertions.assertEquals(
        created.storageLocation() + "/" + relativeSubPath,
        filesetOperationDispatcher.getFileLocation(filesetIdent, relativeSubPath));

    // A null sub path is rejected.
    String nullSubPath = null;
    Assertions.assertThrows(
        IllegalArgumentException.class,
        () -> filesetOperationDispatcher.getFileLocation(filesetIdent, nullSubPath));

    // An empty (non-null) sub path yields the storage location itself.
    String emptySubPath = "";
    Assertions.assertEquals(
        created.storageLocation(),
        filesetOperationDispatcher.getFileLocation(filesetIdent, emptySubPath));
  } finally {
    File leftover = new File(tmpDir);
    if (leftover.exists()) {
      leftover.delete();
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
*/
package org.apache.gravitino.connector;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
Expand Down Expand Up @@ -75,6 +77,8 @@ public class TestCatalogOperations

public static final String FAIL_TEST = "need-fail";

private static final String SLASH = "/";

public TestCatalogOperations(Map<String, String> config) {
tables = Maps.newHashMap();
schemas = Maps.newHashMap();
Expand Down Expand Up @@ -432,7 +436,28 @@ public boolean dropFileset(NameIdentifier ident) {

/**
 * Builds the file location by joining the storage location of the loaded fileset with the
 * trimmed {@code subPath}, using exactly one slash between them.
 *
 * @param ident The identifier of the fileset to load.
 * @param subPath The sub path; it is trimmed, and an empty result returns the storage location.
 * @return The storage location when the trimmed sub path is empty, otherwise the storage
 *     location (minus one trailing slash, if present) followed by the slash-prefixed sub path.
 * @throws IllegalArgumentException If {@code subPath} is null.
 */
@Override
public String getFileLocation(NameIdentifier ident, String subPath) {
  Preconditions.checkArgument(subPath != null, "subPath must not be null");
  String trimmedSubPath = subPath.trim();

  Fileset fileset = loadFileset(ident);
  String storageLocation = fileset.storageLocation();

  // subPath cannot be null, so we only need to check whether anything is left after trimming
  if (StringUtils.isEmpty(trimmedSubPath)) {
    return storageLocation;
  }

  String normalizedSubPath =
      trimmedSubPath.startsWith(SLASH) ? trimmedSubPath : SLASH + trimmedSubPath;
  // Drop a single trailing slash so the join never produces a double slash at the seam.
  String normalizedLocation =
      storageLocation.endsWith(SLASH)
          ? storageLocation.substring(0, storageLocation.length() - 1)
          : storageLocation;
  return normalizedLocation + normalizedSubPath;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@

package org.apache.gravitino.listener.api.event;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.audit.InternalClientType;
import org.apache.gravitino.catalog.FilesetDispatcher;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.file.Fileset;
Expand Down Expand Up @@ -121,6 +127,48 @@ void testListFilesetEvent() {
Assertions.assertEquals(namespace, ((ListFilesetEvent) event).namespace());
}

/**
 * Creates a fileset, sets a caller context in the thread-local holder, and checks that {@code
 * getFileLocation} emits a {@code GetFileLocationEvent} carrying the returned location and that
 * caller context.
 */
@Test
void testGetFileLocationEvent() {
  NameIdentifier filesetIdent = NameIdentifier.of("metalake", "catalog", fileset.name());
  dispatcher.createFileset(
      filesetIdent,
      fileset.comment(),
      fileset.type(),
      fileset.storageLocation(),
      fileset.properties());

  // The create call emits its own event first; consume and check it.
  Event createEvent = dummyEventListener.popEvent();
  Assertions.assertEquals(filesetIdent, createEvent.identifier());
  Assertions.assertEquals(CreateFilesetEvent.class, createEvent.getClass());
  checkFilesetInfo(((CreateFilesetEvent) createEvent).createdFilesetInfo(), fileset);

  Map<String, String> auditHeaders = Maps.newHashMap();
  auditHeaders.put(
      FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
      InternalClientType.HADOOP_GVFS.name());
  auditHeaders.put(
      FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
      FilesetDataOperation.GET_FILE_STATUS.name());
  CallerContext.CallerContextHolder.set(
      CallerContext.builder().withContext(auditHeaders).build());

  String returnedLocation = dispatcher.getFileLocation(filesetIdent, "test");

  Event locationEvent = dummyEventListener.popEvent();
  Assertions.assertEquals(filesetIdent, locationEvent.identifier());
  Assertions.assertEquals(GetFileLocationEvent.class, locationEvent.getClass());
  GetFileLocationEvent typedEvent = (GetFileLocationEvent) locationEvent;
  Assertions.assertEquals(typedEvent.actualFileLocation(), returnedLocation);

  CallerContext recordedContext = typedEvent.callerContext();
  assertEquals(2, recordedContext.context().size());
  Assertions.assertEquals(
      InternalClientType.HADOOP_GVFS.name(),
      recordedContext.context().get(FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE));
  Assertions.assertEquals(
      FilesetDataOperation.GET_FILE_STATUS.name(),
      recordedContext.context().get(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION));
}

@Test
void testCreateSchemaFailureEvent() {
NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "fileset");
Expand Down Expand Up @@ -194,6 +242,20 @@ void testListFilesetFailureEvent() {
Assertions.assertEquals(namespace, ((ListFilesetFailureEvent) event).namespace());
}

/**
 * Checks that a failing {@code getFileLocation} call rethrows the exception and emits a {@code
 * GetFileLocationFailureEvent} that carries it.
 */
@Test
void testGetFileLocationFailureEvent() {
  NameIdentifier filesetIdent = NameIdentifier.of("metalake", "catalog", "fileset");
  Assertions.assertThrowsExactly(
      GravitinoRuntimeException.class,
      () -> failureDispatcher.getFileLocation(filesetIdent, "/test"));

  Event failureEvent = dummyEventListener.popEvent();
  Assertions.assertEquals(filesetIdent, failureEvent.identifier());
  Assertions.assertEquals(GetFileLocationFailureEvent.class, failureEvent.getClass());

  GetFileLocationFailureEvent typedEvent = (GetFileLocationFailureEvent) failureEvent;
  Assertions.assertEquals(GravitinoRuntimeException.class, typedEvent.exception().getClass());
}

private void checkFilesetInfo(FilesetInfo filesetInfo, Fileset fileset) {
Assertions.assertEquals(fileset.name(), filesetInfo.name());
Assertions.assertEquals(fileset.type(), filesetInfo.type());
Expand Down Expand Up @@ -227,6 +289,8 @@ private FilesetDispatcher mockFilesetDispatcher() {
when(dispatcher.listFilesets(any(Namespace.class))).thenReturn(null);
when(dispatcher.alterFileset(any(NameIdentifier.class), any(FilesetChange.class)))
.thenReturn(fileset);
when(dispatcher.getFileLocation(any(NameIdentifier.class), any()))
.thenReturn("file:/test/xxx.parquet");
return dispatcher;
}

Expand Down
Loading