diff --git a/clients/client-python/gravitino/api/__init__.py b/clients/client-python/gravitino/api/__init__.py new file mode 100644 index 00000000000..5779a3ad252 --- /dev/null +++ b/clients/client-python/gravitino/api/__init__.py @@ -0,0 +1,4 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. +""" diff --git a/clients/client-python/gravitino/api/audit.py b/clients/client-python/gravitino/api/audit.py new file mode 100644 index 00000000000..08dc1407568 --- /dev/null +++ b/clients/client-python/gravitino/api/audit.py @@ -0,0 +1,44 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. +""" +from abc import ABC, abstractmethod +from datetime import datetime + + +class Audit(ABC): + """Represents the audit information of an entity.""" + + @abstractmethod + def creator(self) -> str: + """The creator of the entity. + + Returns: + the creator of the entity. + """ + pass + + @abstractmethod + def create_time(self) -> datetime: + """The creation time of the entity. + + Returns: + The creation time of the entity. + """ + pass + + @abstractmethod + def last_modifier(self) -> str: + """ + Returns: + The last modifier of the entity. + """ + pass + + @abstractmethod + def last_modified_time(self) -> datetime: + """ + Returns: + The last modified time of the entity. + """ + pass diff --git a/clients/client-python/gravitino/api/auditable.py b/clients/client-python/gravitino/api/auditable.py new file mode 100644 index 00000000000..fd7c6e8ad67 --- /dev/null +++ b/clients/client-python/gravitino/api/auditable.py @@ -0,0 +1,18 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. +""" +from abc import ABC, abstractmethod + +from gravitino.api.audit import Audit + + +class Auditable(ABC): + """ + An auditable entity is an entity that has audit information associated with it. 
This audit + information is used to track changes to the entity. + """ + + @abstractmethod + def audit_info(self) -> Audit: + pass diff --git a/clients/client-python/gravitino/api/catalog.py b/clients/client-python/gravitino/api/catalog.py new file mode 100644 index 00000000000..fef0de70735 --- /dev/null +++ b/clients/client-python/gravitino/api/catalog.py @@ -0,0 +1,129 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. +""" +from abc import abstractmethod +from enum import Enum +from typing import Dict, Optional + +from gravitino.api.auditable import Auditable +from gravitino.api.supports_schemas import SupportsSchemas + + +class Catalog(Auditable): + """The interface of a catalog. The catalog is the second level entity in the gravitino system, + containing a set of tables. + """ + class Type(Enum): + """The type of the catalog.""" + + RELATIONAL = "relational" + """Catalog Type for Relational Data Structure, like db.table, catalog.db.table.""" + + FILESET = "fileset" + """Catalog Type for Fileset System (including HDFS, S3, etc.), like path/to/file""" + + MESSAGING = "messaging" + """Catalog Type for Message Queue, like kafka://topic""" + + UNSUPPORTED = "unsupported" + """Catalog Type for test only.""" + + PROPERTY_PACKAGE = "package" + """A reserved property to specify the package location of the catalog. The "package" is a string + of path to the folder where all the catalog related dependencies are located. The dependencies + under the "package" will be loaded by Gravitino to create the catalog. + + The property "package" is not needed if the catalog is a built-in one, Gravitino will search + the proper location using "provider" to load the dependencies. Only when the folder is in a + different location, the "package" property is needed. + """ + + @abstractmethod + def name(self) -> str: + """ + Returns: + The name of the catalog. 
+ """ + pass + + @abstractmethod + def type(self) -> Type: + """ + Returns: + The type of the catalog. + """ + pass + + @abstractmethod + def provider(self) -> str: + """ + Returns: + The provider of the catalog. + """ + pass + + @abstractmethod + def comment(self) -> Optional[str]: + """The comment of the catalog. Note, this method will return None if the comment is not set for + this catalog. + + Returns: + The comment of the catalog. + """ + pass + + @abstractmethod + def properties(self) -> Optional[Dict[str, str]]: + """ + The properties of the catalog. Note, this method will return None if the properties are not set. + + Returns: + The properties of the catalog. + """ + pass + + def as_schemas(self) -> SupportsSchemas: + """Return the SupportsSchemas if the catalog supports schema operations. + + Raises: + UnsupportedOperationException if the catalog does not support schema operations. + + Returns: + The SupportsSchemas if the catalog supports schema operations. + """ + raise UnsupportedOperationException("Catalog does not support schema operations") + + def as_table_catalog(self) -> 'TableCatalog': + """ + Raises: + UnsupportedOperationException if the catalog does not support table operations. + + Returns: + the TableCatalog if the catalog supports table operations. + """ + raise UnsupportedOperationException("Catalog does not support table operations") + + def as_fileset_catalog(self) -> 'FilesetCatalog': + """ + Raises: + UnsupportedOperationException if the catalog does not support fileset operations. + + Returns: + the FilesetCatalog if the catalog supports fileset operations. + """ + raise UnsupportedOperationException("Catalog does not support fileset operations") + + def as_topic_catalog(self) -> 'TopicCatalog': + """ + Returns: + the TopicCatalog if the catalog supports topic operations. + + Raises: + UnsupportedOperationException if the catalog does not support topic operations. 
+ """ + raise UnsupportedOperationException("Catalog does not support topic operations") + + +class UnsupportedOperationException(Exception): + pass \ No newline at end of file diff --git a/clients/client-python/gravitino/api/catalog_change.py b/clients/client-python/gravitino/api/catalog_change.py new file mode 100644 index 00000000000..f971a9b159d --- /dev/null +++ b/clients/client-python/gravitino/api/catalog_change.py @@ -0,0 +1,254 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. +""" +from abc import ABC + + +class CatalogChange(ABC): + """ + A catalog change is a change to a catalog. It can be used to rename a catalog, update the comment + of a catalog, set a property and value pair for a catalog, or remove a property from a catalog. + """ + + @staticmethod + def rename(new_name): + """Creates a new catalog change to rename the catalog. + + Args: + new_name: The new name of the catalog. + + Returns: + The catalog change. + """ + return CatalogChange.RenameCatalog(new_name) + + @staticmethod + def update_comment(new_comment): + """Creates a new catalog change to update the catalog comment. + + Args: + new_comment: The new comment for the catalog. + + Returns: + The catalog change. + """ + return CatalogChange.UpdateCatalogComment(new_comment) + + @staticmethod + def set_property(property, value): + """Creates a new catalog change to set the property and value for the catalog. + + Args: + property: The property name to set. + value: The value to set the property to. + + Returns: + The catalog change. + """ + return CatalogChange.SetProperty(property, value) + + @staticmethod + def remove_property(property): + """Creates a new catalog change to remove a property from the catalog. + + Args: + property: The property name to remove. + + Returns: + The catalog change. 
+ """ + return CatalogChange.RemoveProperty(property) + + class RenameCatalog: + """A catalog change to rename the catalog.""" + + def __init__(self, new_name): + self.new_name = new_name + + def get_new_name(self): + """Retrieves the new name set for the catalog. + + Returns: + The new name of the catalog. + """ + return self.new_name + + def __eq__(self, other): + """Compares this RenameCatalog instance with another object for equality. Two instances are + considered equal if they designate the same new name for the catalog. + + Args: + other: The object to compare with this instance. + + Returns: + true if the given object represents an identical catalog renaming operation; false otherwise. + """ + if not isinstance(other, CatalogChange.RenameCatalog): + return False + return self.new_name == other.new_name + + def __hash__(self): + """Generates a hash code for this RenameCatalog instance. The hash code is primarily based on + the new name for the catalog. + + Returns: + A hash code value for this renaming operation. + """ + return hash(self.new_name) + + def __str__(self): + """Provides a string representation of the RenameCatalog instance. This string includes the + class name followed by the new name of the catalog. + + Returns: + A string summary of this renaming operation. + """ + return f"RENAMECATALOG {self.new_name}" + + class UpdateCatalogComment: + """A catalog change to update the catalog comment.""" + + def __init__(self, new_comment): + self.new_comment = new_comment + + def get_new_comment(self): + """Retrieves the new comment intended for the catalog. + + Returns: + The new comment that has been set for the catalog. + """ + return self.new_comment + + def __eq__(self, other): + """Compares this UpdateCatalogComment instance with another object for equality. + Two instances are considered equal if they designate the same new comment for the catalog. + + Args: + other: The object to compare with this instance. 
+ + Returns: + true if the given object represents the same comment update; false otherwise. + """ + if not isinstance(other, CatalogChange.UpdateCatalogComment): + return False + return self.new_comment == other.new_comment + + def __hash__(self): + """Generates a hash code for this UpdateCatalogComment instance. + The hash code is based on the new comment for the catalog. + + Returns: + A hash code representing this comment update operation. + """ + return hash(self.new_comment) + + def __str__(self): + """Provides a string representation of the UpdateCatalogComment instance. + This string format includes the class name followed by the new comment for the catalog. + + Returns: + A string summary of this comment update operation. + """ + return f"UPDATECATALOGCOMMENT {self.new_comment}" + + class SetProperty: + """A catalog change to set the property and value for the catalog.""" + + def __init__(self, property, value): + self.property = property + self.value = value + + def get_property(self): + """Retrieves the name of the property being set in the catalog. + + Returns: + The name of the property. + """ + return self.property + + def get_value(self): + """Retrieves the value assigned to the property in the catalog. + + Returns: + The value of the property. + """ + return self.value + + def __eq__(self, other): + """Compares this SetProperty instance with another object for equality. + Two instances are considered equal if they have the same property and value for the catalog. + + Args: + other: The object to compare with this instance. + + Returns: + true if the given object represents the same property setting; false otherwise. + """ + if not isinstance(other, CatalogChange.SetProperty): + return False + return self.property == other.property and self.value == other.value + + def __hash__(self): + """Generates a hash code for this SetProperty instance. + The hash code is based on both the property name and its assigned value. 
+ + Returns: + A hash code value for this property setting. + """ + return hash((self.property, self.value)) + + def __str__(self): + """Provides a string representation of the SetProperty instance. + This string format includes the class name followed by the property and its value. + + Returns: + A string summary of the property setting. + """ + return f"SETPROPERTY {self.property} {self.value}" + + class RemoveProperty: + """A catalog change to remove a property from the catalog.""" + + def __init__(self, property): + self.property = property + + def get_property(self): + """Retrieves the name of the property to be removed from the catalog. + + Returns: + The name of the property for removal. + """ + return self.property + + def __eq__(self, other): + """Compares this RemoveProperty instance with another object for equality. + Two instances are considered equal if they target the same property for removal from the catalog. + + Args: + other: The object to compare with this instance. + + Returns: + true if the given object represents the same property removal; false otherwise. + """ + if not isinstance(other, CatalogChange.RemoveProperty): + return False + return self.property == other.property + + def __hash__(self): + """Generates a hash code for this RemoveProperty instance. + The hash code is based on the property name that is to be removed from the catalog. + + Returns: + A hash code value for this property removal operation. + """ + return hash(self.property) + + def __str__(self): + """Provides a string representation of the RemoveProperty instance. + This string format includes the class name followed by the property name to be removed. + + Returns: + A string summary of the property removal operation. 
+ """ + return f"REMOVEPROPERTY {self.property}" diff --git a/clients/client-python/gravitino/api/fileset.py b/clients/client-python/gravitino/api/fileset.py new file mode 100644 index 00000000000..213257931af --- /dev/null +++ b/clients/client-python/gravitino/api/fileset.py @@ -0,0 +1,98 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. +""" +from abc import abstractmethod +from enum import Enum +from typing import Optional, Dict + +from gravitino.api.auditable import Auditable + + +class Fileset(Auditable): + """An interface representing a fileset under a schema Namespace. A fileset is a virtual + concept of the file or directory that is managed by Gravitino. Users can create a fileset object + to manage the non-tabular data on the FS-like storage. The typical use case is to manage the + training data for AI workloads. The major difference compared to the relational table is that the + fileset is schema-free, the main property of the fileset is the storage location of the + underlying data. + + Fileset defines the basic properties of a fileset object. A catalog implementation + with FilesetCatalog should implement this interface. + """ + class Type(Enum): + """An enum representing the type of the fileset object.""" + + MANAGED = "managed" + """Fileset is managed by Gravitino. + When specified, the data will be deleted when the fileset object is deleted""" + + EXTERNAL = "external" + """Fileset is not managed by Gravitino. + When specified, the data will not be deleted when the fileset object is deleted""" + + @abstractmethod + def name(self) -> str: + """ + Returns: + Name of the fileset object. + """ + pass + + @abstractmethod + def comment(self) -> Optional[str]: + """ + Returns: + The comment of the fileset object. None is returned if no comment is set. + """ + pass + + @abstractmethod + def type(self) -> Type: + """ + Returns: + The type of the fileset object. 
+ """ + pass + + @abstractmethod + def storage_location(self) -> str: + """Get the storage location of the file or directory path that is managed by this fileset object. + + The returned storageLocation can either be the one specified when creating the fileset + object, or the one specified in the catalog / schema level if the fileset object is created + under this catalog / schema. + + For managed fileset, the storageLocation can be: + + 1) The one specified when creating the fileset object. + + 2) When catalog property "location" is specified but schema property "location" is not + specified, then the storageLocation will be "{catalog location}/schemaName/filesetName". + + 3) When catalog property "location" is not specified but schema property "location" is + specified, then the storageLocation will be "{schema location}/filesetName". + + 4) When both catalog property "location" and schema property "location" are specified, then + the storageLocation will be "{schema location}/filesetName". + + 5) When both catalog property "location" and schema property "location" are not specified, + and storageLocation specified when creating the fileset object is null, this situation is + illegal. + + For external fileset, the storageLocation can be: + + 1) The one specified when creating the fileset object. + + Returns: + The storage location of the fileset object. + """ + pass + + @abstractmethod + def properties(self) -> Dict[str, str]: + """ + Returns: + The properties of the fileset object. Empty map is returned if no properties are set. + """ + pass diff --git a/clients/client-python/gravitino/api/fileset_change.py b/clients/client-python/gravitino/api/fileset_change.py new file mode 100644 index 00000000000..82604ed7ce7 --- /dev/null +++ b/clients/client-python/gravitino/api/fileset_change.py @@ -0,0 +1,253 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. 
+""" +from abc import ABC + + +class FilesetChange(ABC): + """A fileset change is a change to a fileset. It can be used to rename a fileset, update the comment + of a fileset, set a property and value pair for a fileset, or remove a property from a fileset. + """ + + @staticmethod + def rename(new_name): + """Creates a new fileset change to rename the fileset. + + Args: + new_name: The new name of the fileset. + + Returns: + The fileset change. + """ + return FilesetChange.RenameFileset(new_name) + + @staticmethod + def update_comment(new_comment): + """Creates a new fileset change to update the fileset comment. + + Args: + new_comment: The new comment for the fileset. + + Returns: + The fileset change. + """ + return FilesetChange.UpdateFilesetComment(new_comment) + + @staticmethod + def set_property(property, value): + """Creates a new fileset change to set the property and value for the fileset. + + Args: + property: The property name to set. + value: The value to set the property to. + + Returns: + The fileset change. + """ + return FilesetChange.SetProperty(property, value) + + @staticmethod + def remove_property(property): + """Creates a new fileset change to remove a property from the fileset. + + Args: + property: The property name to remove. + + Returns: + The fileset change. + """ + return FilesetChange.RemoveProperty(property) + + class RenameFileset: + """A fileset change to rename the fileset.""" + + def __init__(self, new_name): + self.new_name = new_name + + def get_new_name(self): + """Retrieves the new name set for the fileset. + + Returns: + The new name of the fileset. + """ + return self.new_name + + def __eq__(self, other): + """Compares this RenameFileset instance with another object for equality. + Two instances are considered equal if they designate the same new name for the fileset. + + Args: + other: The object to compare with this instance. 
+ + Returns: + true if the given object represents an identical fileset renaming operation; false otherwise. + """ + if not isinstance(other, FilesetChange.RenameFileset): + return False + return self.new_name == other.new_name + + def __hash__(self): + """Generates a hash code for this RenameFileset instance. + The hash code is primarily based on the new name for the fileset. + + Returns: + A hash code value for this renaming operation. + """ + return hash(self.new_name) + + def __str__(self): + """Provides a string representation of the RenameFileset instance. + This string includes the class name followed by the new name of the fileset. + + Returns: + A string summary of this renaming operation. + """ + return f"RENAMEFILESET {self.new_name}" + + class UpdateFilesetComment: + """A fileset change to update the fileset comment.""" + + def __init__(self, new_comment): + self.new_comment = new_comment + + def get_new_comment(self): + """Retrieves the new comment intended for the fileset. + + Returns: + The new comment that has been set for the fileset. + """ + return self.new_comment + + def __eq__(self, other): + """Compares this UpdateFilesetComment instance with another object for equality. + Two instances are considered equal if they designate the same new comment for the fileset. + + Args: + other: The object to compare with this instance. + + Returns: + true if the given object represents the same comment update; false otherwise. + """ + if not isinstance(other, FilesetChange.UpdateFilesetComment): + return False + return self.new_comment == other.new_comment + + def __hash__(self): + """Generates a hash code for this UpdateFilesetComment instance. + The hash code is based on the new comment for the fileset. + + Returns: + A hash code representing this comment update operation. + """ + return hash(self.new_comment) + + def __str__(self): + """Provides a string representation of the UpdateFilesetComment instance. 
+ This string format includes the class name followed by the new comment for the fileset. + + Returns: + A string summary of this comment update operation. + """ + return f"UPDATEFILESETCOMMENT {self.new_comment}" + + class SetProperty: + """A fileset change to set the property and value for the fileset.""" + + def __init__(self, property, value): + self.property = property + self.value = value + + def get_property(self): + """Retrieves the name of the property being set in the fileset. + + Returns: + The name of the property. + """ + return self.property + + def get_value(self): + """Retrieves the value assigned to the property in the fileset. + + Returns: + The value of the property. + """ + return self.value + + def __eq__(self, other): + """Compares this SetProperty instance with another object for equality. + Two instances are considered equal if they have the same property and value for the fileset. + + Args: + other: The object to compare with this instance. + + Returns: + true if the given object represents the same property setting; false otherwise. + """ + if not isinstance(other, FilesetChange.SetProperty): + return False + return self.property == other.property and self.value == other.value + + def __hash__(self): + """Generates a hash code for this SetProperty instance. + The hash code is based on both the property name and its assigned value. + + Returns: + A hash code value for this property setting. + """ + return hash((self.property, self.value)) + + def __str__(self): + """Provides a string representation of the SetProperty instance. + This string format includes the class name followed by the property and its value. + + Returns: + A string summary of the property setting. 
+ """ + return f"SETPROPERTY {self.property} {self.value}" + + class RemoveProperty: + """A fileset change to remove a property from the fileset.""" + + def __init__(self, property): + self.property = property + + def get_property(self): + """Retrieves the name of the property to be removed from the fileset. + + Returns: + The name of the property for removal. + """ + return self.property + + def __eq__(self, other): + """Compares this RemoveProperty instance with another object for equality. + Two instances are considered equal if they target the same property for removal from the fileset. + + Args: + other: The object to compare with this instance. + + Returns: + true if the given object represents the same property removal; false otherwise. + """ + if not isinstance(other, FilesetChange.RemoveProperty): + return False + return self.property == other.property + + def __hash__(self): + """Generates a hash code for this RemoveProperty instance. + The hash code is based on the property name that is to be removed from the fileset. + + Returns: + A hash code value for this property removal operation. + """ + return hash(self.property) + + def __str__(self): + """Provides a string representation of the RemoveProperty instance. + This string format includes the class name followed by the property name to be removed. + + Returns: + A string summary of the property removal operation. 
+ """ + return f"REMOVEPROPERTY {self.property}" diff --git a/clients/client-python/gravitino/meta_change.py b/clients/client-python/gravitino/api/metalake_change.py similarity index 98% rename from clients/client-python/gravitino/meta_change.py rename to clients/client-python/gravitino/api/metalake_change.py index 809c87427fc..db3ad87076e 100644 --- a/clients/client-python/gravitino/meta_change.py +++ b/clients/client-python/gravitino/api/metalake_change.py @@ -30,7 +30,7 @@ def update_comment(new_comment: str) -> 'MetalakeChange.UpdateMetalakeComment': Args: new_comment: The new comment of the metalake. - Return: + Returns: The metalake change. """ return MetalakeChange.UpdateMetalakeComment(new_comment) @@ -43,7 +43,7 @@ def set_property(property: str, value: str) -> 'SetProperty': property: The property name to set. value: The value to set the property to. - Return: + Returns: The metalake change. """ return MetalakeChange.SetProperty(property, value) @@ -55,7 +55,7 @@ def remove_property(property: str) -> 'RemoveProperty': Args: property: The property name to remove. - Return: + Returns: The metalake change. """ return MetalakeChange.RemoveProperty(property) diff --git a/clients/client-python/gravitino/api/schema.py b/clients/client-python/gravitino/api/schema.py new file mode 100644 index 00000000000..7773835b2c7 --- /dev/null +++ b/clients/client-python/gravitino/api/schema.py @@ -0,0 +1,32 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. +""" +from abc import ABC, abstractmethod +from typing import Optional, Dict + +from gravitino.api.auditable import Auditable + + +class Schema(Auditable, ABC): + """ + An interface representing a schema in the Catalog. A Schema is a + basic container of relational objects, like tables, views, etc. A Schema can be self-nested, + which means it can be schema1.schema2.table. + + This defines the basic properties of a schema. 
A catalog implementation with SupportsSchemas + should implement this interface. + """ + + @abstractmethod + def name(self) -> str: + """Returns the name of the Schema.""" + pass + + def comment(self) -> Optional[str]: + """Returns the comment of the Schema. None is returned if the comment is not set.""" + return None + + def properties(self) -> Dict[str, str]: + """Returns the properties of the Schema. An empty dictionary is returned if no properties are set.""" + return {} diff --git a/clients/client-python/gravitino/api/schema_change.py b/clients/client-python/gravitino/api/schema_change.py new file mode 100644 index 00000000000..f7cb296e5ae --- /dev/null +++ b/clients/client-python/gravitino/api/schema_change.py @@ -0,0 +1,133 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. +""" +from abc import ABC + + +class SchemaChange(ABC): + """A schema change is a change to a schema. It can be used to set a property and value pair for a schema, or remove a property from a schema.""" + + @staticmethod + def set_property(property, value): + """Creates a new schema change to set the property and value pair for the schema. + + Args: + property: The property name to set. + value: The value to set the property to. + + Returns: + The SchemaChange object. + """ + return SchemaChange.SetProperty(property, value) + + @staticmethod + def remove_property(property): + """Creates a new schema change to remove a property from the schema. + + Args: + property: The property name to remove. + + Returns: + The SchemaChange object. + """ + return SchemaChange.RemoveProperty(property) + + class SetProperty: + """SchemaChange class to set the property and value pairs for the schema.""" + def __init__(self, property, value): + self.property = property + self.value = value + + def get_property(self): + """Retrieves the name of the property to be set. + + Returns: + The name of the property. + """ + return self.property + + def get_value(self): + """Retrieves the value of the property to be set. 
+ + Returns: + The value of the property. + """ + return self.value + + def __eq__(self, other): + """Compares this SetProperty instance with another object for equality. + Two instances are considered equal if they have the same property and value. + + Args: + other: The object to compare with this instance. + + Returns: + true if the given object represents the same property setting; false otherwise. + """ + if not isinstance(other, SchemaChange.SetProperty): + return False + return self.property == other.property and self.value == other.value + + def __hash__(self): + """Generates a hash code for this SetProperty instance. + The hash code is based on both the property name and its value. + + Returns: + A hash code value for this property setting. + """ + return hash((self.property, self.value)) + + def __str__(self): + """Provides a string representation of the SetProperty instance. + This string format includes the class name followed by the property name and its value. + + Returns: + A string summary of the property setting. + """ + return f"SETPROPERTY {self.property} {self.value}" + + class RemoveProperty: + """SchemaChange class to remove a property from the schema.""" + def __init__(self, property): + self.property = property + + def get_property(self): + """Retrieves the name of the property to be removed. + + Returns: + The name of the property for removal. + """ + return self.property + + def __eq__(self, other): + """Compares this RemoveProperty instance with another object for equality. + Two instances are considered equal if they target the same property for removal. + + Args: + other: The object to compare with this instance. + + Returns: + true if the given object represents the same property removal; false otherwise. + """ + if not isinstance(other, SchemaChange.RemoveProperty): + return False + return self.property == other.property + + def __hash__(self): + """Generates a hash code for this RemoveProperty instance. 
+ This hash code is based on the property name that is to be removed. + + Returns: + A hash code value for this property removal operation. + """ + return hash(self.property) + + def __str__(self): + """Provides a string representation of the RemoveProperty instance. + This string format includes the class name followed by the property name to be removed. + + Returns: + A string summary of the property removal operation. + """ + return f"REMOVEPROPERTY {self.property}" diff --git a/clients/client-python/gravitino/api/supports_schemas.py b/clients/client-python/gravitino/api/supports_schemas.py new file mode 100644 index 00000000000..6f8b1ecc9a1 --- /dev/null +++ b/clients/client-python/gravitino/api/supports_schemas.py @@ -0,0 +1,126 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. +""" +from abc import ABC, abstractmethod +from typing import List, Dict, Optional + +from gravitino.api.schema import Schema +from gravitino.api.schema_change import SchemaChange +from gravitino.name_identifier import NameIdentifier +from gravitino.namespace import Namespace + + +class NoSuchSchemaException(Exception): + """Exception raised if the schema does not exist.""" + pass + + +class SupportsSchemas(ABC): + """ + The Catalog interface to support schema operations. If the implemented catalog has schema + semantics, it should implement this interface. + """ + + @abstractmethod + def list_schemas(self, namespace: Namespace) -> List[NameIdentifier]: + """List schemas under a namespace. + + If an entity such as a table, view exists, its parent schemas must also exist and must be + returned by this discovery method. For example, if table a.b.t exists, this method invoked as + list_schemas(a) must return [a.b] in the result array. + + Args: + namespace: The namespace to list. + + Raises: + NoSuchCatalogException: If the catalog does not exist. + + Returns: + A list of schema identifiers under the namespace. 
def schema_exists(self, ident: NameIdentifier) -> bool:
    """Tell whether a schema can be loaded under the given identifier.

    The check is performed by calling ``load_schema``; a ``NoSuchSchemaException``
    raised by that call is interpreted as "does not exist". Any other exception
    propagates to the caller.

    Args:
        ident: The name identifier of the schema.

    Returns:
        True if ``load_schema`` succeeds, False if it raises NoSuchSchemaException.
    """
    try:
        self.load_schema(ident)
    except NoSuchSchemaException:
        return False
    else:
        return True
