Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#4708] improvement(client-java/server): Add implementations for the getFileLocation interface in Java Client / Server #4858

Merged
merged 14 commits into from
Sep 6, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,23 @@
import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.dto.AuditDTO;
import org.apache.gravitino.dto.CatalogDTO;
import org.apache.gravitino.dto.requests.FilesetCreateRequest;
import org.apache.gravitino.dto.requests.FilesetUpdateRequest;
import org.apache.gravitino.dto.requests.FilesetUpdatesRequest;
import org.apache.gravitino.dto.responses.DropResponse;
import org.apache.gravitino.dto.responses.EntityListResponse;
import org.apache.gravitino.dto.responses.FileLocationResponse;
import org.apache.gravitino.dto.responses.FilesetResponse;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
Expand Down Expand Up @@ -238,7 +241,23 @@ public boolean dropFileset(NameIdentifier ident) {
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
throws NoSuchFilesetException {
throw new UnsupportedOperationException("Not implemented");
checkFilesetNameIdentifier(ident);
Namespace fullNamespace = getFilesetFullNamespace(ident.namespace());

CallerContext callerContext = CallerContext.CallerContextHolder.get();

Map<String, String> params = new HashMap<>();
params.put("sub_path", RESTUtils.encodeString(subPath));
FileLocationResponse resp =
restClient.get(
formatFileLocationRequestPath(fullNamespace, ident.name()),
params,
FileLocationResponse.class,
callerContext != null ? callerContext.context() : Collections.emptyMap(),
ErrorHandlers.filesetErrorHandler());
resp.validate();

return resp.getFileLocation();
}

@VisibleForTesting
Expand All @@ -252,6 +271,19 @@ static String formatFilesetRequestPath(Namespace ns) {
.toString();
}

@VisibleForTesting
static String formatFileLocationRequestPath(Namespace ns, String name) {
Namespace schemaNs = Namespace.of(ns.level(0), ns.level(1));
return new StringBuilder()
.append(formatSchemaRequestPath(schemaNs))
.append("/")
.append(RESTUtils.encodeString(ns.level(2)))
.append("/filesets/")
.append(RESTUtils.encodeString(name))
.append("/location")
.toString();
}

/**
* Check whether the namespace of a fileset is valid.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,19 @@
import com.google.common.collect.ImmutableMap;
import java.nio.file.NoSuchFileException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.audit.InternalClientType;
import org.apache.gravitino.dto.AuditDTO;
import org.apache.gravitino.dto.CatalogDTO;
import org.apache.gravitino.dto.file.FilesetDTO;
Expand All @@ -43,17 +52,23 @@
import org.apache.gravitino.dto.responses.DropResponse;
import org.apache.gravitino.dto.responses.EntityListResponse;
import org.apache.gravitino.dto.responses.ErrorResponse;
import org.apache.gravitino.dto.responses.FileLocationResponse;
import org.apache.gravitino.dto.responses.FilesetResponse;
import org.apache.gravitino.exceptions.AlreadyExistsException;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NotFoundException;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.rest.RESTUtils;
import org.apache.hc.core5.http.Method;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockserver.matchers.Times;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
import org.mockserver.model.Parameter;

public class TestFilesetCatalog extends TestBase {

Expand Down Expand Up @@ -400,6 +415,127 @@ public void testAlterFileset() throws JsonProcessingException {
"internal error");
}

@Test
public void testGetFileLocation() throws JsonProcessingException {
NameIdentifier fileset = NameIdentifier.of(metalakeName, catalogName, "schema1", "fileset1");
String mockSubPath = "mock_location/test";
String filesetPath =
withSlash(
FilesetCatalog.formatFileLocationRequestPath(
Namespace.of(metalakeName, catalogName, "schema1"), fileset.name()));
Map<String, String> queryParams = new HashMap<>();
queryParams.put("sub_path", RESTUtils.encodeString(mockSubPath));

String mockFileLocation =
String.format("file:/fileset/%s/%s/%s/%s", catalogName, "schema1", "fileset1", mockSubPath);
FileLocationResponse resp = new FileLocationResponse(mockFileLocation);
buildMockResource(Method.GET, filesetPath, queryParams, null, resp, SC_OK);

String actualFileLocation =
catalog
.asFilesetCatalog()
.getFileLocation(
NameIdentifier.of(fileset.namespace().level(2), fileset.name()), mockSubPath);
Assertions.assertTrue(StringUtils.isNotBlank(actualFileLocation));
Assertions.assertEquals(mockFileLocation, actualFileLocation);

// Throw schema not found exception
ErrorResponse errResp =
ErrorResponse.notFound(NoSuchSchemaException.class.getSimpleName(), "schema not found");
buildMockResource(Method.GET, filesetPath, null, errResp, SC_NOT_FOUND);
Assertions.assertThrows(
NoSuchSchemaException.class,
() ->
catalog
.asFilesetCatalog()
.getFileLocation(
NameIdentifier.of(fileset.namespace().level(2), fileset.name()), mockSubPath),
"schema not found");

ErrorResponse errResp1 =
ErrorResponse.notFound(NotFoundException.class.getSimpleName(), "fileset not found");
buildMockResource(Method.GET, filesetPath, null, errResp1, SC_NOT_FOUND);
Assertions.assertThrows(
NotFoundException.class,
() ->
catalog
.asFilesetCatalog()
.getFileLocation(
NameIdentifier.of(fileset.namespace().level(2), fileset.name()), mockSubPath),
"fileset not found");

ErrorResponse errResp2 = ErrorResponse.internalError("internal error");
buildMockResource(Method.GET, filesetPath, null, errResp2, SC_SERVER_ERROR);
Assertions.assertThrows(
RuntimeException.class,
() ->
catalog
.asFilesetCatalog()
.getFileLocation(
NameIdentifier.of(fileset.namespace().level(2), fileset.name()), mockSubPath),
"internal error");
}

@Test
public void testCallerContextToHeader() throws JsonProcessingException {
NameIdentifier fileset = NameIdentifier.of(metalakeName, catalogName, "schema1", "fileset1");
String mockSubPath = "mock_location/test";
String filesetPath =
withSlash(
FilesetCatalog.formatFileLocationRequestPath(
Namespace.of(metalakeName, catalogName, "schema1"), fileset.name()));
Map<String, String> queryParams = new HashMap<>();
queryParams.put("sub_path", RESTUtils.encodeString(mockSubPath));
String mockFileLocation =
String.format("file:/fileset/%s/%s/%s/%s", catalogName, "schema1", "fileset1", mockSubPath);
FileLocationResponse resp = new FileLocationResponse(mockFileLocation);
String respJson = MAPPER.writeValueAsString(resp);

List<Parameter> parameters =
queryParams.entrySet().stream()
.map(kv -> new Parameter(kv.getKey(), kv.getValue()))
.collect(Collectors.toList());
HttpRequest mockRequest =
HttpRequest.request(filesetPath)
.withMethod(Method.GET.name())
.withQueryStringParameters(parameters);
HttpResponse mockResponse = HttpResponse.response().withStatusCode(SC_OK).withBody(respJson);

// set the thread local context
Map<String, String> context = new HashMap<>();
context.put(
FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
InternalClientType.HADOOP_GVFS.name());
context.put(
FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
FilesetDataOperation.GET_FILE_STATUS.name());
CallerContext callerContext = CallerContext.builder().withContext(context).build();
CallerContext.CallerContextHolder.set(callerContext);

// Using Times.exactly(1) will only match once for the request, so we could set difference
// responses for the same request and path.
AtomicReference<String> internalClientType = new AtomicReference<>(null);
AtomicReference<String> dataOperation = new AtomicReference<>(null);
mockServer
.when(mockRequest, Times.exactly(1))
.respond(
httpRequest -> {
internalClientType.set(
httpRequest.getFirstHeader(
FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE));
dataOperation.set(
httpRequest.getFirstHeader(
FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION));
return mockResponse;
});
catalog
.asFilesetCatalog()
.getFileLocation(
NameIdentifier.of(fileset.namespace().level(2), fileset.name()), mockSubPath);
Assertions.assertEquals(FilesetDataOperation.GET_FILE_STATUS.name(), dataOperation.get());
Assertions.assertEquals(InternalClientType.HADOOP_GVFS.name(), internalClientType.get());
}

private FilesetDTO mockFilesetDTO(
String name,
Fileset.Type type,
Expand Down
65 changes: 65 additions & 0 deletions docs/open-api/filesets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,46 @@ paths:
"5xx":
$ref: "./openapi.yaml#/components/responses/ServerErrorResponse"

/metalakes/{metalake}/catalogs/{catalog}/schemas/{schema}/filesets/{fileset}/location:
parameters:
- $ref: "./openapi.yaml#/components/parameters/metalake"
- $ref: "./openapi.yaml#/components/parameters/catalog"
- $ref: "./openapi.yaml#/components/parameters/schema"
- $ref: "./openapi.yaml#/components/parameters/fileset"

get:
tags:
- location
summary: Get file location
operationId: getFileLocation
description: Returns the specified file location object
parameters:
- name: sub_path
in: query
required: true
schema:
type: string
description: The sub path to the file or directory
responses:
"200":
$ref: "#/components/responses/FileLocationResponse"
"404":
description: Not Found - The target fileset does not exist
content:
application/vnd.gravitino.v1+json:
schema:
$ref: "./openapi.yaml#/components/schemas/ErrorModel"
examples:
NoSuchMetalakeException:
$ref: "./metalakes.yaml#/components/examples/NoSuchMetalakeException"
NoSuchCatalogException:
$ref: "./catalogs.yaml#/components/examples/NoSuchCatalogException"
NoSuchSchemaException:
$ref: "./schemas.yaml#/components/examples/NoSuchSchemaException"
NoSuchFilesetException:
$ref: "#/components/examples/NoSuchFilesetException"
"5xx":
$ref: "./openapi.yaml#/components/responses/ServerErrorResponse"

components:

Expand Down Expand Up @@ -356,6 +396,25 @@ components:
FilesetResponse:
$ref: "#/components/examples/FilesetResponse"

FileLocationResponse:
description: The response of the file location object
content:
application/vnd.gravitino.v1+json:
schema:
type: object
properties:
code:
type: integer
format: int32
description: Status code of the response
enum:
- 0
fileLocation:
type: string
description: The actual file location
examples:
FileLocationResponse:
$ref: "#/components/examples/FileLocationResponse"

examples:
FilesetCreateRequest:
Expand Down Expand Up @@ -385,6 +444,12 @@ components:
}
}

FileLocationResponse:
value: {
"code": 0,
"fileLocation": "hdfs://host/user/fileset/schema/fileset1/test.parquet"
}

FilesetAlreadyExistsException:
value: {
"code": 1004,
Expand Down
27 changes: 27 additions & 0 deletions server/src/main/java/org/apache/gravitino/server/web/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@
*/
package org.apache.gravitino.server.web;

import com.google.common.collect.Maps;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.Optional;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.UserPrincipal;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.audit.InternalClientType;
import org.apache.gravitino.auth.AuthConstants;
import org.apache.gravitino.dto.responses.ErrorResponse;
import org.apache.gravitino.utils.PrincipalUtils;
Expand Down Expand Up @@ -148,4 +154,25 @@ public static Response doAs(
}
return PrincipalUtils.doAs(principal, action);
}

public static Map<String, String> filterFilesetAuditHeaders(HttpServletRequest httpRequest) {
Map<String, String> filteredHeaders = Maps.newHashMap();

String internalClientType =
httpRequest.getHeader(FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE);
if (StringUtils.isNotBlank(internalClientType)
&& InternalClientType.checkValid(internalClientType)) {
filteredHeaders.put(
FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE, internalClientType);
}

String dataOperation =
httpRequest.getHeader(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION);
if (StringUtils.isNotBlank(
httpRequest.getHeader(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION))
&& FilesetDataOperation.checkValid(dataOperation)) {
filteredHeaders.put(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, dataOperation);
}
return filteredHeaders;
}
}
Loading
Loading