Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#4708] improvement(client-java/server): Add implementations for the getFileLocation interface in Java Client / Server #4858

Merged
merged 14 commits into from
Sep 6, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem

private static final String SCHEMA_DOES_NOT_EXIST_MSG = "Schema %s does not exist";
private static final String FILESET_DOES_NOT_EXIST_MSG = "Fileset %s does not exist";
private static final String SLASH = "/";

private static final Logger LOG = LoggerFactory.getLogger(HadoopCatalogOperations.class);

Expand Down Expand Up @@ -357,7 +358,29 @@ public boolean dropFileset(NameIdentifier ident) {
/**
 * Builds the location of a file inside a fileset by appending {@code subPath} to the fileset's
 * storage location.
 *
 * <p>{@code subPath} is trimmed and, when non-empty, prefixed with "/" if it does not already
 * start with one. If the trimmed value is empty, the storage location itself is returned;
 * otherwise one trailing "/" on the storage location is dropped before the two parts are joined.
 *
 * @param ident identifier handed to {@code loadFileset} to look up the fileset
 * @param subPath path relative to the fileset's storage location; must not be null
 * @return the storage location, or the storage location joined with the normalized sub path
 * @throws NoSuchFilesetException declared here; presumably raised by {@code loadFileset} when the
 *     fileset is missing - confirm against that method
 * @throws IllegalArgumentException if {@code subPath} is null
 */
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
throws NoSuchFilesetException {
// NOTE(review): this hunk is rendered without +/- markers; the throw below appears to be the
// removed pre-change stub and the statements after it the added implementation - confirm
// against the merged file.
throw new UnsupportedOperationException("Not implemented");
// TODO: move some of the check logic from the Hadoop / Python GVFS clients to here.
Preconditions.checkArgument(subPath != null, "subPath must not be null");
String processedSubPath;
// Normalize the sub path: trim it and make sure a non-empty value starts with "/".
if (!subPath.trim().isEmpty() && !subPath.trim().startsWith(SLASH)) {
processedSubPath = SLASH + subPath.trim();
} else {
processedSubPath = subPath.trim();
}

// Load the fileset to obtain its storage location.
Fileset fileset = loadFileset(ident);

String fileLocation;
// subPath was null-checked above, so only the blank case needs handling here.
if (StringUtils.isBlank(processedSubPath)) {
fileLocation = fileset.storageLocation();
} else {
// Drop one trailing "/" from the storage location so the join does not produce "//".
String storageLocation =
fileset.storageLocation().endsWith(SLASH)
? fileset.storageLocation().substring(0, fileset.storageLocation().length() - 1)
: fileset.storageLocation();
fileLocation = String.format("%s%s", storageLocation, processedSubPath);
}
return fileLocation;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public Fileset loadFileset(NameIdentifier ident) throws NoSuchFilesetException {
/**
 * Returns the file location for {@code subPath} within the given fileset by delegating to the
 * wrapped {@code hadoopCatalogOperations}.
 *
 * @param ident identifier of the fileset, passed through unchanged
 * @param subPath sub path within the fileset, passed through unchanged
 * @return whatever {@code hadoopCatalogOperations.getFileLocation} returns
 * @throws NoSuchFilesetException declared here; presumably propagated from the delegate - confirm
 */
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
throws NoSuchFilesetException {
// NOTE(review): this hunk is rendered without +/- markers; the throw below appears to be the
// removed pre-change stub and the return after it the added line - confirm against the merged
// file.
throw new UnsupportedOperationException("Not implemented");
return hadoopCatalogOperations.getFileLocation(ident, subPath);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,46 @@ public void testTestConnection() {
ImmutableMap.of()));
}

/**
 * Exercises {@code getFileLocation} against a freshly created managed fileset for four inputs: a
 * sub path with a leading "/", a sub path without one, a null sub path (expects
 * {@code IllegalArgumentException}) and an empty sub path (expects the storage location itself).
 */
@Test
public void testGetFileLocation() throws IOException {
// Create the schema and the fileset the lookups below resolve against.
String schemaName = "schema1024";
String comment = "comment1024";
String schemaPath = TEST_ROOT_PATH + "/" + schemaName;
createSchema(schemaName, comment, null, schemaPath);

String catalogName = "c1";
String name = "fileset1024";
String storageLocation = TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + name;
Fileset fileset =
createFileset(name, schemaName, comment, Fileset.Type.MANAGED, null, storageLocation);

try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) {
ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA);
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name);
// A sub path that starts with "/" is appended to the storage location as-is.
String subPath1 = "/test/test.parquet";
String fileLocation1 = ops.getFileLocation(filesetIdent, subPath1);
Assertions.assertEquals(
String.format("%s%s", fileset.storageLocation(), subPath1), fileLocation1);

// A sub path that does not start with "/" is joined with a "/" separator.
String subPath2 = "test/test.parquet";
String fileLocation2 = ops.getFileLocation(filesetIdent, subPath2);
Assertions.assertEquals(
String.format("%s/%s", fileset.storageLocation(), subPath2), fileLocation2);

// A null sub path is rejected.
String subPath3 = null;
Assertions.assertThrows(
IllegalArgumentException.class, () -> ops.getFileLocation(filesetIdent, subPath3));

// An empty (non-null) sub path returns the storage location unchanged.
String subPath4 = "";
String fileLocation3 = ops.getFileLocation(filesetIdent, subPath4);
Assertions.assertEquals(fileset.storageLocation(), fileLocation3);
}
}