class BaseSchemaCatalog(CatalogDTO, SupportsSchemas):
    """Base class for every catalog that contains schemas.

    It implements the schema operations of SupportsSchemas (list, create, load,
    alter and drop) by sending REST requests through ``rest_client``.
    """

    rest_client: HTTPClient
    """The REST client to send the requests."""

    def __init__(self, name: str = None, type: Catalog.Type = Catalog.Type.UNSUPPORTED, provider: str = None,
                 comment: str = None, properties: Dict[str, str] = None, audit: AuditDTO = None,
                 rest_client: HTTPClient = None):
        super().__init__(_name=name, _type=type, _provider=provider, _comment=comment, _properties=properties,
                         _audit=audit)
        self.rest_client = rest_client

    def as_schemas(self):
        """Return the object that provides the schema operations (this catalog itself)."""
        return self

    # NOTE(review): ``[NameIdentifier]`` is a list literal, not a typing construct;
    # ``List[NameIdentifier]`` would need ``List`` imported from ``typing`` in this module.
    def list_schemas(self, namespace: Namespace) -> [NameIdentifier]:
        """List all the schemas under the given catalog namespace.

        Args:
            namespace: The namespace of the catalog.

        Raises:
            NoSuchCatalogException if the catalog with specified namespace does not exist.

        Returns:
            The identifiers carried by the validated entity-list response.
        """
        Namespace.check_schema(namespace)
        resp = self.rest_client.get(BaseSchemaCatalog.format_schema_request_path(namespace))
        entity_list_response = EntityListResponse.from_dict(resp.json())
        entity_list_response.validate()
        return entity_list_response.idents

    def create_schema(self, ident: NameIdentifier = None, comment: str = None, properties: Dict[str, str] = None):
        """Create a new schema with specified identifier, comment and metadata.

        Args:
            ident: The name identifier of the schema.
            comment: The comment of the schema.
            properties: The properties of the schema.

        Raises:
            NoSuchCatalogException if the catalog with specified namespace does not exist.
            SchemaAlreadyExistsException if the schema with specified identifier already exists.

        Returns:
            The created Schema.
        """
        NameIdentifier.check_schema(ident)
        req = SchemaCreateRequest(ident.name(), comment, properties)
        req.validate()

        resp = self.rest_client.post(BaseSchemaCatalog.format_schema_request_path(ident.namespace()), json=req)
        schema_response = SchemaResponse.from_json(resp.body, infer_missing=True)
        schema_response.validate()

        return schema_response.schema

    def load_schema(self, ident):
        """Load the schema with specified identifier.

        Args:
            ident: The name identifier of the schema.

        Raises:
            NoSuchSchemaException if the schema with specified identifier does not exist.

        Returns:
            The Schema with specified identifier.
        """
        NameIdentifier.check_schema(ident)
        resp = self.rest_client.get(
            BaseSchemaCatalog.format_schema_request_path(ident.namespace()) + "/" + ident.name())
        schema_response = SchemaResponse.from_json(resp.body, infer_missing=True)
        schema_response.validate()

        return schema_response.schema

    def alter_schema(self, ident, *changes):
        """Alter the schema with specified identifier by applying the changes.

        Args:
            ident: The name identifier of the schema.
            changes: The metadata changes to apply.

        Raises:
            NoSuchSchemaException if the schema with specified identifier does not exist.
            ValueError if one of the changes is of an unknown type.

        Returns:
            The altered Schema.
        """
        NameIdentifier.check_schema(ident)
        reqs = [BaseSchemaCatalog.to_schema_update_request(change) for change in changes]
        updates_request = SchemaUpdatesRequest(reqs)
        updates_request.validate()

        # The validated updates must travel with the PUT; without a request body
        # the server receives no changes to apply.
        resp = self.rest_client.put(
            BaseSchemaCatalog.format_schema_request_path(ident.namespace()) + "/" + ident.name(),
            json=updates_request)
        schema_response = SchemaResponse.from_json(resp.body, infer_missing=True)
        schema_response.validate()
        return schema_response.schema

    def drop_schema(self, ident, cascade: bool):
        """Drop the schema with specified identifier.

        The identifier check runs before the guarded section, so whatever it raises
        propagates. Any exception raised afterwards (request, response parsing or
        validation) is logged as a warning and reported as False.

        Args:
            ident: The name identifier of the schema.
            cascade: Whether to drop all the tables under the schema.

        Returns:
            The ``dropped()`` value of the drop response, or False if the operation failed.
        """
        NameIdentifier.check_schema(ident)
        try:
            params = {"cascade": str(cascade)}
            resp = self.rest_client.delete(
                BaseSchemaCatalog.format_schema_request_path(ident.namespace()) + "/" + ident.name(), params=params)
            drop_resp = DropResponse.from_json(resp.body, infer_missing=True)
            drop_resp.validate()
            return drop_resp.dropped()
        except Exception as e:
            # logging interpolates with %-style placeholders; "{}" would leave the
            # arguments unconsumed and make the logging call itself fail.
            logger.warning("Failed to drop schema %s: %s", ident, e)
            return False

    @staticmethod
    def format_schema_request_path(ns: Namespace):
        """Build the REST path of the schemas collection from namespace levels 0 and 1."""
        return "api/metalakes/" + ns.level(0) + "/catalogs/" + ns.level(1) + "/schemas"

    @staticmethod
    def to_schema_update_request(change: SchemaChange):
        """Convert a SchemaChange into the matching SchemaUpdateRequest.

        Raises:
            ValueError: If the change is neither a SetProperty nor a RemoveProperty.
        """
        if isinstance(change, SchemaChange.SetProperty):
            return SchemaUpdateRequest.SetSchemaPropertyRequest(change.property, change.value)
        elif isinstance(change, SchemaChange.RemoveProperty):
            return SchemaUpdateRequest.RemoveSchemaPropertyRequest(change.property)
        else:
            raise ValueError(f"Unknown change type: {type(change).__name__}")
