Skip to content

Commit

Permalink
add impl for get file location in python
Browse files Browse the repository at this point in the history
  • Loading branch information
xloya committed Sep 25, 2024
1 parent 1f98956 commit 4220326
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 5 deletions.
3 changes: 2 additions & 1 deletion clients/client-python/gravitino/audit/caller_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,5 @@ def get():
@staticmethod
def remove():
"""Remove the CallerContext from the thread local.

With the post-change lines of this hunk, the delete is guarded by a
``hasattr`` check, so calling this when no context is set is a no-op
instead of raising ``AttributeError``.
"""
# Pre-change line (the single deletion in this hunk): unconditional delete.
del caller_context_holder.caller_context
# Post-change lines (the two additions): delete only if the attribute exists.
if hasattr(caller_context_holder, "caller_context"):
del caller_context_holder.caller_context
34 changes: 33 additions & 1 deletion clients/client-python/gravitino/catalog/fileset_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
from gravitino.api.catalog import Catalog
from gravitino.api.fileset import Fileset
from gravitino.api.fileset_change import FilesetChange
from gravitino.audit.caller_context import CallerContextHolder, CallerContext
from gravitino.catalog.base_schema_catalog import BaseSchemaCatalog
from gravitino.dto.audit_dto import AuditDTO
from gravitino.dto.requests.fileset_create_request import FilesetCreateRequest
from gravitino.dto.requests.fileset_update_request import FilesetUpdateRequest
from gravitino.dto.requests.fileset_updates_request import FilesetUpdatesRequest
from gravitino.dto.responses.drop_response import DropResponse
from gravitino.dto.responses.entity_list_response import EntityListResponse
from gravitino.dto.responses.file_location_response import FileLocationResponse
from gravitino.dto.responses.fileset_response import FilesetResponse
from gravitino.name_identifier import NameIdentifier
from gravitino.namespace import Namespace
Expand Down Expand Up @@ -244,7 +246,29 @@ def get_file_location(self, ident: NameIdentifier, sub_path: str) -> str:
Returns:
The actual location of the file or directory.
"""
raise NotImplementedError("Not implemented yet")
self.check_fileset_name_identifier(ident)

full_namespace = self._get_fileset_full_namespace(ident.namespace())
try:
caller_context: CallerContext = CallerContextHolder.get()
params = {"sub_path": encode_string(sub_path)}

resp = self.rest_client.get(
self.format_file_location_request_path(full_namespace, ident.name()),
params=params,
headers=(
caller_context.context() if caller_context is not None else None
),
error_handler=FILESET_ERROR_HANDLER,
)
file_location_resp = FileLocationResponse.from_json(
resp.body, infer_missing=True
)
file_location_resp.validate()

return file_location_resp.file_location()
finally:
CallerContextHolder.remove()

@staticmethod
def check_fileset_namespace(namespace: Namespace):
Expand Down Expand Up @@ -272,6 +296,14 @@ def format_fileset_request_path(namespace: Namespace) -> str:
schema_ns = Namespace.of(namespace.level(0), namespace.level(1))
return f"{BaseSchemaCatalog.format_schema_request_path(schema_ns)}/{encode_string(namespace.level(2))}/filesets"

@staticmethod
def format_file_location_request_path(namespace: Namespace, name: str) -> str:
    """Build the REST path used to look up a file location in a fileset.

    Args:
        namespace: Namespace of the fileset. Levels 0 and 1 are used to
            build the schema request path; level 2 is passed through
            ``encode_string`` and appended to it.
        name: Name of the fileset, passed through ``encode_string``.

    Returns:
        The schema request path followed by
        ``/<level 2>/filesets/<name>/location``.
    """
    parent_ns = Namespace.of(namespace.level(0), namespace.level(1))
    schema_path = BaseSchemaCatalog.format_schema_request_path(parent_ns)
    schema_segment = encode_string(namespace.level(2))
    fileset_segment = encode_string(name)
    return f"{schema_path}/{schema_segment}/filesets/{fileset_segment}/location"

@staticmethod
def to_fileset_update_request(change: FilesetChange):
if isinstance(change, FilesetChange.RenameFileset):
Expand Down
9 changes: 7 additions & 2 deletions clients/client-python/gravitino/utils/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,14 @@ def _request(
f"Error handler {type(error_handler).__name__} can't handle this response, error response body: {resp}"
) from None

# Pre-change signature (one of the two deletions in this hunk).
def get(self, endpoint, params=None, error_handler=None, **kwargs):
# Post-change signature: adds an optional ``headers`` argument.
# NOTE(review): ``headers`` now occupies the positional slot that
# ``error_handler`` had before, so any caller passing ``error_handler``
# positionally would bind it to ``headers`` -- confirm all callers use keywords.
def get(self, endpoint, params=None, headers=None, error_handler=None, **kwargs):
return self._request(
# Pre-change argument list (the other deletion).
"get", endpoint, params=params, error_handler=error_handler, **kwargs
# Post-change argument list: same arguments plus ``headers=headers``.
"get",
endpoint,
params=params,
headers=headers,
error_handler=error_handler,
**kwargs,
)

def delete(self, endpoint, error_handler=None, **kwargs):
Expand Down
48 changes: 47 additions & 1 deletion clients/client-python/tests/integration/test_fileset_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@
Fileset,
FilesetChange,
)
from gravitino.exceptions.base import NoSuchFilesetException, GravitinoRuntimeException
from gravitino.audit.caller_context import CallerContext, CallerContextHolder
from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
from gravitino.audit.fileset_data_operation import FilesetDataOperation
from gravitino.exceptions.base import (
NoSuchFilesetException,
GravitinoRuntimeException,
)
from tests.integration.integration_test_env import IntegrationTestEnv

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -155,6 +161,18 @@ def create_fileset(self) -> Fileset:
properties=self.fileset_properties,
)

def create_custom_fileset(
    self, ident: NameIdentifier, storage_location: str
) -> Fileset:
    """Create a MANAGED fileset with a caller-chosen identifier and location.

    The comment and properties come from the test class attributes
    ``fileset_comment`` and ``fileset_properties``.

    Args:
        ident: Identifier of the fileset to create.
        storage_location: Storage location passed to ``create_fileset``.

    Returns:
        The fileset returned by the fileset catalog.
    """
    fileset_catalog = self.gravitino_client.load_catalog(
        name=self.catalog_name
    ).as_fileset_catalog()
    return fileset_catalog.create_fileset(
        ident=ident,
        fileset_type=Fileset.Type.MANAGED,
        comment=self.fileset_comment,
        storage_location=storage_location,
        properties=self.fileset_properties,
    )

def test_create_fileset(self):
fileset = self.create_fileset()
self.assertIsNotNone(fileset)
Expand Down Expand Up @@ -223,3 +241,31 @@ def test_alter_fileset(self):
)
self.assertEqual(fileset_comment_removed.name(), self.fileset_name)
self.assertIsNone(fileset_comment_removed.comment())

def test_get_file_location(self):
    """Verify file-location lookup, and the failure on an empty sub path.

    First resolves a sub path inside a freshly created fileset and checks
    the returned location. Then sets a RENAME data-operation caller
    context and expects the lookup with an empty sub path to raise.
    """
    fileset_location = "/tmp/test_get_file_location"
    ident: NameIdentifier = NameIdentifier.of(
        self.schema_name, "test_get_file_location"
    )
    self.create_custom_fileset(ident, fileset_location)

    fileset_catalog = self.gravitino_client.load_catalog(
        name=self.catalog_name
    ).as_fileset_catalog()
    resolved_location = fileset_catalog.get_file_location(ident, "/test/test.txt")

    self.assertEqual(resolved_location, f"file:{fileset_location}/test/test.txt")

    # A rename operation without a sub path should raise an exception.
    rename_context = CallerContext(
        {
            FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION: FilesetDataOperation.RENAME.name
        }
    )
    with self.assertRaises(GravitinoRuntimeException):
        CallerContextHolder.set(rename_context)
        # The catalog is loaded again here, as in the first lookup.
        reloaded_catalog = self.gravitino_client.load_catalog(
            name=self.catalog_name
        ).as_fileset_catalog()
        reloaded_catalog.get_file_location(ident, "")
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,6 @@ def _set_thread_local_context(self, thread_name, context: Dict[str, str]):
CallerContextHolder.remove()

self.assertIsNone(CallerContextHolder.get())

# will not throw an exception if the context does not exist
CallerContextHolder.remove()
80 changes: 80 additions & 0 deletions clients/client-python/tests/unittests/test_fileset_catalog_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import unittest
from http.client import HTTPResponse
from unittest.mock import patch, Mock

from gravitino import GravitinoClient, Catalog, NameIdentifier
from gravitino.audit.caller_context import CallerContext, CallerContextHolder
from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
from gravitino.audit.fileset_data_operation import FilesetDataOperation
from gravitino.exceptions.handlers.fileset_error_handler import FILESET_ERROR_HANDLER
from gravitino.namespace import Namespace
from gravitino.utils import Response
from tests.unittests import mock_base


@mock_base.mock_data
class TestFilesetCatalogApi(unittest.TestCase):
    """Unit tests for the fileset catalog API using a patched HTTP client."""

    def test_get_file_location(self, *mock_method):
        """Check the request sent by ``get_file_location`` and its result.

        ``HTTPClient.get`` is patched to return a canned response. The test
        then verifies the request path, query parameters, headers and error
        handler passed to it, that the caller context has been cleared
        afterwards, and that the returned file location matches the
        response body.
        """
        # Canned HTTP response carrying the file location payload.
        http_resp = Mock(HTTPResponse)
        http_resp.getcode.return_value = 200
        http_resp.read.return_value = json.dumps(
            {"code": 0, "fileLocation": "file:/test/1"}
        )
        http_resp.info.return_value = None
        http_resp.url = None
        canned_response = Response(http_resp)

        metalake_name: str = "metalake_demo"
        catalog_name: str = "fileset_catalog"
        client = GravitinoClient(
            uri="http://localhost:8090", metalake_name=metalake_name
        )
        catalog: Catalog = client.load_catalog(catalog_name)

        fileset_ident: NameIdentifier = NameIdentifier.of(
            "test", "test_get_file_location"
        )
        context = {
            FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION: FilesetDataOperation.RENAME.name
        }

        with patch(
            "gravitino.utils.http_client.HTTPClient.get",
            return_value=canned_response,
        ) as mock_get:
            CallerContextHolder.set(CallerContext(context))
            file_location: str = catalog.as_fileset_catalog().get_file_location(
                fileset_ident, "/test/1"
            )

            # The GET must target the file-location path with the expected
            # query parameters, headers and error handler.
            expected_path = (
                catalog.as_fileset_catalog().format_file_location_request_path(
                    Namespace.of("metalake_demo", "fileset_catalog", "test"),
                    fileset_ident.name(),
                )
            )
            mock_get.assert_called_once_with(
                expected_path,
                params={"sub_path": "/test/1"},
                headers=context,
                error_handler=FILESET_ERROR_HANDLER,
            )
            # The caller context must be cleared once the call returns.
            self.assertIsNone(CallerContextHolder.get())
            # The returned location must match the response payload.
            self.assertEqual(file_location, "file:/test/1")

0 comments on commit 4220326

Please sign in to comment.