Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configuration Manager Redesign #1272

Merged
merged 26 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions evadb/binder/statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from evadb.catalog.catalog_type import ColumnType, TableType
from evadb.catalog.catalog_utils import get_metadata_properties, is_document_table
from evadb.configuration.constants import EvaDB_INSTALLATION_DIR
from evadb.executor.execution_context import Context
from evadb.expression.abstract_expression import AbstractExpression, ExpressionType
from evadb.expression.function_expression import FunctionExpression
from evadb.expression.tuple_value_expression import TupleValueExpression
Expand Down Expand Up @@ -264,6 +265,11 @@ def _bind_tuple_expr(self, node: TupleValueExpression):

@bind.register(FunctionExpression)
def _bind_func_expr(self, node: FunctionExpression):
# setup the context
# we read the GPUs from the catalog and populate in the context
gpus_ids = self._catalog().get_configuration_catalog_value("gpu_ids")
node._context = Context(gpus_ids)

# handle the special case of "extract_object"
if node.name.upper() == str(FunctionType.EXTRACT_OBJECT):
handle_bind_extract_object_function(node, self)
Expand Down Expand Up @@ -331,9 +337,17 @@ def _bind_func_expr(self, node: FunctionExpression):
)
# certain functions take additional inputs like yolo needs the model_name
# these arguments are passed by the user as part of metadata
node.function = lambda: function_class(
**get_metadata_properties(function_obj)
)
# we also handle the special case of ChatGPT where we need to send the
# OpenAI key as part of the parameter
# TODO: this should be better handled
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think having a CREATE call like the following is the right approach?

CREATE FUNCTION myllm
TYPE llm
MODEL GPT4
OPENAI key;

If so, we can mention it in the TODO comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should already work. Will handle it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

properties = get_metadata_properties(function_obj)
if node.name.upper() == str("CHATGPT"):
gaurav274 marked this conversation as resolved.
Show resolved Hide resolved
openapi_key = self._catalog().get_configuration_catalog_value(
"OPENAI_KEY"
)
properties["api_key"] = openapi_key