+""" +import logging +from typing import List, Dict + +from gravitino.api.catalog import Catalog +from gravitino.api.fileset import Fileset +from gravitino.api.fileset_change import FilesetChange +from gravitino.catalog.base_schema_catalog import BaseSchemaCatalog +from gravitino.dto.audit_dto import AuditDTO +from gravitino.dto.requests.fileset_create_request import FilesetCreateRequest +from gravitino.dto.requests.fileset_update_request import FilesetUpdateRequest +from gravitino.dto.requests.fileset_updates_request import FilesetUpdatesRequest +from gravitino.dto.responses.drop_response import DropResponse +from gravitino.dto.responses.entity_list_response import EntityListResponse +from gravitino.dto.responses.fileset_response import FilesetResponse +from gravitino.name_identifier import NameIdentifier +from gravitino.namespace import Namespace +from gravitino.utils import HTTPClient + +logger = logging.getLogger(__name__) + + +class FilesetCatalog(BaseSchemaCatalog): + """Fileset catalog is a catalog implementation that supports fileset like metadata operations, for + example, schemas and filesets list, creation, update and deletion. A Fileset catalog is under the metalake. + """ + + def __init__(self, name: str = None, type: Catalog.Type = Catalog.Type.UNSUPPORTED, + provider: str = None, comment: str = None, properties: Dict[str, str] = None, + audit: AuditDTO = None, rest_client: HTTPClient = None): + + super().__init__(name, type, provider, comment, properties, audit, rest_client) + + def as_fileset_catalog(self): + return self + + def list_filesets(self, namespace: Namespace) -> List[NameIdentifier]: + """List the filesets in a schema namespace from the catalog. + + Args: + namespace A schema namespace. + + Raises: + NoSuchSchemaException If the schema does not exist. + + Returns: + An array of fileset identifiers in the namespace. 
def load_fileset(self, ident) -> Fileset:
    """Fetch the metadata of one fileset from the catalog.

    Args:
        ident: A fileset identifier.

    Raises:
        NoSuchFilesetException If the fileset does not exist.

    Returns:
        The fileset carried by the validated response.
    """
    NameIdentifier.check_fileset(ident)

    # <filesets path of the identifier's namespace>/<fileset name>
    parent_path = self.format_fileset_request_path(ident.namespace())
    url = f"{parent_path}/{ident.name()}"

    http_resp = self.rest_client.get(url)
    parsed = FilesetResponse.from_json(http_resp.body, infer_missing=True)
    parsed.validate()

    return parsed.fileset
def alter_fileset(self, ident, *changes) -> Fileset:
    """Apply a sequence of changes to a fileset's metadata.

    Args:
        ident: A fileset identifier.
        changes: The changes to apply to the fileset.

    Raises:
        IllegalArgumentException If the changes are invalid.
        NoSuchFilesetException If the fileset does not exist.
        ValueError If a change is of a type that cannot be converted to a request.

    Returns:
        The updated fileset metadata.
    """
    NameIdentifier.check_fileset(ident)

    # Translate every change into its REST request form and bundle them.
    update_requests = list(map(FilesetCatalog.to_fileset_update_request, changes))
    bundle = FilesetUpdatesRequest(update_requests)
    bundle.validate()

    parent_path = self.format_fileset_request_path(ident.namespace())
    http_resp = self.rest_client.put(f"{parent_path}/{ident.name()}", bundle)

    parsed = FilesetResponse.from_json(http_resp.body, infer_missing=True)
    parsed.validate()

    return parsed.fileset
@staticmethod
def to_fileset_update_request(change: FilesetChange):
    """Translate a FilesetChange into the corresponding FilesetUpdateRequest.

    Args:
        change: The change to convert.

    Raises:
        ValueError: If the change is not one of the four supported change types.

    Returns:
        The request object matching the type of ``change``.
    """
    # Checked in a fixed order; the first matching type wins.
    if isinstance(change, FilesetChange.RenameFileset):
        return FilesetUpdateRequest.RenameFilesetRequest(change.new_name)
    if isinstance(change, FilesetChange.UpdateFilesetComment):
        return FilesetUpdateRequest.UpdateFilesetCommentRequest(change.new_comment)
    if isinstance(change, FilesetChange.SetProperty):
        return FilesetUpdateRequest.SetFilesetPropertyRequest(change.property, change.value)
    if isinstance(change, FilesetChange.RemoveProperty):
        return FilesetUpdateRequest.RemoveFilesetPropertyRequest(change.property)
    raise ValueError(f"Unknown change type: {type(change).__name__}")