private static Stream<Arguments> locationArguments() {
return Stream.of(
// Honor the catalog location
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -29,13 +30,15 @@
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.dto.AuditDTO;
import org.apache.gravitino.dto.CatalogDTO;
import org.apache.gravitino.dto.requests.FilesetCreateRequest;
import org.apache.gravitino.dto.requests.FilesetUpdateRequest;
import org.apache.gravitino.dto.requests.FilesetUpdatesRequest;
import org.apache.gravitino.dto.responses.DropResponse;
import org.apache.gravitino.dto.responses.EntityListResponse;
import org.apache.gravitino.dto.responses.FileLocationResponse;
import org.apache.gravitino.dto.responses.FilesetResponse;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
Expand Down Expand Up @@ -238,7 +241,22 @@ public boolean dropFileset(NameIdentifier ident) {
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
throws NoSuchFilesetException {
throw new UnsupportedOperationException("Not implemented");
checkFilesetNameIdentifier(ident);
Namespace fullNamespace = getFilesetFullNamespace(ident.namespace());

CallerContext callerContext = CallerContext.CallerContextHolder.get();
Map<String, String> queryParams = Maps.newHashMap();
queryParams.put("subPath", subPath);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should encode the subPath to avoid unsupported characters in the URL.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

FileLocationResponse resp =
restClient.get(
formatFilesetRequestPath(fullNamespace) + "/" + ident.name() + "/" + "fileLocation",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to put the subPath into the URL path rather than pass it as a query parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, I prefer to use the queryParams for the following reasons:

  1. Currently other similar GET APIs pass parameters in the form of queryParams, like RelationalTable.listPartitions();
  2. I looked at the APIs of other commercial products, such as the REST API of Databricks' DBFS, and they also pass these parameters through queryParams: https://docs.databricks.com/api/workspace/dbfs/read;
  3. In the unit test, I tried splicing the parameter directly into the URL, but the test framework could not match the mocked request, so I suspect some frameworks may not recognize parameters that are spliced directly into the URL.

queryParams,
FileLocationResponse.class,
callerContext != null ? callerContext.context() : Collections.emptyMap(),
ErrorHandlers.filesetErrorHandler());
resp.validate();

return resp.getFileLocation();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,26 @@
import static org.apache.hc.core5.http.HttpStatus.SC_NOT_FOUND;
import static org.apache.hc.core5.http.HttpStatus.SC_OK;
import static org.apache.hc.core5.http.HttpStatus.SC_SERVER_ERROR;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.jsonwebtoken.lang.Maps;
import java.nio.file.NoSuchFileException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.audit.InternalClientType;
import org.apache.gravitino.dto.AuditDTO;
import org.apache.gravitino.dto.CatalogDTO;
import org.apache.gravitino.dto.file.FilesetDTO;
Expand All @@ -43,6 +53,7 @@
import org.apache.gravitino.dto.responses.DropResponse;
import org.apache.gravitino.dto.responses.EntityListResponse;
import org.apache.gravitino.dto.responses.ErrorResponse;
import org.apache.gravitino.dto.responses.FileLocationResponse;
import org.apache.gravitino.dto.responses.FilesetResponse;
import org.apache.gravitino.exceptions.AlreadyExistsException;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
Expand All @@ -54,6 +65,11 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.util.StringUtils;
import org.mockserver.matchers.Times;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
import org.mockserver.model.Parameter;

public class TestFilesetCatalog extends TestBase {

Expand Down Expand Up @@ -400,6 +416,90 @@ public void testAlterFileset() throws JsonProcessingException {
"internal error");
}

/**
 * Checks that the client's {@code getFileLocation} returns the location carried in the mocked
 * {@code FileLocationResponse} for a GET on the fileset's {@code fileLocation} path with the
 * {@code subPath} query parameter.
 */
@Test
public void testGetFileLocation() throws JsonProcessingException {
NameIdentifier fileset = NameIdentifier.of(metalakeName, catalogName, "schema1", "fileset1");
String mockSubPath = "mock_location/test";
// Request path the client is expected to call for this fileset.
String filesetPath =
withSlash(
FilesetCatalog.formatFilesetRequestPath(
Namespace.of(metalakeName, catalogName, "schema1"))
+ "/fileset1/fileLocation");
Map<String, String> queryParams = Maps.of("subPath", mockSubPath).build();
String mockFileLocation =
String.format("file:/fileset/%s/%s/%s/%s", catalogName, "schema1", "fileset1", mockSubPath);
FileLocationResponse resp = new FileLocationResponse(mockFileLocation);
// Register the mocked response (presumably on the shared mock server - see buildMockResource).
buildMockResource(Method.GET, filesetPath, queryParams, null, resp, SC_OK);

// The client is called with a schema-level identifier: (schema, fileset name).
String actualFileLocation =
catalog
.asFilesetCatalog()
.getFileLocation(
NameIdentifier.of(fileset.namespace().level(2), fileset.name()), mockSubPath);
Assertions.assertTrue(StringUtils.isNotBlank(actualFileLocation));
Assertions.assertEquals(mockFileLocation, actualFileLocation);
}

/**
 * Checks that entries stored in the thread-local {@code CallerContext} reach the server as HTTP
 * headers on the {@code getFileLocation} request: the mock server callback captures the two
 * audit headers and the test asserts they equal the values placed in the context.
 */
@Test
public void testCallerContextToHeader() throws JsonProcessingException {
NameIdentifier fileset = NameIdentifier.of(metalakeName, catalogName, "schema1", "fileset1");
String mockSubPath = "mock_location/test";
String filesetPath =
withSlash(
FilesetCatalog.formatFilesetRequestPath(
Namespace.of(metalakeName, catalogName, "schema1"))
+ "/fileset1/fileLocation");
Map<String, String> queryParams = Maps.of("subPath", mockSubPath).build();
String mockFileLocation =
String.format("file:/fileset/%s/%s/%s/%s", catalogName, "schema1", "fileset1", mockSubPath);
FileLocationResponse resp = new FileLocationResponse(mockFileLocation);
String respJson = MAPPER.writeValueAsString(resp);

// Build the expected request (GET + query parameters) and the canned response by hand so a
// callback can be attached below.
List<Parameter> parameters =
queryParams.entrySet().stream()
.map(kv -> new Parameter(kv.getKey(), kv.getValue()))
.collect(Collectors.toList());
HttpRequest mockRequest =
HttpRequest.request(filesetPath)
.withMethod(Method.GET.name())
.withQueryStringParameters(parameters);
HttpResponse mockResponse = HttpResponse.response().withStatusCode(SC_OK).withBody(respJson);

// Set the thread-local caller context that the client should translate into headers.
// NOTE(review): the holder is not reset at the end of this test - confirm whether it should
// be cleared so the context does not leak into other tests on the same thread.
Map<String, String> context = new HashMap<>();
context.put(
FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
InternalClientType.HADOOP_GVFS.name());
context.put(
FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
FilesetDataOperation.GET_FILE_STATUS.name());
CallerContext callerContext = CallerContext.builder().withContext(context).build();
CallerContext.CallerContextHolder.set(callerContext);

// Using Times.exactly(1) matches the request only once, so different responses can be
// registered for the same request and path.
AtomicReference<String> internalClientType = new AtomicReference<>(null);
AtomicReference<String> dataOperation = new AtomicReference<>(null);
mockServer
.when(mockRequest, Times.exactly(1))
.respond(
httpRequest -> {
// Capture the audit headers seen by the server for the assertions below.
internalClientType.set(
httpRequest.getFirstHeader(
FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE));
dataOperation.set(
httpRequest.getFirstHeader(
FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION));
return mockResponse;
});
catalog
.asFilesetCatalog()
.getFileLocation(
NameIdentifier.of(fileset.namespace().level(2), fileset.name()), mockSubPath);
assertEquals(FilesetDataOperation.GET_FILE_STATUS.name(), dataOperation.get());
assertEquals(InternalClientType.HADOOP_GVFS.name(), internalClientType.get());
}

private FilesetDTO mockFilesetDTO(
String name,
Fileset.Type type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public boolean dropFileset(NameIdentifier ident) {

@Override
public String getFileLocation(NameIdentifier ident, String subPath) {
throw new UnsupportedOperationException("Not implemented");
// The constraints of the name spec may be more strict than underlying catalog,
// and for compatibility reasons, we only apply case-sensitive capabilities here.
return dispatcher.getFileLocation(normalizeCaseSensitive(ident), subPath);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest splitting the core logic into a separate PR. In this PR, we should focus only on the client- and server-side API. Don't make the PR too big; that makes it hard to review.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, this PR has been split to only include changes to the Java Client and Server.

}

private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ public boolean dropFileset(NameIdentifier ident) {
/**
 * Resolves the catalog for {@code ident} via {@code getCatalogIdentifier} and, through
 * {@code doWithCatalog}, invokes {@code getFileLocation} on that catalog's fileset operations.
 *
 * @param ident identifier of the fileset
 * @param subPath sub path within the fileset, passed through unchanged
 * @return the value returned by the catalog's fileset operations
 * @throws NoSuchFilesetException declared here; presumably propagated from the fileset
 *     operations - confirm
 */
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
throws NoSuchFilesetException {
// NOTE(review): this hunk is rendered without +/- markers; the throw below appears to be the
// removed pre-change stub and the return after it the added implementation - confirm against
// the merged file.
throw new UnsupportedOperationException("Not implemented");
// NOTE(review): the method declares NoSuchFilesetException but passes
// NonEmptyEntityException.class to doWithCatalog - confirm this exception class is intended.
return doWithCatalog(
getCatalogIdentifier(ident),
c -> c.doWithFilesetOps(f -> f.getFileLocation(ident, subPath)),
NonEmptyEntityException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,6 @@ public boolean filesetExists(NameIdentifier ident) {
/**
 * Forwards the call to the wrapped {@code dispatcher} with both arguments unchanged.
 *
 * @param ident identifier of the fileset
 * @param subPath sub path within the fileset
 * @return the value returned by {@code dispatcher.getFileLocation}
 * @throws NoSuchFilesetException declared here; presumably propagated from the dispatcher -
 *     confirm
 */
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
throws NoSuchFilesetException {
// NOTE(review): this hunk is rendered without +/- markers; the throw below appears to be the
// removed pre-change stub and the return after it the added line - confirm against the merged
// file.
throw new UnsupportedOperationException("Not implemented");
return dispatcher.getFileLocation(ident, subPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.catalog.FilesetDispatcher;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
Expand All @@ -34,6 +35,8 @@
import org.apache.gravitino.listener.api.event.CreateFilesetFailureEvent;
import org.apache.gravitino.listener.api.event.DropFilesetEvent;
import org.apache.gravitino.listener.api.event.DropFilesetFailureEvent;
import org.apache.gravitino.listener.api.event.GetFileLocationEvent;
import org.apache.gravitino.listener.api.event.GetFileLocationFailureEvent;
import org.apache.gravitino.listener.api.event.ListFilesetEvent;
import org.apache.gravitino.listener.api.event.ListFilesetFailureEvent;
import org.apache.gravitino.listener.api.event.LoadFilesetEvent;
Expand Down Expand Up @@ -142,6 +145,18 @@ public boolean dropFileset(NameIdentifier ident) {
/**
 * Delegates to the wrapped {@code dispatcher} and reports the outcome on the event bus: a
 * {@code GetFileLocationEvent} on success, or a {@code GetFileLocationFailureEvent} followed by
 * a rethrow of the original exception on failure.
 *
 * @param ident identifier of the fileset
 * @param subPath sub path within the fileset
 * @return the file location returned by the dispatcher
 * @throws NoSuchFilesetException declared here; any exception from the dispatcher is rethrown
 *     after the failure event is dispatched
 */
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
throws NoSuchFilesetException {
// NOTE(review): this hunk is rendered without +/- markers; the throw below appears to be the
// removed pre-change stub and the try/catch after it the added implementation - confirm
// against the merged file.
throw new UnsupportedOperationException("Not implemented");
try {
String actualFileLocation = dispatcher.getFileLocation(ident, subPath);
// Read the caller context from the thread-local holder so it can be attached to the event.
CallerContext context = CallerContext.CallerContextHolder.get();
eventBus.dispatchEvent(
new GetFileLocationEvent(
PrincipalUtils.getCurrentUserName(), ident, actualFileLocation, context));
return actualFileLocation;
} catch (Exception e) {
// Publish the failure (with the requested sub path and the cause) before rethrowing.
eventBus.dispatchEvent(
new GetFileLocationFailureEvent(PrincipalUtils.getCurrentUserName(), ident, subPath, e));
throw e;
}
}
}
Loading
Loading