node.function = lambda: function_class(**properties)
except Exception as e:
err_msg = (
f"{str(e)}. Please verify that the function class name in the "
Expand Down
69 changes: 48 additions & 21 deletions evadb/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
import shutil
from pathlib import Path
from typing import List
from typing import Any, List

from evadb.catalog.catalog_type import (
ColumnType,
Expand All @@ -23,7 +23,6 @@
VideoColumnName,
)
from evadb.catalog.catalog_utils import (
cleanup_storage,
construct_function_cache_catalog_entry,
get_document_table_column_definitions,
get_image_table_column_definitions,
Expand All @@ -33,6 +32,7 @@
)
from evadb.catalog.models.utils import (
ColumnCatalogEntry,
ConfigurationCatalogEntry,
DatabaseCatalogEntry,
FunctionCacheCatalogEntry,
FunctionCatalogEntry,
Expand All @@ -46,6 +46,9 @@
truncate_catalog_tables,
)
from evadb.catalog.services.column_catalog_service import ColumnCatalogService
from evadb.catalog.services.configuration_catalog_service import (
ConfigurationCatalogService,
)
from evadb.catalog.services.database_catalog_service import DatabaseCatalogService
from evadb.catalog.services.function_cache_catalog_service import (
FunctionCacheCatalogService,
Expand All @@ -61,23 +64,28 @@
from evadb.catalog.services.index_catalog_service import IndexCatalogService
from evadb.catalog.services.table_catalog_service import TableCatalogService
from evadb.catalog.sql_config import IDENTIFIER_COLUMN, SQLConfig
from evadb.configuration.configuration_manager import ConfigurationManager
from evadb.expression.function_expression import FunctionExpression
from evadb.parser.create_statement import ColumnDefinition
from evadb.parser.table_ref import TableInfo
from evadb.parser.types import FileFormatType
from evadb.third_party.databases.interface import get_database_handler
from evadb.utils.generic_utils import generate_file_path, get_file_checksum
from evadb.utils.generic_utils import (
generate_file_path,
get_file_checksum,
remove_directory_contents,
)
from evadb.utils.logging_manager import logger


class CatalogManager(object):
def __init__(self, db_uri: str, config: ConfigurationManager):
def __init__(self, db_uri: str):
self._db_uri = db_uri
self._sql_config = SQLConfig(db_uri)
self._config = config
self._bootstrap_catalog()
self._db_catalog_service = DatabaseCatalogService(self._sql_config.session)
self._config_catalog_service = ConfigurationCatalogService(
self._sql_config.session
)
self._table_catalog_service = TableCatalogService(self._sql_config.session)
self._column_service = ColumnCatalogService(self._sql_config.session)
self._function_service = FunctionCatalogService(self._sql_config.session)
Expand Down Expand Up @@ -130,10 +138,15 @@ def _clear_catalog_contents(self):
logger.info("Clearing catalog")
# drop tables which are not part of catalog
drop_all_tables_except_catalog(self._sql_config.engine)
# truncate the catalog tables
truncate_catalog_tables(self._sql_config.engine)
# truncate the catalog tables except configuration_catalog
# We do not remove the configuration entries
truncate_catalog_tables(
self._sql_config.engine, tables_not_to_truncate=["configuration_catalog"]
)
# clean up the dataset, index, and cache directories
cleanup_storage(self._config)
for folder in ["cache_dir", "index_dir", "datasets_dir"]:
print(folder, self.get_configuration_catalog_value(folder))
remove_directory_contents(self.get_configuration_catalog_value(folder))

"Database catalog services"

Expand Down Expand Up @@ -445,7 +458,7 @@ def get_all_index_catalog_entries(self):
""" Function Cache related"""

def insert_function_cache_catalog_entry(self, func_expr: FunctionExpression):
cache_dir = self._config.get_value("storage", "cache_dir")
cache_dir = self.get_configuration_catalog_value("cache_dir")
entry = construct_function_cache_catalog_entry(func_expr, cache_dir=cache_dir)
return self._function_cache_service.insert_entry(entry)

Expand Down Expand Up @@ -508,7 +521,7 @@ def create_and_insert_table_catalog_entry(
table_name = table_info.table_name
column_catalog_entries = xform_column_definitions_to_catalog_entries(columns)

dataset_location = self._config.get_value("core", "datasets_dir")
dataset_location = self.get_configuration_catalog_value("datasets_dir")
file_url = str(generate_file_path(dataset_location, table_name))
table_catalog_entry = self.insert_table_catalog_entry(
table_name,
Expand Down Expand Up @@ -608,14 +621,28 @@ def create_and_insert_multimedia_metadata_table_catalog_entry(
)
return obj

"Configuration catalog services"

def upsert_configuration_catalog_entry(self, key: str, value: Any):
    """Insert or update a configuration catalog entry.

    The work is delegated to the configuration catalog service's
    `upsert_entry`; nothing is returned.

    Args:
        key: key name
        value: value to store under the key
    """
    self._config_catalog_service.upsert_entry(key, value)

def get_configuration_catalog_value(self, key: str) -> Any:
gaurav274 marked this conversation as resolved.
Show resolved Hide resolved
"""
Returns the value entry for the given key
Arguments:
key (str): key name

Returns:
The value stored for the key, or None if no entry with that key exists
"""

#### get catalog instance
# This function plays a crucial role in ensuring that different threads do
# not share the same catalog object, as it can result in serialization issues and
# incorrect behavior with SQLAlchemy. Therefore, whenever a catalog instance is
# required, we create a new one. One possible optimization is to share the catalog
# instance across all objects within the same thread. It is worth investigating whether
# SQLAlchemy already handles this optimization for us, which will be explored at a
# later time.
def get_catalog_instance(db_uri: str, config: ConfigurationManager):
return CatalogManager(db_uri, config)
table_entry = self._config_catalog_service.get_entry_by_name(key)
if table_entry:
return table_entry.value
return None
26 changes: 16 additions & 10 deletions evadb/catalog/catalog_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@
TableCatalogEntry,
)
from evadb.catalog.sql_config import IDENTIFIER_COLUMN
from evadb.configuration.configuration_manager import ConfigurationManager
from evadb.expression.function_expression import FunctionExpression
from evadb.expression.tuple_value_expression import TupleValueExpression
from evadb.parser.create_statement import ColConstraintInfo, ColumnDefinition
from evadb.utils.generic_utils import get_str_hash, remove_directory_contents
from evadb.utils.generic_utils import get_str_hash


def is_video_table(table: TableCatalogEntry):
Expand Down Expand Up @@ -256,12 +255,6 @@ def construct_function_cache_catalog_entry(
return entry


def cleanup_storage(config):
remove_directory_contents(config.get_value("storage", "index_dir"))
remove_directory_contents(config.get_value("storage", "cache_dir"))
remove_directory_contents(config.get_value("core", "datasets_dir"))


def get_metadata_entry_or_val(
function_obj: FunctionCatalogEntry, key: str, default_val: Any = None
) -> str:
Expand Down Expand Up @@ -300,6 +293,19 @@ def get_metadata_properties(function_obj: FunctionCatalogEntry) -> Dict:
return properties


def bootstrap_configs(catalog: "CatalogManager", configs: dict):
    """
    Upsert each key/value pair of `configs` into the catalog, one
    `upsert_configuration_catalog_entry` call per pair.
    """
    for pair in configs.items():
        catalog.upsert_configuration_catalog_entry(*pair)


def get_configuration_value(key: str, db_uri: str = None):
    """Look up a configuration value by key through a fresh catalog instance.

    Args:
        key: configuration key name
        db_uri: database URI handed to `get_catalog_instance`. It is
            optional in the signature only to stay backward-compatible;
            a catalog cannot be created without it.

    Returns:
        Whatever the catalog's `get_configuration_catalog_value` returns
        for `key`.

    Raises:
        TypeError: if `db_uri` is not supplied.
    """
    # get_catalog_instance requires a db_uri; calling it with no argument
    # (as this function used to) always failed with an opaque TypeError.
    # Keep the exception type but say what is actually missing.
    if db_uri is None:
        raise TypeError(
            "get_configuration_value() needs a db_uri to create a catalog instance"
        )
    catalog = get_catalog_instance(db_uri)
    return catalog.get_configuration_catalog_value(key)


#### get catalog instance
# This function plays a crucial role in ensuring that different threads do
# not share the same catalog object, as it can result in serialization issues and
Expand All @@ -308,7 +314,7 @@ def get_metadata_properties(function_obj: FunctionCatalogEntry) -> Dict:
# instance across all objects within the same thread. It is worth investigating whether
# SQLAlchemy already handles this optimization for us, which will be explored at a
# later time.
def get_catalog_instance(db_uri: str, config: ConfigurationManager):
def get_catalog_instance(db_uri: str):
from evadb.catalog.catalog_manager import CatalogManager

return CatalogManager(db_uri, config)
return CatalogManager(db_uri)
37 changes: 0 additions & 37 deletions evadb/catalog/models/base_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib

import sqlalchemy
from sqlalchemy import Column, Integer
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy_utils import database_exists

from evadb.catalog.sql_config import CATALOG_TABLES
from evadb.utils.logging_manager import logger


Expand Down Expand Up @@ -100,33 +93,3 @@ def _commit(self, db_session):

# Custom Base Model to be inherited by all models
BaseModel = declarative_base(cls=CustomModel, constructor=None)


def truncate_catalog_tables(engine: Engine):
    """Delete all rows from every table in the model metadata, keeping the schema.

    Tables that do not exist in the database are skipped, and nothing is
    deleted when the database itself does not exist.
    """
    # https://stackoverflow.com/questions/4763472/sqlalchemy-clear-database-content-but-dont-drop-the-schema/5003705#5003705 #noqa
    # reflect first so the metadata matches what is in the database
    BaseModel.metadata.reflect(bind=engine)
    inspector = sqlalchemy.inspect(engine)
    if not database_exists(engine.url):
        return
    with contextlib.closing(engine.connect()) as connection:
        transaction = connection.begin()
        # walk sorted_tables back to front (presumably so dependent tables
        # are emptied before the tables they reference)
        for tbl in reversed(BaseModel.metadata.sorted_tables):
            if not inspector.has_table(tbl.name):
                continue
            connection.execute(tbl.delete())
        transaction.commit()


def drop_all_tables_except_catalog(engine: Engine):
    """Drop every table in the model metadata whose name is not in CATALOG_TABLES.

    Tables that do not exist in the database are skipped, and nothing is
    dropped when the database itself does not exist.
    """
    # reflect first so the metadata matches what is in the database
    BaseModel.metadata.reflect(bind=engine)
    inspector = sqlalchemy.inspect(engine)
    if not database_exists(engine.url):
        return
    with contextlib.closing(engine.connect()) as connection:
        transaction = connection.begin()
        for tbl in reversed(BaseModel.metadata.sorted_tables):
            # catalog tables are always kept
            if tbl.name in CATALOG_TABLES:
                continue
            if inspector.has_table(tbl.name):
                tbl.drop(connection)
        transaction.commit()
45 changes: 45 additions & 0 deletions evadb/catalog/models/configuration_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sqlalchemy import Column, String

from evadb.catalog.models.base_model import BaseModel
from evadb.catalog.models.utils import ConfigurationCatalogEntry, TextPickleType


class ConfigurationCatalog(BaseModel):
"""The `ConfigurationCatalog` catalog stores all the configuration params.
`_row_id:` an autogenerated unique identifier.
`_key:` the key for the config.
`_value:` the value for the config
"""

__tablename__ = "configuration_catalog"

_key = Column("name", String(100), unique=True)
_value = Column(
"engine", TextPickleType()
) # TODO :- Will this work or would we need to pickle ??
hershd23 marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self, key: str, value: any):
    """Create a configuration row that stores `value` under `key`."""
    # NOTE(review): `any` in the signature is the builtin function, not
    # typing.Any — harmless at runtime, but confirm the intended annotation.
    self._key = key
    self._value = value

def as_dataclass(self) -> "ConfigurationCatalogEntry":
    """Build a ConfigurationCatalogEntry from this row's id, key and value."""
    fields = {
        "row_id": self._row_id,
        "key": self._key,
        "value": self._value,
    }
    return ConfigurationCatalogEntry(**fields)
24 changes: 21 additions & 3 deletions evadb/catalog/models/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def init_db(engine: Engine):
BaseModel.metadata.create_all(bind=engine)


def truncate_catalog_tables(engine: Engine):
def truncate_catalog_tables(engine: Engine, tables_not_to_truncate: List[str] = []):
"""Truncate all the catalog tables"""
# https://stackoverflow.com/questions/4763472/sqlalchemy-clear-database-content-but-dont-drop-the-schema/5003705#5003705 #noqa
# reflect to refresh the metadata
Expand All @@ -71,8 +71,9 @@ def truncate_catalog_tables(engine: Engine):
with contextlib.closing(engine.connect()) as con:
trans = con.begin()
for table in reversed(BaseModel.metadata.sorted_tables):
if insp.has_table(table.name):
con.execute(table.delete())
if table.name not in tables_not_to_truncate:
if insp.has_table(table.name):
con.execute(table.delete())
trans.commit()


Expand Down Expand Up @@ -257,3 +258,20 @@ def display_format(self):
"engine": self.engine,
"params": self.params,
}


@dataclass(unsafe_hash=True)
class ConfigurationCatalogEntry:
    """Plain dataclass view of one entry in the `ConfigurationCatalog`.

    Using it keeps sqlalchemy dependencies from being exposed beyond the
    catalog service. Further, sqlalchemy does not allow sharing of objects
    across threads.
    """

    key: str
    value: str
    row_id: int = None

    def display_format(self):
        """Return the entry's key and value as a dict; row_id is left out."""
        return dict(key=self.key, value=self.value)
Loading