gravitino.name_identifier import NameIdentifier logger = logging.getLogger(__name__) @@ -29,9 +29,9 @@ def __init__(self, uri): # TODO: AuthDataProvider authDataProvider super().__init__(uri) def list_metalakes(self) -> List[GravitinoMetalake]: - """ - Retrieves a list of Metalakes from the Gravitino API. - Return: + """Retrieves a list of Metalakes from the Gravitino API. + + Returns: An array of GravitinoMetalake objects representing the Metalakes. """ resp = self.rest_client.get(self.API_METALAKES_LIST_PATH) @@ -41,19 +41,20 @@ def list_metalakes(self) -> List[GravitinoMetalake]: return [GravitinoMetalake.build(o, self.rest_client) for o in metalake_list_resp.metalakes] def create_metalake(self, ident: NameIdentifier, comment: str, properties: Dict[str, str]) -> GravitinoMetalake: - """ - Creates a new Metalake using the Gravitino API. + """Creates a new Metalake using the Gravitino API. + Args: ident: The identifier of the new Metalake. comment: The comment for the new Metalake. properties: The properties of the new Metalake. - Return: + + Returns: A GravitinoMetalake instance representing the newly created Metalake. - TODO: @throws MetalakeAlreadyExistsException If a Metalake with the specified identifier already exists. + TODO: @throws MetalakeAlreadyExistsException If a Metalake with the specified identifier already exists. """ NameIdentifier.check_metalake(ident) - req = MetalakeCreateRequest(ident.name, comment, properties) + req = MetalakeCreateRequest(ident.name(), comment, properties) req.validate() resp = self.rest_client.post(self.API_METALAKES_LIST_PATH, req) @@ -63,12 +64,13 @@ def create_metalake(self, ident: NameIdentifier, comment: str, properties: Dict[ return GravitinoMetalake.build(metalake_response.metalake, self.rest_client) def alter_metalake(self, ident: NameIdentifier, *changes: MetalakeChange) -> GravitinoMetalake: - """ - Alters a specific Metalake using the Gravitino API. + """Alters a specific Metalake using the Gravitino API. 
def drop_metalake(self, ident: NameIdentifier) -> bool:
    """Drops a specific Metalake using the Gravitino API.

    The identifier check runs before the guarded section, so whatever it raises
    propagates. Any exception raised while sending the request or parsing the
    response is logged as a warning and reported as False.

    Args:
        ident: The identifier of the Metalake to be dropped.

    Returns:
        The ``dropped()`` value of the drop response, or False if the operation failed.
    """
    NameIdentifier.check_metalake(ident)

    try:
        resp = self.rest_client.delete(self.API_METALAKES_IDENTIFIER_PATH + ident.name())
        drop_response = DropResponse.from_json(resp.body, infer_missing=True)

        return drop_response.dropped()
    except Exception as e:
        # logging applies "msg % args"; every argument needs its own placeholder,
        # otherwise the logging call fails and the cause is never reported.
        logger.warning("Failed to drop metalake %s: %s", ident.name(), e)
        return False
def drop_catalog(self, ident: NameIdentifier) -> bool:
    """Drop the catalog with the given identifier from the current metalake.

    Delegates to ``drop_catalog`` of the object returned by ``get_metalake()``.

    Args:
        ident: The identifier of the catalog to drop.

    Returns:
        The result reported by the metalake's ``drop_catalog``.
    """
    return self.get_metalake().drop_catalog(ident)
gravitino.utils import HTTPClient, Response logger = logging.getLogger(__name__) + class GravitinoClientBase: """ Base class for Gravitino Java client; It uses an underlying {@link RESTClient} to send HTTP requests and receive responses from the API. """ - rest_client: HTTPClient # The REST client to communicate with the REST server - API_METALAKES_LIST_PATH = "api/metalakes" # The REST API path for listing metalakes - API_METALAKES_IDENTIFIER_PATH = f"{API_METALAKES_LIST_PATH}/" # The REST API path prefix for load a specific metalake + rest_client: HTTPClient + """The REST client to communicate with the REST server""" + + API_METALAKES_LIST_PATH = "api/metalakes" + """The REST API path for listing metalakes""" + + + API_METALAKES_IDENTIFIER_PATH = f"{API_METALAKES_LIST_PATH}/" + """The REST API path prefix for load a specific metalake""" def __init__(self, uri: str): self.rest_client = HTTPClient(uri) @@ -30,16 +37,17 @@ def load_metalake(self, ident: NameIdentifier) -> GravitinoMetalake: Args: ident The identifier of the Metalake to be loaded. - Return: + Returns: A GravitinoMetalake instance representing the loaded Metalake. Raises: NoSuchMetalakeException If the specified Metalake does not exist. """ + NameIdentifier.check_metalake(ident) - resp = self.rest_client.get(GravitinoClientBase.API_METALAKES_IDENTIFIER_PATH + ident.name) - metalake_response = MetalakeResponse.from_json(resp.body) + response = self.rest_client.get(GravitinoClientBase.API_METALAKES_IDENTIFIER_PATH + ident.name()) + metalake_response = MetalakeResponse.from_json(response.body, infer_missing=True) metalake_response.validate() return GravitinoMetalake.build(metalake_response.metalake, self.rest_client) @@ -47,7 +55,7 @@ def load_metalake(self, ident: NameIdentifier) -> GravitinoMetalake: def get_version(self) -> GravitinoVersion: """Retrieves the version of the Gravitino API. - Return: + Returns: A GravitinoVersion instance representing the version of the Gravitino API. 
""" resp = self.rest_client.get("api/version") diff --git a/clients/client-python/gravitino/client/gravitino_metalake.py b/clients/client-python/gravitino/client/gravitino_metalake.py index aef7e565d21..4d043863e7a 100644 --- a/clients/client-python/gravitino/client/gravitino_metalake.py +++ b/clients/client-python/gravitino/client/gravitino_metalake.py @@ -2,27 +2,200 @@ Copyright 2024 Datastrato Pvt Ltd. This software is licensed under the Apache License version 2. """ -from typing import Dict +import logging +from gravitino.api.catalog import Catalog +from gravitino.api.catalog_change import CatalogChange from gravitino.dto.audit_dto import AuditDTO +from gravitino.dto.dto_converters import DTOConverters from gravitino.dto.metalake_dto import MetalakeDTO +from gravitino.dto.requests.catalog_create_request import CatalogCreateRequest +from gravitino.dto.requests.catalog_updates_request import CatalogUpdatesRequest +from gravitino.dto.responses.catalog_list_response import CatalogListResponse +from gravitino.dto.responses.catalog_response import CatalogResponse +from gravitino.dto.responses.drop_response import DropResponse +from gravitino.dto.responses.entity_list_response import EntityListResponse +from gravitino.name_identifier import NameIdentifier +from gravitino.namespace import Namespace from gravitino.utils import HTTPClient +from typing import List, Dict + +logger = logging.getLogger(__name__) + + +class NoSuchMetalakeException(Exception): + pass + + +class NoSuchCatalogException(Exception): + pass + + +class CatalogAlreadyExistsException(Exception): + pass + + class GravitinoMetalake(MetalakeDTO): """ Gravitino Metalake is the top-level metadata repository for users. It contains a list of catalogs - as sub-level metadata collections. With {@link GravitinoMetalake}, users can list, create, load, + as sub-level metadata collections. With GravitinoMetalake, users can list, create, load, alter and drop a catalog with specified identifier. 
""" - restClient: HTTPClient + + rest_client: HTTPClient + + API_METALAKES_CATALOGS_PATH = "api/metalakes/{}/catalogs/{}" def __init__(self, name: str = None, comment: str = None, properties: Dict[str, str] = None, audit: AuditDTO = None, rest_client: HTTPClient = None): super().__init__(name=name, comment=comment, properties=properties, audit=audit) - self.restClient = rest_client + self.rest_client = rest_client @classmethod def build(cls, metalake: MetalakeDTO = None, client: HTTPClient = None): return cls(name=metalake.name, comment=metalake.comment, properties=metalake.properties, audit=metalake.audit, rest_client=client) + + def list_catalogs(self, namespace: Namespace) -> List[NameIdentifier]: + """List all the catalogs under this metalake with specified namespace. + + Args: + namespace The namespace to list the catalogs under it. + + Raises: + NoSuchMetalakeException if the metalake with specified namespace does not exist. + + Returns: + A list of {@link NameIdentifier} of the catalogs under the specified namespace. + """ + Namespace.check_catalog(namespace) + url = f"api/metalakes/{namespace.level(0)}/catalogs" + response = self.rest_client.get(url) + entityList = EntityListResponse.from_json(response.body, infer_missing=True) + entityList.validate() + return entityList.idents + + def list_catalogs_info(self, namespace: Namespace) -> List[Catalog]: + """List all the catalogs with their information under this metalake with specified namespace. + + Args: + namespace The namespace to list the catalogs under it. + + Raises: + NoSuchMetalakeException if the metalake with specified namespace does not exist. + + Returns: + A list of Catalog under the specified namespace. 
def load_catalog(self, ident: NameIdentifier) -> Catalog:
    """Load the catalog with specified identifier.

    Args:
        ident: The identifier of the catalog to load.

    Raises:
        NoSuchCatalogException if the catalog with specified identifier does not exist.

    Returns:
        The Catalog with specified identifier.
    """
    NameIdentifier.check_catalog(ident)
    url = self.API_METALAKES_CATALOGS_PATH.format(ident.namespace().level(0), ident.name())
    response = self.rest_client.get(url)
    catalog_resp = CatalogResponse.from_json(response.body, infer_missing=True)
    # Validate before converting, as alter_catalog does, so a malformed response
    # is rejected here instead of surfacing later inside the converter.
    catalog_resp.validate()

    return DTOConverters.to_catalog(catalog_resp.catalog(), self.rest_client)
def alter_catalog(self, ident: NameIdentifier, *changes: CatalogChange) -> Catalog:
    """Apply a sequence of changes to the catalog with the given identifier.

    Args:
        ident: the identifier of the catalog.
        changes: the changes to apply to the catalog.

    Raises:
        NoSuchCatalogException if the catalog with specified identifier does not exist.
        IllegalArgumentException if the changes are invalid.

    Returns:
        the altered Catalog.
    """
    NameIdentifier.check_catalog(ident)

    # Translate every change into its REST request form and bundle them.
    update_requests = list(map(DTOConverters.to_catalog_update_request, changes))
    bundle = CatalogUpdatesRequest(update_requests)
    bundle.validate()

    metalake_name = ident.namespace().level(0)
    target = self.API_METALAKES_CATALOGS_PATH.format(metalake_name, ident.name())
    http_resp = self.rest_client.put(target, json=bundle)

    parsed = CatalogResponse.from_json(http_resp.body, infer_missing=True)
    parsed.validate()

    return DTOConverters.to_catalog(parsed.catalog(), self.rest_client)
@dataclass
class AuditDTO(Audit, DataClassJsonMixin):
    """Audit information of an entity in its JSON transfer form.

    All four fields are optional and default to None. The two timestamps are
    stored as the raw strings found in the JSON document.
    """

    # Creator of the entity; JSON key "creator".
    _creator: Optional[str] = field(
        default=None, metadata=config(field_name='creator'))

    # Creation time; JSON key "createTime".
    # TODO: kept as a string because datetime cannot be deserialized from JSON yet.
    _create_time: Optional[str] = field(
        default=None, metadata=config(field_name='createTime'))

    # Last modifier of the entity; JSON key "lastModifier".
    _last_modifier: Optional[str] = field(
        default=None, metadata=config(field_name='lastModifier'))

    # Last modification time; JSON key "lastModifiedTime".
    # TODO: kept as a string because datetime cannot be deserialized from JSON yet.
    _last_modified_time: Optional[str] = field(
        default=None, metadata=config(field_name='lastModifiedTime'))

    def creator(self) -> str:
        """
        Returns:
            The creator of the entity (None when the field was not set).
        """
        return self._creator

    def create_time(self) -> str:
        """
        Returns:
            The creation time of the entity, as the stored string.
        """
        return self._create_time

    def last_modifier(self) -> str:
        """
        Returns:
            The last modifier of the entity (None when the field was not set).
        """
        return self._last_modifier

    def last_modified_time(self) -> str:
        """
        Returns:
            The last modified time of the entity, as the stored string.
        """
        return self._last_modified_time
@dataclass
class CatalogDTO(Catalog):
    """Catalog information in its JSON transfer form.

    Each private field is mapped to the un-prefixed JSON key through its
    field_name metadata; the public accessors below expose the values.
    """

    _name: str = field(metadata=config(field_name='name'))
    _type: Catalog.Type = field(metadata=config(field_name='type'))
    _provider: str = field(metadata=config(field_name='provider'))
    _comment: str = field(metadata=config(field_name='comment'))
    _properties: Dict[str, str] = field(metadata=config(field_name='properties'))
    # Only the audit block is optional.
    _audit: AuditDTO = field(default=None, metadata=config(field_name='audit'))

    def builder(self, name: str = None, type: Catalog.Type = Catalog.Type.UNSUPPORTED,
                provider: str = None, comment: str = None, properties: Dict[str, str] = None,
                audit: AuditDTO = None):
        """Overwrite every field of this instance in place.

        Arguments that are left out reset the matching field to its default
        (None, or Catalog.Type.UNSUPPORTED for the type). Nothing is returned.
        """
        self._name, self._type, self._provider = name, type, provider
        self._comment, self._properties, self._audit = comment, properties, audit

    def name(self) -> str:
        """Return the catalog name."""
        return self._name

    def type(self) -> Catalog.Type:
        """Return the catalog type."""
        return self._type

    def provider(self) -> str:
        """Return the catalog provider."""
        return self._provider

    def comment(self) -> str:
        """Return the catalog comment."""
        return self._comment

    def properties(self) -> Dict[str, str]:
        """Return the catalog properties."""
        return self._properties

    def audit_info(self) -> AuditDTO:
        """Return the audit information (None when it was not provided)."""
        return self._audit
""" +from gravitino.api.catalog import Catalog +from gravitino.api.catalog_change import CatalogChange +from gravitino.catalog.fileset_catalog import FilesetCatalog +from gravitino.dto.catalog_dto import CatalogDTO +from gravitino.dto.requests.catalog_update_request import CatalogUpdateRequest from gravitino.dto.requests.metalake_update_request import MetalakeUpdateRequest -from gravitino.meta_change import MetalakeChange +from gravitino.api.metalake_change import MetalakeChange +from gravitino.utils import HTTPClient class DTOConverters: @@ -22,3 +28,29 @@ def to_metalake_update_request(change: MetalakeChange) -> object: return MetalakeUpdateRequest.RemoveMetalakePropertyRequest(change.property) else: raise ValueError(f"Unknown change type: {type(change).__name__}") + + @staticmethod + def to_catalog(catalog: CatalogDTO, client: HTTPClient): + if catalog.type() == Catalog.Type.FILESET: + return FilesetCatalog(name=catalog.name(), + type=catalog.type(), + provider=catalog.provider(), + comment=catalog.comment(), + properties=catalog.properties(), + audit=catalog.audit_info(), + rest_client=client) + else: + raise NotImplementedError("Unsupported catalog type: " + str(catalog.type())) + + @staticmethod + def to_catalog_update_request(change: CatalogChange): + if isinstance(change, CatalogChange.RenameCatalog): + return CatalogUpdateRequest.RenameCatalogRequest(change.new_name) + elif isinstance(change, CatalogChange.UpdateCatalogComment): + return CatalogUpdateRequest.UpdateCatalogCommentRequest(change.new_comment) + elif isinstance(change, CatalogChange.SetProperty): + return CatalogUpdateRequest.SetCatalogPropertyRequest(change.property, change.value) + elif isinstance(change, CatalogChange.RemoveProperty): + return CatalogUpdateRequest.RemoveCatalogPropertyRequest(change.property) + else: + raise ValueError(f"Unknown change type: {type(change).__name__}") diff --git a/clients/client-python/gravitino/dto/fileset_dto.py 
@dataclass
class FilesetDTO(Fileset, DataClassJsonMixin):
    """Represents a Fileset DTO (Data Transfer Object).

    Each private field is mapped to the un-prefixed JSON key through its
    field_name metadata; the accessors below expose the values.
    """

    _name: str = field(metadata=config(field_name='name'))
    _comment: Optional[str] = field(metadata=config(field_name='comment'))
    _type: Fileset.Type = field(metadata=config(field_name='type'))
    _properties: Dict[str, str] = field(metadata=config(field_name='properties'))
    _storage_location: str = field(default=None, metadata=config(field_name='storageLocation'))
    # Mapped to the JSON key "audit", as CatalogDTO does. Without the mapping
    # the key would be "_audit", so the audit block of a response would never
    # be loaded into this field.
    _audit: AuditDTO = field(default=None, metadata=config(field_name='audit'))

    def name(self) -> str:
        """Return the fileset name."""
        return self._name

    def type(self) -> Fileset.Type:
        """Return the fileset type."""
        return self._type

    def storage_location(self) -> str:
        """Return the storage location (None when it was not provided)."""
        return self._storage_location

    def comment(self) -> Optional[str]:
        """Return the fileset comment, if any."""
        return self._comment

    def properties(self) -> Dict[str, str]:
        """Return the fileset properties."""
        return self._properties

    def audit_info(self) -> AuditDTO:
        """Return the audit information (None when it was not provided)."""
        return self._audit
@dataclass
class CatalogCreateRequest(DataClassJsonMixin):
    """Request payload for creating a catalog."""
    name: str
    type: Catalog.Type
    provider: str
    comment: Optional[str]
    properties: Optional[Dict[str, str]]

    def __init__(self, name: str = None, type: Catalog.Type = Catalog.Type.UNSUPPORTED, provider: str = None,
                 comment: str = None, properties: Dict[str, str] = None):
        # Hand-written so that every argument is optional at construction
        # time; required fields are enforced later by validate().
        self.name, self.type, self.provider = name, type, provider
        self.comment, self.properties = comment, properties

    def validate(self):
        """Check that the required fields are present.

        Raises:
            AssertionError: if name, type or provider is None. The checks are
                plain assert statements, so they are skipped when Python runs
                with -O.
        """
        for required in ("name", "type", "provider"):
            assert getattr(self, required) is not None, \
                f"\"{required}\" field is required and cannot be empty"
class CatalogUpdateRequest:
    """Namespace for the REST payloads that each describe one catalog change.

    Every nested request is a dataclass that passes its "@type" tag to
    CatalogUpdateRequestBase and takes its own data as constructor arguments.
    This is the calling convention DTOConverters.to_catalog_update_request
    uses, and it mirrors the metalake and fileset update requests.
    """

    @dataclass
    class RenameCatalogRequest(CatalogUpdateRequestBase):
        """Request to rename a catalog."""

        # Serialized under the JSON key "newName".
        new_name: Optional[str] = field(metadata=config(field_name='newName'))

        def __init__(self, new_name: str):
            super().__init__("rename")
            self.new_name = new_name

        def catalog_change(self):
            """Return the CatalogChange equivalent to this request."""
            return CatalogChange.rename(self.new_name)

        def validate(self):
            """Validates the fields of the request.

            Raises:
                ValueError: if the new name is not set or is empty.
            """
            if not self.new_name:
                raise ValueError('"newName" field is required and cannot be empty')

    @dataclass
    class UpdateCatalogCommentRequest(CatalogUpdateRequestBase):
        """Request to update the comment of a catalog."""

        # Serialized under the JSON key "newComment".
        new_comment: Optional[str] = field(metadata=config(field_name='newComment'))

        def __init__(self, new_comment: str):
            super().__init__("updateComment")
            self.new_comment = new_comment

        def catalog_change(self):
            """Return the CatalogChange equivalent to this request."""
            return CatalogChange.update_comment(self.new_comment)

        def validate(self):
            """Validates the fields of the request.

            Raises:
                ValueError: if the new comment is not set or is empty.
            """
            if not self.new_comment:
                raise ValueError('"newComment" field is required and cannot be empty')

    @dataclass
    class SetCatalogPropertyRequest(CatalogUpdateRequestBase):
        """Request to set a property on a catalog."""

        property: Optional[str] = None
        value: Optional[str] = None

        def __init__(self, property: str, value: str):
            super().__init__("setProperty")
            self.property = property
            self.value = value

        def catalog_change(self):
            """Return the CatalogChange equivalent to this request."""
            return CatalogChange.set_property(self.property, self.value)

        def validate(self):
            """Validates the fields of the request.

            Raises:
                ValueError: if the property name is not set or is empty, or if
                    the value is None.
            """
            if not self.property:
                raise ValueError("\"property\" field is required and cannot be empty")
            # Only a missing value is rejected; an empty string is still
            # accepted as a property value.
            if self.value is None:
                raise ValueError("\"value\" field is required and cannot be empty")

    @dataclass
    class RemoveCatalogPropertyRequest(CatalogUpdateRequestBase):
        """Request to remove a property from a catalog."""

        property: Optional[str] = None

        def __init__(self, property: str):
            super().__init__("removeProperty")
            self.property = property

        def catalog_change(self):
            """Return the CatalogChange equivalent to this request."""
            return CatalogChange.remove_property(self.property)

        def validate(self):
            """Validates the fields of the request.

            Raises:
                ValueError: if the property name is not set or is empty.
            """
            if not self.property:
                raise ValueError("\"property\" field is required and cannot be empty")
@dataclass
class FilesetCreateRequest(DataClassJsonMixin):
    """Request payload for creating a fileset."""
    name: str
    comment: Optional[str]
    type: Fileset.Type
    # Serialized under the JSON key "storageLocation".
    storage_location: str = field(metadata=config(field_name='storageLocation'))
    properties: Dict[str, str]

    def validate(self):
        """Check that the request carries a fileset name.

        Raises:
            ValueError: if name is None or empty.
        """
        if self.name:
            return
        raise ValueError('"name" field is required and cannot be empty')
class FilesetUpdateRequest:
    """Namespace for the REST payloads that each describe one fileset change."""

    @dataclass
    class RenameFilesetRequest(FilesetUpdateRequestBase):
        """Payload asking for a fileset to be renamed."""

        # New name of the fileset; JSON key "newName".
        new_name: str = field(metadata=config(field_name='newName'))

        def __init__(self, new_name: str):
            super().__init__("rename")
            self.new_name = new_name

        def fileset_change(self):
            """Return the FilesetChange equivalent to this request."""
            return FilesetChange.rename(self.new_name)

        def validate(self):
            """Check the request.

            Raises:
                ValueError: if the new name is None or empty.
            """
            if self.new_name:
                return
            raise ValueError('"new_name" field is required and cannot be empty')

    @dataclass
    class UpdateFilesetCommentRequest(FilesetUpdateRequestBase):
        """Payload asking for a fileset's comment to be replaced."""

        # New comment of the fileset; JSON key "newComment".
        new_comment: str = field(metadata=config(field_name='newComment'))

        def __init__(self, new_comment: str):
            super().__init__("updateComment")
            self.new_comment = new_comment

        def fileset_change(self):
            """Return the FilesetChange equivalent to this request."""
            return FilesetChange.update_comment(self.new_comment)

        def validate(self):
            """Check the request.

            Raises:
                ValueError: if the new comment is None or empty.
            """
            if self.new_comment:
                return
            raise ValueError('"new_comment" field is required and cannot be empty')

    @dataclass
    class SetFilesetPropertyRequest(FilesetUpdateRequestBase):
        """Payload asking for a property to be set on a fileset."""

        # Name of the property to set.
        property: str = None

        # Value to store under that name.
        value: str = None

        def __init__(self, property: str, value: str):
            super().__init__("setProperty")
            self.property = property
            self.value = value

        def fileset_change(self):
            """Return the FilesetChange equivalent to this request."""
            return FilesetChange.set_property(self.property, self.value)

        def validate(self):
            """Check the request; the property name is checked before the value.

            Raises:
                ValueError: if property or value is None or empty.
            """
            checks = (
                (self.property, '"property" field is required and cannot be empty'),
                (self.value, '"value" field is required and cannot be empty'),
            )
            for current, message in checks:
                if not current:
                    raise ValueError(message)

    @dataclass
    class RemoveFilesetPropertyRequest(FilesetUpdateRequestBase):
        """Payload asking for a property to be removed from a fileset."""

        # Name of the property to remove.
        property: str = None

        def __init__(self, property: str):
            super().__init__("removeProperty")
            self.property = property

        def fileset_change(self):
            """Return the FilesetChange equivalent to this request."""
            return FilesetChange.remove_property(self.property)

        def validate(self):
            """Check the request.

            Raises:
                ValueError: if property is None or empty.
            """
            if self.property:
                return
            raise ValueError('"property" field is required and cannot be empty')
""" -from abc import abstractmethod, ABC +from abc import abstractmethod from dataclasses import dataclass, field -from dataclasses_json import config, DataClassJsonMixin +from dataclasses_json import config -from gravitino.meta_change import MetalakeChange +from gravitino.api.metalake_change import MetalakeChange +from gravitino.rest.rest_message import RESTRequest @dataclass -class MetalakeUpdateRequestType(DataClassJsonMixin): +class MetalakeUpdateRequestBase(RESTRequest): type: str = field(metadata=config(field_name='@type')) def __init__(self, type: str): self.type = type - -class MetalakeUpdateRequest: - """Represents an interface for Metalake update requests.""" - - @abstractmethod - def validate(self): - pass - @abstractmethod def metalake_change(self): pass + +class MetalakeUpdateRequest: + """Represents an interface for Metalake update requests.""" + @dataclass - class RenameMetalakeRequest(MetalakeUpdateRequestType): + class RenameMetalakeRequest(MetalakeUpdateRequestBase): """Represents a request to rename a Metalake.""" - newName: str = None + new_name: str = field(metadata=config(field_name='newName')) """The new name for the Metalake.""" - def __init__(self, newName: str): + def __init__(self, new_name: str): super().__init__("rename") - self.newName = newName + self.new_name = new_name def validate(self): """Validates the fields of the request. @@ -46,22 +43,22 @@ def validate(self): Raises: IllegalArgumentException if the new name is not set. 
""" - if not self.newName: + if not self.new_name: raise ValueError('"newName" field is required and cannot be empty') def metalake_change(self): - return MetalakeChange.rename(self.newName) + return MetalakeChange.rename(self.new_name) @dataclass - class UpdateMetalakeCommentRequest(MetalakeUpdateRequestType): + class UpdateMetalakeCommentRequest(MetalakeUpdateRequestBase): """Represents a request to update the comment on a Metalake.""" - newComment: str = None + new_comment: str = field(metadata=config(field_name='newComment')) """The new comment for the Metalake.""" - def __init__(self, newComment: str): + def __init__(self, new_comment: str): super().__init__("updateComment") - self.newComment = newComment + self.new_comment = new_comment def validate(self): """Validates the fields of the request. @@ -69,14 +66,14 @@ def validate(self): Raises: IllegalArgumentException if the new comment is not set. """ - if not self.newComment: + if not self.new_comment: raise ValueError('"newComment" field is required and cannot be empty') def metalake_change(self): - return MetalakeChange.update_comment(self.newComment) + return MetalakeChange.update_comment(self.new_comment) @dataclass - class SetMetalakePropertyRequest(MetalakeUpdateRequestType): + class SetMetalakePropertyRequest(MetalakeUpdateRequestBase): """Represents a request to set a property on a Metalake.""" property: str = None @@ -105,7 +102,7 @@ def metalake_change(self): return MetalakeChange.set_property(self.property, self.value) @dataclass - class RemoveMetalakePropertyRequest(MetalakeUpdateRequestType): + class RemoveMetalakePropertyRequest(MetalakeUpdateRequestBase): """Represents a request to remove a property from a Metalake.""" property: str = None diff --git a/clients/client-python/gravitino/dto/requests/schema_create_request.py b/clients/client-python/gravitino/dto/requests/schema_create_request.py new file mode 100644 index 00000000000..d77c668ab94 --- /dev/null +++ 
@dataclass
class SchemaCreateRequest(RESTRequest):
    """Request payload for creating a schema."""

    name: str
    comment: Optional[str]
    properties: Optional[Dict[str, str]]

    def validate(self):
        """Check that the request carries a schema name.

        Raises:
            AssertionError: if name is None. The check is a plain assert, so
                it is skipped when Python runs with -O.
        """
        has_name = self.name is not None
        assert has_name, "\"name\" field is required and cannot be empty"
@dataclass
class SchemaUpdateRequest:
    """Namespace for the REST payloads that each describe one schema change."""

    @dataclass
    class SetSchemaPropertyRequest(SchemaUpdateRequestBase):
        """Payload asking for a property to be set on a schema."""

        # Name of the property to set.
        property: str = None

        # Value to store under that name.
        value: str = None

        def __init__(self, property: str, value: str):
            super().__init__("setProperty")
            self.property = property
            self.value = value

        def schema_change(self):
            """Return the SchemaChange equivalent to this request."""
            return SchemaChange.set_property(self.property, self.value)

        def validate(self):
            """Check the request; the property name is checked before the value.

            Raises:
                ValueError: if property or value is None or empty.
            """
            checks = (
                (self.property, '"property" field is required and cannot be empty'),
                (self.value, '"value" field is required and cannot be empty'),
            )
            for current, message in checks:
                if not current:
                    raise ValueError(message)

    @dataclass
    class RemoveSchemaPropertyRequest(SchemaUpdateRequestBase):
        """Payload asking for a property to be removed from a schema."""

        # Name of the property to remove.
        property: str = None

        def __init__(self, property: str):
            super().__init__("removeProperty")
            self.property = property

        def schema_change(self):
            """Return the SchemaChange equivalent to this request."""
            return SchemaChange.remove_property(self.property)

        def validate(self):
            """Check the request.

            Raises:
                ValueError: if property is None or empty.
            """
            if self.property:
                return
            raise ValueError('"property" field is required and cannot be empty')
+ """ + if not self.updates: + raise ValueError("Updates cannot be empty") + for update_request in self.updates: + update_request.validate() \ No newline at end of file diff --git a/clients/client-python/gravitino/dto/responses/base_response.py b/clients/client-python/gravitino/dto/responses/base_response.py index dca8f36b8d3..a2eae0e398b 100644 --- a/clients/client-python/gravitino/dto/responses/base_response.py +++ b/clients/client-python/gravitino/dto/responses/base_response.py @@ -4,19 +4,15 @@ """ from dataclasses import dataclass -from dataclasses_json import DataClassJsonMixin +from gravitino.rest.rest_message import RESTResponse @dataclass -class BaseResponse(DataClassJsonMixin): +class BaseResponse(RESTResponse): """Represents a base response for REST API calls.""" code: int - @classmethod - def default(cls): - return cls(code=0) - def validate(self): """Validates the response code. TODO: @throws IllegalArgumentException if code value is negative. diff --git a/clients/client-python/gravitino/dto/responses/catalog_list_response.py b/clients/client-python/gravitino/dto/responses/catalog_list_response.py new file mode 100644 index 00000000000..78b637bc8f3 --- /dev/null +++ b/clients/client-python/gravitino/dto/responses/catalog_list_response.py @@ -0,0 +1,24 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. 
@dataclass
class CatalogResponse(BaseResponse):
    """Represents a response containing catalog information."""
    # Serialized under the JSON key "catalog"; exposed through catalog().
    _catalog: CatalogDTO = field(metadata=config(field_name='catalog'))

    def validate(self):
        """Validates the response data.

        Raises:
            AssertionError: if the catalog, or its name, type or audit
                information, is not set. The checks are plain assert
                statements, so they are skipped when Python runs with -O.
        """
        super().validate()

        # Read the stored DTO, not the catalog() accessor: the bound method is
        # never None and has no name()/type()/audit_info() attributes.
        catalog = self._catalog
        assert catalog is not None, "catalog must not be null"
        assert catalog.name() is not None, "catalog 'name' must not be null and empty"
        assert catalog.type() is not None, "catalog 'type' must not be null"
        assert catalog.audit_info() is not None, "catalog 'audit' must not be null"

    def catalog(self) -> CatalogDTO:
        """Return the catalog DTO carried by this response."""
        return self._catalog
+""" +from dataclasses import dataclass, field +from typing import Optional, List + +from dataclasses_json import config + +from gravitino.dto.responses.base_response import BaseResponse +from gravitino.name_identifier import NameIdentifier + + +@dataclass +class EntityListResponse(BaseResponse): + """Represents a response containing a list of catalogs.""" + idents: Optional[List[NameIdentifier]] = field(metadata=config(field_name='identifiers')) + + def validate(self): + """Validates the response data. + + Raises: + IllegalArgumentException if catalog identifiers are not set. + """ + super().validate() + + assert self.idents is not None, "identifiers must not be null" \ No newline at end of file diff --git a/clients/client-python/gravitino/dto/responses/fileset_response.py b/clients/client-python/gravitino/dto/responses/fileset_response.py new file mode 100644 index 00000000000..c925ffbe5a8 --- /dev/null +++ b/clients/client-python/gravitino/dto/responses/fileset_response.py @@ -0,0 +1,26 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. +""" +from dataclasses import dataclass + +from gravitino.dto.fileset_dto import FilesetDTO +from gravitino.dto.responses.base_response import BaseResponse + + +@dataclass +class FilesetResponse(BaseResponse): + """Response for fileset creation.""" + fileset: FilesetDTO + + def validate(self): + """Validates the response data. + + Raises: + IllegalArgumentException if catalog identifiers are not set. 
+ """ + super().validate() + assert self.fileset is not None, "fileset must not be null" + assert self.fileset.name, "fileset 'name' must not be null and empty" + assert self.fileset.storage_location, "fileset 'storageLocation' must not be null and empty" + assert self.fileset.type is not None, "fileset 'type' must not be null and empty" diff --git a/clients/client-python/gravitino/dto/responses/metalake_list_response.py b/clients/client-python/gravitino/dto/responses/metalake_list_response.py index a7bfcddaa4c..afa01520713 100644 --- a/clients/client-python/gravitino/dto/responses/metalake_list_response.py +++ b/clients/client-python/gravitino/dto/responses/metalake_list_response.py @@ -16,6 +16,11 @@ class MetalakeListResponse(BaseResponse): metalakes: List[MetalakeDTO] def validate(self): + """Validates the response data. + + Raises: + IllegalArgumentException if catalog identifiers are not set. + """ super().validate() if self.metalakes is None: diff --git a/clients/client-python/gravitino/dto/responses/schema_response.py b/clients/client-python/gravitino/dto/responses/schema_response.py new file mode 100644 index 00000000000..f825c39bcce --- /dev/null +++ b/clients/client-python/gravitino/dto/responses/schema_response.py @@ -0,0 +1,29 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. +""" +from dataclasses import dataclass +from typing import Optional + +from dataclasses_json import DataClassJsonMixin + +from gravitino.dto.responses.base_response import BaseResponse +from gravitino.dto.schema_dto import SchemaDTO + + +@dataclass +class SchemaResponse(BaseResponse, DataClassJsonMixin): + """Represents a response for a schema.""" + schema: Optional[SchemaDTO] + + def validate(self): + """Validates the response data. + + Raises: + IllegalArgumentException if catalog identifiers are not set. 
+ """ + super().validate() + + assert self.schema is not None, "schema must be non-null" + assert self.schema.name is not None, "schema 'name' must not be null and empty" + assert self.schema.audit is not None, "schema 'audit' must not be null" diff --git a/clients/client-python/gravitino/dto/schema_dto.py b/clients/client-python/gravitino/dto/schema_dto.py new file mode 100644 index 00000000000..f1638153a1c --- /dev/null +++ b/clients/client-python/gravitino/dto/schema_dto.py @@ -0,0 +1,34 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. +""" +from dataclasses import dataclass +from typing import Optional, Dict + +from dataclasses_json import DataClassJsonMixin + +from gravitino.dto.audit_dto import AuditDTO + + +@dataclass +class SchemaDTO(DataClassJsonMixin): + """Represents a Schema DTO (Data Transfer Object).""" + + name: str + """The name of the Metalake DTO.""" + + comment: Optional[str] + """The comment of the Metalake DTO.""" + + properties: Optional[Dict[str, str]] = None + """The properties of the Metalake DTO.""" + + audit: AuditDTO = None + """The audit information of the Metalake DTO.""" + + def __init__(self, name: str = None, comment: str = None, properties: Dict[str, str] = None, + audit: AuditDTO = None): + self.name = name + self.comment = comment + self.properties = properties + self.audit = audit diff --git a/clients/client-python/gravitino/name_identifier.py b/clients/client-python/gravitino/name_identifier.py index 8c0c93b6062..d40ebd2cd67 100644 --- a/clients/client-python/gravitino/name_identifier.py +++ b/clients/client-python/gravitino/name_identifier.py @@ -13,62 +13,73 @@ class NameIdentifier: schema. """ - DOT = '.' + DOT: str = '.' 
+ + _namespace: Namespace = None + _name: str = None def __init__(self, namespace: Namespace, name: str): - self.namespace = namespace - self.name = name + self._namespace = namespace + self._name = name - def NameIdentifier(self, namespace, name): - self.check(namespace is not None, "Cannot create a NameIdentifier with null namespace") - self.check(name is not None and name != "", "Cannot create a NameIdentifier with null or empty name") + def namespace(self): + return self._namespace - self.namespace = namespace - self.name = name + def name(self): + return self._name @staticmethod def of(*names: str) -> 'NameIdentifier': - """Create the NameIdentifier with the given {@link Namespace} and name. + """Create the NameIdentifier with the given levels of names. Args: - namespace: The namespace of the identifier - name: The name of the identifier + names The names of the identifier - Return: + Returns: The created NameIdentifier """ + NameIdentifier.check(names is not None, "Cannot create a NameIdentifier with null names") - NameIdentifier.check(len(names) != 0, "Cannot create a NameIdentifier with no names") + NameIdentifier.check(len(names) > 0, "Cannot create a NameIdentifier with no names") - return NameIdentifier(Namespace.of(names[:-1]), names[-1]) + return NameIdentifier(Namespace.of(*names[:-1]), names[-1]) @staticmethod def of_namespace(namespace: Namespace, name: str) -> 'NameIdentifier': - """Create the metalake NameIdentifier with the given name. + """Create the NameIdentifier with the given Namespace and name. Args: - metalake: The metalake name + namespace: The namespace of the identifier + name: The name of the identifier - Return: - The created metalake NameIdentifier + Returns: + The created NameIdentifier """ return NameIdentifier(namespace, name) @staticmethod def of_metalake(metalake: str) -> 'NameIdentifier': - """Create the catalog NameIdentifier with the given metalake and catalog name. + """Create the metalake NameIdentifier with the given name. 
Args: metalake: The metalake name - catalog: The catalog name - Return: - The created catalog NameIdentifier + Returns: + The created metalake NameIdentifier """ return NameIdentifier.of(metalake) @staticmethod def of_catalog(metalake: str, catalog: str) -> 'NameIdentifier': + """Create the catalog NameIdentifier with the given metalake and catalog name. + + Args: + metalake: The metalake name + catalog: The catalog name + + Returns: + The created catalog NameIdentifier + """ return NameIdentifier.of(metalake, catalog) @staticmethod @@ -80,7 +91,7 @@ def of_schema(metalake: str, catalog: str, schema: str) -> 'NameIdentifier': catalog: The catalog name schema: The schema name - Return: + Returns: The created schema NameIdentifier """ return NameIdentifier.of(metalake, catalog, schema) @@ -95,7 +106,7 @@ def of_table(metalake: str, catalog: str, schema: str, table: str) -> 'NameIdent schema: The schema name table: The table name - Return: + Returns: The created table NameIdentifier """ return NameIdentifier.of(metalake, catalog, schema, table) @@ -110,7 +121,7 @@ def of_fileset(metalake: str, catalog: str, schema: str, fileset: str) -> 'NameI schema: The schema name fileset: The fileset name - Return: + Returns: The created fileset NameIdentifier """ return NameIdentifier.of(metalake, catalog, schema, fileset) @@ -126,7 +137,7 @@ def of_topic(metalake: str, catalog: str, schema: str, topic: str) -> 'NameIdent schema: The schema name topic: The topic name - Return: + Returns: The created topic NameIdentifier """ return NameIdentifier.of(metalake, catalog, schema, topic) @@ -140,7 +151,7 @@ def check_metalake(ident: 'NameIdentifier') -> None: ident: The metalake NameIdentifier to check. 
""" NameIdentifier.check(ident is not None, "Metalake identifier must not be null") - Namespace.check_metalake(ident.namespace) + Namespace.check_metalake(ident.namespace()) @staticmethod def check_catalog(ident: 'NameIdentifier') -> None: @@ -150,8 +161,8 @@ def check_catalog(ident: 'NameIdentifier') -> None: Args: ident: The catalog NameIdentifier to check. """ - NameIdentifier.check(ident is None, "Catalog identifier must not be null") - Namespace.check_catalog(ident.namespace) + NameIdentifier.check(ident is not None, "Catalog identifier must not be null") + Namespace.check_catalog(ident.namespace()) @staticmethod def check_schema(ident: 'NameIdentifier') -> None: @@ -162,7 +173,7 @@ def check_schema(ident: 'NameIdentifier') -> None: ident: The schema NameIdentifier to check. """ NameIdentifier.check(ident is not None, "Schema identifier must not be null") - Namespace.check_schema(ident.namespace) + Namespace.check_schema(ident.namespace()) @staticmethod def check_table(ident: 'NameIdentifier') -> None: @@ -173,7 +184,7 @@ def check_table(ident: 'NameIdentifier') -> None: ident: The table NameIdentifier to check. """ NameIdentifier.check(ident is not None, "Table identifier must not be null") - Namespace.check_table(ident.namespace) + Namespace.check_table(ident.namespace()) @staticmethod def check_fileset(ident: 'NameIdentifier') -> None: @@ -184,7 +195,7 @@ def check_fileset(ident: 'NameIdentifier') -> None: ident: The fileset NameIdentifier to check. """ NameIdentifier.check(ident is not None, "Fileset identifier must not be null") - Namespace.check_fileset(ident.namespace) + Namespace.check_fileset(ident.namespace()) @staticmethod def check_topic(ident: 'NameIdentifier') -> None: @@ -195,7 +206,7 @@ def check_topic(ident: 'NameIdentifier') -> None: ident: The topic NameIdentifier to check. 
""" NameIdentifier.check(ident is not None, "Topic identifier must not be null") - Namespace.check_topic(ident.namespace) + Namespace.check_topic(ident.namespace()) @staticmethod def parse(identifier: str) -> 'NameIdentifier': @@ -204,7 +215,7 @@ def parse(identifier: str) -> 'NameIdentifier': Args: identifier: The identifier string - Return: + Returns: The created NameIdentifier """ NameIdentifier.check(identifier is not None and identifier != '', "Cannot parse a null or empty identifier") @@ -215,15 +226,15 @@ def parse(identifier: str) -> 'NameIdentifier': def has_namespace(self): """Check if the NameIdentifier has a namespace. - Return: + Returns: True if the NameIdentifier has a namespace, false otherwise. """ - return not self.namespace.is_empty() + return not self.namespace().is_empty() def get_namespace(self): """Get the namespace of the NameIdentifier. - Return: + Returns: The namespace of the NameIdentifier. """ return self.namespace @@ -231,7 +242,7 @@ def get_namespace(self): def get_name(self): """Get the name of the NameIdentifier. - Return: + Returns: The name of the NameIdentifier. """ return self.name diff --git a/clients/client-python/gravitino/namespace.py b/clients/client-python/gravitino/namespace.py index f3f0396bb16..281439f1cda 100644 --- a/clients/client-python/gravitino/namespace.py +++ b/clients/client-python/gravitino/namespace.py @@ -11,11 +11,12 @@ class Namespace: "metalake1.catalog1.schema1" are all valid namespaces. """ - EMPTY = None - DOT = "." + _DOT: str = "." 
+ + _levels: List[str] = [] def __init__(self, levels: List[str]): - self.levels = levels + self._levels = levels @staticmethod def empty() -> 'Namespace': @@ -36,14 +37,12 @@ def of(*levels: str) -> 'Namespace': Returns: A namespace with the given levels """ - if levels is None: - raise ValueError("Cannot create a namespace with null levels") + Namespace.check(levels is not None, "Cannot create a namespace with null levels") if len(levels) == 0: return Namespace.empty() for level in levels: - if level is None or level == "": - raise ValueError("Cannot create a namespace with null or empty level") + Namespace.check(level is not None and level != "", "Cannot create a namespace with null or empty level") return Namespace(list(levels)) @@ -90,7 +89,7 @@ def of_table(metalake: str, catalog: str, schema: str) -> 'Namespace': catalog: The catalog name schema: The schema name - Return: + Returns: A namespace for table """ return Namespace.of(metalake, catalog, schema) @@ -104,7 +103,7 @@ def of_fileset(metalake: str, catalog: str, schema: str) -> 'Namespace': catalog: The catalog name schema: The schema name - Return: + Returns: A namespace for fileset """ return Namespace.of(metalake, catalog, schema) @@ -118,7 +117,7 @@ def of_topic(metalake: str, catalog: str, schema: str) -> 'Namespace': catalog: The catalog name schema: The schema name - Return: + Returns: A namespace for topic """ return Namespace.of(metalake, catalog, schema) @@ -131,8 +130,8 @@ def check_metalake(namespace: 'Namespace') -> None: Args: namespace: The metalake namespace """ - if not namespace and not namespace.is_empty(): - raise ValueError(f"Metalake namespace must be non-null and empty, the input namespace is {namespace}") + Namespace.check(namespace is not None and namespace.is_empty(), + f"Metalake namespace must be non-null and empty, the input namespace is {namespace}") @staticmethod def check_catalog(namespace: 'Namespace') -> None: @@ -142,8 +141,8 @@ def check_catalog(namespace: 
'Namespace') -> None: Args: namespace: The catalog namespace """ - if not namespace and namespace.length() != 1: - raise ValueError(f"Catalog namespace must be non-null and have 1 level, the input namespace is {namespace}") + Namespace.check(namespace is not None and namespace.length() == 1, + f"Catalog namespace must be non-null and have 1 level, the input namespace is {namespace}") @staticmethod def check_schema(namespace: 'Namespace') -> None: @@ -153,8 +152,8 @@ def check_schema(namespace: 'Namespace') -> None: Args: namespace: The schema namespace """ - if not namespace and namespace.length() != 2: - raise ValueError(f"Schema namespace must be non-null and have 2 levels, the input namespace is {namespace}") + Namespace.check(namespace is not None and namespace.length() == 2, + f"Schema namespace must be non-null and have 2 levels, the input namespace is {namespace}") @staticmethod def check_table(namespace: 'Namespace') -> None: @@ -164,8 +163,8 @@ def check_table(namespace: 'Namespace') -> None: Args: namespace: The table namespace """ - if not namespace and namespace.length() != 3: - raise ValueError(f"Table namespace must be non-null and have 3 levels, the input namespace is {namespace}") + Namespace.check(namespace is not None and namespace.length() == 3, + f"Table namespace must be non-null and have 3 levels, the input namespace is {namespace}") @staticmethod def check_fileset(namespace: 'Namespace') -> None: @@ -175,9 +174,8 @@ def check_fileset(namespace: 'Namespace') -> None: Args: namespace: The fileset namespace """ - if not namespace and namespace.length() != 3: - raise ValueError( - f"Fileset namespace must be non-null and have 3 levels, the input namespace is {namespace}") + Namespace.check(namespace is not None and namespace.length() == 3, + f"Fileset namespace must be non-null and have 3 levels, the input namespace is {namespace}") @staticmethod def check_topic(namespace: 'Namespace') -> None: @@ -187,16 +185,16 @@ def check_topic(namespace: 
'Namespace') -> None: Args: namespace: The topic namespace """ - if not namespace and namespace.length() != 3: - raise ValueError(f"Topic namespace must be non-null and have 3 levels, the input namespace is {namespace}") + Namespace.check(namespace is not None and namespace.length() == 3, + f"Topic namespace must be non-null and have 3 levels, the input namespace is {namespace}") def levels(self) -> List[str]: """Get the levels of the namespace. - Return: + Returns: The levels of the namespace """ - return self.levels + return self._levels def level(self, pos: int) -> str: """Get the level at the given position. @@ -204,39 +202,39 @@ def level(self, pos: int) -> str: Args: pos: The position of the level - Return: + Returns: The level at the given position """ - if pos < 0 or pos >= len(self.levels): + if pos < 0 or pos >= len(self._levels): raise ValueError("Invalid level position") - return self.levels[pos] + return self._levels[pos] def length(self) -> int: """Get the length of the namespace. - Return: + Returns: The length of the namespace. """ - return len(self.levels) + return len(self._levels) def is_empty(self) -> bool: """Check if the namespace is empty. - Return: + Returns: True if the namespace is empty, false otherwise. 
""" - return len(self.levels) == 0 + return len(self._levels) == 0 def __eq__(self, other: 'Namespace') -> bool: if not isinstance(other, Namespace): return False - return self.levels == other.levels + return self._levels == other._levels def __hash__(self) -> int: - return hash(tuple(self.levels)) + return hash(tuple(self._levels)) def __str__(self) -> str: - return Namespace.DOT.join(self.levels) + return Namespace._DOT.join(self._levels) @staticmethod def check(expression: bool, message: str, *args) -> None: diff --git a/clients/client-python/gravitino/rest/rest_message.py b/clients/client-python/gravitino/rest/rest_message.py new file mode 100644 index 00000000000..409b3802cff --- /dev/null +++ b/clients/client-python/gravitino/rest/rest_message.py @@ -0,0 +1,42 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. +""" +from abc import ABC, abstractmethod + +from dataclasses_json import DataClassJsonMixin + + +class RESTMessage(DataClassJsonMixin, ABC): + """ + Interface for REST messages. + + REST messages are objects that are sent to and received from REST endpoints. They are + typically used to represent the request and response bodies of REST API calls. + """ + + @abstractmethod + def validate(self): + """ + Ensures that a constructed instance of a REST message is valid according to the REST spec. + + This is needed when parsing data that comes from external sources and the object might have + been constructed without all the required fields present. + + Raises: + IllegalArgumentException: If the message is not valid. 
+ """ + pass + + +class IllegalArgumentException(Exception): + """Exception raised if a REST message is not valid according to the REST spec.""" + pass + + +class RESTRequest(RESTMessage, ABC): + """Interface to mark a REST request.""" + + +class RESTResponse(RESTMessage, ABC): + """Interface to mark a REST response""" diff --git a/clients/client-python/gravitino/utils/exceptions.py b/clients/client-python/gravitino/utils/exceptions.py index 147a1698362..28afc373452 100644 --- a/clients/client-python/gravitino/utils/exceptions.py +++ b/clients/client-python/gravitino/utils/exceptions.py @@ -17,7 +17,7 @@ def __init__(self, error): def json(self): """ - :return: object of response error from the API + :Returns: object of response error from the API """ try: return json.loads(self.body.decode("utf-8")) diff --git a/clients/client-python/tests/integration/__init__.py b/clients/client-python/tests/integration/__init__.py new file mode 100644 index 00000000000..5779a3ad252 --- /dev/null +++ b/clients/client-python/tests/integration/__init__.py @@ -0,0 +1,4 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. 
+""" diff --git a/clients/client-python/tests/integration_test_env.py b/clients/client-python/tests/integration/integration_test_env.py similarity index 95% rename from clients/client-python/tests/integration_test_env.py rename to clients/client-python/tests/integration/integration_test_env.py index 7036b6f0477..73cbaed5e01 100644 --- a/clients/client-python/tests/integration_test_env.py +++ b/clients/client-python/tests/integration/integration_test_env.py @@ -7,6 +7,7 @@ import unittest import subprocess import time + import requests logger = logging.getLogger(__name__) @@ -19,7 +20,7 @@ def get_gravitino_server_version(): response.close() return True except requests.exceptions.RequestException as e: - logger.error("Failed to access the server: {}", e) + logger.warning("Failed to access the server: {}", e) return False @@ -44,8 +45,8 @@ def _init_logging(): logger.addHandler(console_handler) -# Provide real test environment for the Gravitino Server class IntegrationTestEnv(unittest.TestCase): + """Provide real test environment for the Gravitino Server""" gravitino_startup_script = None @classmethod @@ -77,6 +78,8 @@ def setUpClass(cls): logger.error("ERROR: Can't start Gravitino server!") quit(0) + cls.clean_test_date() + @classmethod def tearDownClass(cls): if os.environ.get('GRADLE_START_GRAVITINO') is not None: diff --git a/clients/client-python/tests/integration/test_fileset_catalog.py b/clients/client-python/tests/integration/test_fileset_catalog.py new file mode 100644 index 00000000000..20d20970392 --- /dev/null +++ b/clients/client-python/tests/integration/test_fileset_catalog.py @@ -0,0 +1,127 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. 
+""" +import logging + +from gravitino.api.catalog import Catalog +from gravitino.api.fileset import Fileset +from gravitino.api.fileset_change import FilesetChange +from gravitino.client.gravitino_admin_client import GravitinoAdminClient +from gravitino.client.gravitino_client import GravitinoClient +from gravitino.client.gravitino_metalake import GravitinoMetalake +from gravitino.dto.catalog_dto import CatalogDTO +from gravitino.name_identifier import NameIdentifier +from tests.integration.integration_test_env import IntegrationTestEnv + +logger = logging.getLogger(__name__) + + +class TestFilesetCatalog(IntegrationTestEnv): + catalog: Catalog = None + metalake: GravitinoMetalake = None + metalake_name: str = "testMetalake" + catalog_name: str = "testCatalog" + schema_name: str = "testSchema" + fileset_name: str = "testFileset1" + fileset_alter_name: str = "testFilesetAlter" + provider: str = "hadoop" + + metalake_ident: NameIdentifier = NameIdentifier.of(metalake_name) + catalog_ident: NameIdentifier = NameIdentifier.of_catalog(metalake_name, catalog_name) + schema_ident: NameIdentifier = NameIdentifier.of_schema(metalake_name, catalog_name, schema_name) + fileset_ident: NameIdentifier = NameIdentifier.of_fileset(metalake_name, catalog_name, schema_name, fileset_name) + fileset_alter_ident: NameIdentifier = NameIdentifier.of_fileset(metalake_name, catalog_name, schema_name, + fileset_alter_name) + + gravitino_admin_client: GravitinoAdminClient = None + gravitino_client: GravitinoClient = None + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.clean_test_data() + + cls.gravitino_admin_client = GravitinoAdminClient(uri="http://localhost:8090") + cls.metalake = cls.gravitino_admin_client.create_metalake(ident=cls.metalake_ident, + comment="test comment", properties={}) + cls.gravitino_client = GravitinoClient(uri="http://localhost:8090", metalake_name=cls.metalake_name) + + cls.catalog = cls.gravitino_client.create_catalog( + 
ident=cls.catalog_ident, + type=CatalogDTO.Type.FILESET, + provider=cls.provider, + comment="comment", + properties={"k1": "v1"} + ) + + cls.catalog.as_schemas().create_schema(ident=cls.schema_ident, comment="comment", properties={"k1": "v1"}) + + @classmethod + def tearDownClass(cls): + """Clean test data""" + cls.clean_test_data() + super().tearDownClass() + + @classmethod + def clean_test_data(cls): + try: + cls.gravitino_admin_client = GravitinoAdminClient(uri="http://localhost:8090") + gravitino_metalake = cls.gravitino_admin_client.load_metalake(ident=cls.metalake_ident) + cls.catalog = gravitino_metalake.load_catalog(ident=cls.catalog_ident) + cls.catalog.as_fileset_catalog().drop_fileset(ident=cls.fileset_ident) + cls.catalog.as_fileset_catalog().drop_fileset(ident=cls.fileset_alter_ident) + cls.catalog.as_schemas().drop_schema(ident=cls.schema_ident, cascade=True) + gravitino_metalake.drop_catalog(ident=cls.catalog_ident) + cls.gravitino_admin_client.drop_metalake(cls.metalake_ident) + except Exception as e: + logger.debug(e) + + def create_catalog(self): + self.catalog = self.gravitino_client.create_catalog( + ident=self.catalog_ident, + type=CatalogDTO.Type.FILESET, + provider=self.provider, + comment="comment", + properties={"k1": "v1"}) + + assert self.catalog.name == self.catalog_name + assert self.catalog.type == CatalogDTO.Type.FILESET + assert self.catalog.provider == self.provider + + def create_schema(self): + self.catalog.as_schemas().create_schema( + ident=self.schema_ident, + comment="comment", + properties={"k1": "v1"}) + + def test_create_fileset(self): + fileset = self.catalog.as_fileset_catalog().create_fileset(ident=self.fileset_ident, + type=Fileset.Type.MANAGED, + comment="mock comment", + storage_location="mock location", + properties={"k1": "v1"}) + assert fileset is not None + + fileset_list = self.catalog.as_fileset_catalog().list_filesets(self.fileset_ident.namespace()) + assert fileset_list is not None and len(fileset_list) == 1 + 
+ fileset = self.catalog.as_fileset_catalog().load_fileset(self.fileset_ident) + assert fileset is not None + assert fileset.name() == self.fileset_ident.name() + + # Alter fileset + changes = ( + FilesetChange.rename(self.fileset_alter_name), + FilesetChange.update_comment("new fileset comment"), + FilesetChange.set_property("key1", "value1"), + FilesetChange.remove_property("k1"), + ) + fileset_alter = self.catalog.as_fileset_catalog().alter_fileset(self.fileset_ident, *changes) + assert fileset_alter is not None + assert fileset_alter.name() == self.fileset_alter_name + assert fileset_alter.comment() == "new fileset comment" + assert fileset_alter.properties().get("key1") == "value1" + + # Clean test data + self.catalog.as_fileset_catalog().drop_fileset(ident=self.fileset_ident) diff --git a/clients/client-python/tests/test_gravitino_admin_client.py b/clients/client-python/tests/integration/test_gravitino_admin_client.py similarity index 83% rename from clients/client-python/tests/test_gravitino_admin_client.py rename to clients/client-python/tests/integration/test_gravitino_admin_client.py index 0490e16ab55..e54f4a1ac13 100644 --- a/clients/client-python/tests/test_gravitino_admin_client.py +++ b/clients/client-python/tests/integration/test_gravitino_admin_client.py @@ -2,34 +2,38 @@ Copyright 2024 Datastrato Pvt Ltd. This software is licensed under the Apache License version 2. 
""" +import logging + import gravitino from gravitino.client.gravitino_admin_client import GravitinoAdminClient from gravitino.dto.dto_converters import DTOConverters from gravitino.dto.requests.metalake_updates_request import MetalakeUpdatesRequest from gravitino.dto.responses.metalake_response import MetalakeResponse -from gravitino.meta_change import MetalakeChange +from gravitino.api.metalake_change import MetalakeChange from gravitino.name_identifier import NameIdentifier from gravitino.utils.exceptions import NotFoundError -from tests.integration_test_env import IntegrationTestEnv +from tests.integration.integration_test_env import IntegrationTestEnv + +logger = logging.getLogger(__name__) -class TestGravitinoClient(IntegrationTestEnv): +class TestGravitinoAdminClient(IntegrationTestEnv): def setUp(self): self._gravitino_admin_client = GravitinoAdminClient(uri="http://localhost:8090") def test_create_metalake(self): metalake_name = "metalake00" - try: - self.create_metalake(metalake_name) - except gravitino.utils.exceptions.HTTPError: - self.drop_metalake(metalake_name) # Clean test data self.drop_metalake(metalake_name) + self.create_metalake(metalake_name) + # Clean test data + self.drop_metalake(metalake_name) + def create_metalake(self, metalake_name): - comment = "This is a sample comment" ident = NameIdentifier.of(metalake_name) + comment = "This is a sample comment" properties = {"key1": "value1", "key2": "value2"} gravitinoMetalake = self._gravitino_admin_client.create_metalake(ident, comment, properties) @@ -37,16 +41,17 @@ def create_metalake(self, metalake_name): self.assertEqual(gravitinoMetalake.name, metalake_name) self.assertEqual(gravitinoMetalake.comment, comment) self.assertEqual(gravitinoMetalake.properties.get("key1"), "value1") - self.assertEqual(gravitinoMetalake.audit.creator, "anonymous") + self.assertEqual(gravitinoMetalake.audit._creator, "anonymous") def test_alter_metalake(self): metalake_name = "metalake02" metalake_new_name = 
metalake_name + "_new" - try: - self.create_metalake(metalake_name) - except gravitino.utils.exceptions.HTTPError: - self.drop_metalake(metalake_name) + # Clean test data + self.drop_metalake(metalake_name) + self.drop_metalake(metalake_new_name) + + self.create_metalake(metalake_name) changes = ( MetalakeChange.rename(metalake_new_name), MetalakeChange.update_comment("new metalake comment"), @@ -55,7 +60,7 @@ def test_alter_metalake(self): metalake = self._gravitino_admin_client.alter_metalake(NameIdentifier.of(metalake_name), *changes) self.assertEqual(metalake_new_name, metalake.name) self.assertEqual("new metalake comment", metalake.comment) - self.assertEqual("anonymous", metalake.audit.creator) # Assuming a constant or similar attribute + self.assertEqual("anonymous", metalake.audit._creator) # Assuming a constant or similar attribute # Reload metadata via new name to check if the changes are applied new_metalake = self._gravitino_admin_client.load_metalake(NameIdentifier.of(metalake_new_name)) @@ -67,13 +72,9 @@ def test_alter_metalake(self): with self.assertRaises(NotFoundError): # TODO: NoSuchMetalakeException self._gravitino_admin_client.load_metalake(old) - # Clean test data - self.drop_metalake(metalake_name) - self.drop_metalake(metalake_new_name) - - def drop_metalake(self, metalake_name): + def drop_metalake(self, metalake_name) -> bool: ident = NameIdentifier.of(metalake_name) - self.assertTrue(self._gravitino_admin_client.drop_metalake(ident)) + return self._gravitino_admin_client.drop_metalake(ident) def test_drop_metalake(self): metalake_name = "metalake03" @@ -82,7 +83,7 @@ def test_drop_metalake(self): except gravitino.utils.exceptions.HTTPError: self.drop_metalake(metalake_name) - self.drop_metalake(metalake_name) + assert self.drop_metalake(metalake_name) == True def test_metalake_update_request_to_json(self): changes = ( @@ -103,7 +104,7 @@ def test_from_json_metalake_response(self): self.assertEqual(metalake_response.code, 0) 
self.assertIsNotNone(metalake_response.metalake) self.assertEqual(metalake_response.metalake.name, "example_name18") - self.assertEqual(metalake_response.metalake.audit.creator, "anonymous") + self.assertEqual(metalake_response.metalake.audit._creator, "anonymous") def test_list_metalakes(self): metalake_name = "metalake05" @@ -113,4 +114,3 @@ def test_list_metalakes(self): # Clean test data self.drop_metalake(metalake_name) -