This repository has been archived by the owner on Feb 28, 2024. It is now read-only.

feat(db): use duckdb backend #509

Draft
wants to merge 32 commits into base: main
Changes from all commits
32 commits
1fce242
add objects
jonburdo Sep 28, 2022
678e6b1
add file db
jonburdo Sep 28, 2022
7676a52
fix db code formatting
jonburdo Sep 28, 2022
e7b6da9
start duckdb
jonburdo Sep 28, 2022
ae4e363
use file db in query application
jonburdo Sep 28, 2022
bdc8ed7
duckdb backend for data_object_meta and annotation
jonburdo Sep 29, 2022
beeec38
add LDBClient and duckdb operations for most of the indexing logic
jonburdo Sep 30, 2022
b7d60b3
add dataset insert
jonburdo Oct 1, 2022
7d3d94d
use JSON duckdb fields and data object annotation pair queries
jonburdo Oct 1, 2022
c9aab54
get list working with ds:root
jonburdo Oct 2, 2022
7f41897
fix query options and eval command
jonburdo Oct 3, 2022
1ebca1b
make FileDB api compatible with DuckDB api, allow choice of backend i…
jonburdo Oct 3, 2022
588e1d0
allow choosing database backend with init -d option or LDB_DATABASE e…
jonburdo Oct 3, 2022
8968c33
add relevant methods to AbstractDB
jonburdo Oct 5, 2022
54a35e9
remove duplicate functions, fix data_object_annotation insert bug
jonburdo Oct 5, 2022
d61fe51
fix function signatures and FileDB add functions
jonburdo Oct 5, 2022
4f80a8c
adjust indexing code to use new api, fix db functions
jonburdo Oct 5, 2022
51a8ad9
fix ls_collection signature and cast generator to dict
jonburdo Oct 5, 2022
f685303
fix FileDB serialization functions
jonburdo Oct 6, 2022
47c2842
fix dataset_set typo and flake8 unused import
jonburdo Oct 6, 2022
6842c69
fix type annotations
jonburdo Oct 6, 2022
5ee0dfe
add 'id' to allowed-redefined-builtins in pylint config
jonburdo Oct 6, 2022
7cfe22f
add import-outside-toplevel to global pylint disable list
jonburdo Oct 6, 2022
798bc65
fix pylint
jonburdo Oct 6, 2022
12dfcaa
fix pylint encoding issue
jonburdo Oct 6, 2022
46cea4c
run pylint by itself
jonburdo Oct 6, 2022
e3be673
fix duckdb import
jonburdo Oct 6, 2022
7b6609f
fix pylint similar lines
jonburdo Oct 6, 2022
aff5eaf
fix malformed JSON bug
jonburdo Oct 6, 2022
b9f558f
fix db annotation store and minor fixes
jonburdo Oct 10, 2022
22af1bc
add missing abstract methods and have AbstractDB inherit from ABC
jonburdo Oct 11, 2022
3cfb0e2
move functions used by duckdb only from AbstractDB to DuckDB class
jonburdo Oct 11, 2022
3 changes: 2 additions & 1 deletion .github/workflows/test.yml
@@ -48,7 +48,8 @@ jobs:
- name: run linters
run: |
[[ "$RUNNER_OS" != "Windows" ]] && source ./venv/bin/activate || source ./venv/Scripts/activate
SKIP=no-commit-to-branch pre-commit run -a --show-diff-on-failure
SKIP=no-commit-to-branch,pylint pre-commit run -a --show-diff-on-failure
pylint ldb tests scripts stubs

test:
if: ${{ github.actor != 'dependabot[bot]' && github.actor != 'renovate[bot]' }}
14 changes: 13 additions & 1 deletion ldb/command/init.py
@@ -14,7 +14,12 @@


def init_command(options: Namespace) -> None:
init(options.path, force=options.force, read_any_cloud_location=False)
init(
options.path,
force=options.force,
read_any_cloud_location=False,
db_type=options.database,
)
set_default_instance(options.path, overwrite_existing=False)


@@ -34,6 +39,13 @@ def add_parser(
default=False,
help="Overwrite an existing instance",
)
parser.add_argument(
"-d",
"--database",
default="file",
choices=("file", "duckdb"),
help="Database backend to use",
)
parser.add_argument( # type: ignore[attr-defined]
"path",
metavar="<path>",
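Taken together with the `LDB_DATABASE` handling in `ldb/core.py` below, backend selection resolves in a fixed order: an explicit `-d`/`--database` value, then the `LDB_DATABASE` environment variable, then auto-detection in `LDBClient.db_info`. A minimal sketch of that precedence; `resolve_db_type` is a hypothetical helper for illustration, not code from this PR:

```python
import os


def resolve_db_type(cli_value: str = "") -> str:
    # Precedence as in init(): an explicit CLI value wins, then the
    # LDB_DATABASE environment variable. An empty result defers to
    # LDBClient.db_info, which picks "duckdb" when duckdb/index.db
    # exists and falls back to "file" otherwise.
    return cli_value or os.getenv("LDB_DATABASE", "")
```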
68 changes: 61 additions & 7 deletions ldb/core.py
@@ -1,31 +1,81 @@
import os
import os.path as osp
import shlex
import shutil
from pathlib import Path
from typing import Optional
from typing import Optional, Tuple, Type, Union

from funcy.objects import cached_property

from ldb import config
from ldb.config import get_default_instance_dir, get_global_base, get_ldb_dir
from ldb.db.abstract import AbstractDB
from ldb.db.duckdb import DuckDB
from ldb.db.file import FileDB
from ldb.exceptions import LDBException, LDBInstanceNotFoundError
from ldb.path import INSTANCE_DIRS, REQUIRED_INSTANCE_DIRS, Filename, GlobalDir
from ldb.path import REQUIRED_INSTANCE_DIRS, Filename, GlobalDir
from ldb.storage import StorageLocation, add_storage


class LDBClient:
def __init__(self, ldb_dir: Union[str, Path], db_type: str = ""):
self.ldb_dir = os.fspath(ldb_dir)
self._db_type = db_type

@cached_property
def db_info(self) -> Tuple[str, str, Type[AbstractDB]]:
duckdb_path = osp.join(self.ldb_dir, "duckdb", "index.db")
if not self._db_type:
if osp.isfile(duckdb_path):
self._db_type = "duckdb"
Contributor commented:

Maybe this is a more general question, but do we want to have a versioning system for the database schema? Especially if we are auto-detecting it (and might assume it is usable without being sure of the database's table structure). (This is also something we could look into in a future version, for sure.)

Contributor Author (jonburdo) replied:

Yeah, good idea. I'm not sure right off how to implement it. I assume we'd store the version in the db and maybe have different DuckDB classes/modules for different versions? I added an issue here: #511
Feel free to edit the description.

(A sketch of the store-the-version idea follows this diff.)

else:
self._db_type = "file"
if self._db_type == "duckdb":
return self._db_type, duckdb_path, DuckDB
if self._db_type == "file":
return self._db_type, self.ldb_dir, FileDB
raise ValueError(f"Invalid db type: {self._db_type}")

@property
def db_type(self) -> str:
return self.db_info[0]

@property
def db_path(self) -> str:
return self.db_info[1]

@cached_property
def db(self) -> AbstractDB:
cls: Type[AbstractDB]
_, path, cls = self.db_info
return cls(path)


def init(
path: Path,
force: bool = False,
read_any_cloud_location: bool = False,
auto_index: bool = False,
db_type: str = "",
) -> Path:
"""
Create a new LDB instance.
"""
path = Path(os.path.normpath(path))
if not db_type:
db_type = os.getenv("LDB_DATABASE", "")
path = Path(os.path.abspath(path))
if path.is_dir() and next(path.iterdir(), None) is not None:
if is_ldb_instance(path):
if force:
print(f"Removing existing LDB instance at {repr(os.fspath(path))}")
shutil.rmtree(path)
with os.scandir(path) as scandir_it:
entries = list(scandir_it)
for entry in entries:
entry_path = osp.join(path, entry)
if entry.is_dir():
shutil.rmtree(entry_path)
else:
os.unlink(entry_path)
else:
raise LDBException(
"Initialization failed\n"
@@ -38,8 +88,10 @@ def init(
f"Directory not empty: {repr(os.fspath(path))}\n"
"To create an LDB instance here, remove directory contents",
)
for subdir in INSTANCE_DIRS:
(path / subdir).mkdir(parents=True)
client = LDBClient(path, db_type=db_type)
os.makedirs(osp.dirname(client.db_path), exist_ok=True)
client.db.init()

with config.edit(path / Filename.CONFIG) as cfg:
cfg["core"] = {
"read_any_cloud_location": read_any_cloud_location,
@@ -95,7 +147,9 @@ def add_public_data_lakes(ldb_dir: Path) -> None:


def is_ldb_instance(path: Path) -> bool:
return all((path / subdir).is_dir() for subdir in REQUIRED_INSTANCE_DIRS)
return osp.isfile(osp.join(path, "duckdb", "index.db")) or all(
(path / subdir).is_dir() for subdir in REQUIRED_INSTANCE_DIRS
)


def get_ldb_instance(path: Optional[Path] = None) -> Path:
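On the review thread's versioning question above: a minimal sketch of the "store the version in the db" idea, assuming a DuckDB backend. The `ldb_meta` table, the function names, and the version constant are illustrative assumptions; this PR implements none of this (see #511):

```python
import duckdb

SCHEMA_VERSION = 1  # bump whenever the table structure changes


def init_schema(conn: duckdb.DuckDBPyConnection) -> None:
    # Record the schema version once, when the database is created.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS ldb_meta (key VARCHAR, value VARCHAR)",
    )
    conn.execute(
        "INSERT INTO ldb_meta VALUES ('schema_version', ?)",
        [str(SCHEMA_VERSION)],
    )


def check_schema(conn: duckdb.DuckDBPyConnection) -> None:
    # Fail fast instead of assuming an auto-detected index.db is usable.
    row = conn.execute(
        "SELECT value FROM ldb_meta WHERE key = 'schema_version'",
    ).fetchone()
    found = None if row is None else int(row[0])
    if found != SCHEMA_VERSION:
        raise RuntimeError(
            f"index.db schema version {found}, expected {SCHEMA_VERSION}",
        )
```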
75 changes: 61 additions & 14 deletions ldb/dataset.py
@@ -9,6 +9,7 @@
from itertools import tee
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Callable,
Collection,
@@ -17,6 +18,7 @@
Iterable,
Iterator,
List,
Mapping,
Optional,
Tuple,
Union,
@@ -51,6 +53,10 @@
parse_datetime,
)

if TYPE_CHECKING:
from ldb.db.abstract import AbstractDB
from ldb.index.utils import DataObjectMeta as DataObjectMetaT

OpDef = Tuple[str, Union[str, int, float, List[str]]]
CollectionFunc = Callable[
[Iterable[Tuple[str, str]]],
@@ -189,7 +195,9 @@ def get_collection_from_dataset_identifier(
dataset_version: Optional[int] = None,
) -> Dict[str, Optional[str]]:
if dataset_name == ROOT:
return get_root_collection(ldb_dir)
from ldb.core import LDBClient

return dict(LDBClient(ldb_dir).db.get_root_collection())
dataset = get_dataset(ldb_dir, dataset_name)
dataset_version_hash = get_dataset_version_hash(dataset, dataset_version)
return get_collection(ldb_dir, dataset_version_hash)
@@ -258,6 +266,11 @@ def combine_collections(
ldb_dir: Path,
collections: List[Dict[str, Optional[str]]],
) -> Dict[str, str]:
if not collections:
return {}
if len(collections) == 1:
return {k: v if v is not None else "" for k, v in collections[0].items()}

all_versions: DefaultDict[str, List[str]] = defaultdict(list)
for collection in collections:
for data_object_hash, annotation_hash in collection.items():
@@ -267,6 +280,8 @@
combined_collection = {}
for data_object_hash, annotation_hashes in sorted(all_versions.items()):
if len(annotation_hashes) > 1:
# TODO get_latest_annotation_version func to handle this
# get latest annotation (most recent unique version)
annotation_dir = (
get_hash_path(
ldb_dir / InstanceDir.DATA_OBJECT_INFO,
@@ -520,7 +535,7 @@ class Query(CollectionOperation):
def __init__(
self,
ldb_dir: Path,
cache: LDBMappingCache[str, Any],
cache: Mapping[str, Any],
search: BoolSearchFunc,
) -> None:
self.ldb_dir = ldb_dir
@@ -552,7 +567,10 @@ def get_search_input(
collection: Iterable[Tuple[str, str]],
) -> Iterator[JSONDecoded]:
for _, annot_hash in collection:
yield self.cache[annot_hash]
if not annot_hash:
yield None
else:
yield self.cache[annot_hash]


class FileQuery(Query):
@@ -583,16 +601,23 @@ def get_search_input(


class PipelineData:
def __init__(self, ldb_dir: Path) -> None:
self.ldb_dir = ldb_dir
def __init__(
self,
db: "AbstractDB",
data_object_ids: Iterable[str],
annotation_ids: Iterable[str],
) -> None:
self.db = db
self.data_object_ids = data_object_ids
self.annotation_ids = annotation_ids

@cached_property
def data_object_metas(self) -> DataObjectMetaCache:
return DataObjectMetaCache(self.ldb_dir)
def data_object_metas(self) -> Dict[str, "DataObjectMetaT"]:
return dict(self.db.get_data_object_meta_many(self.data_object_ids))

@cached_property
def annotations(self) -> AnnotationCache:
return AnnotationCache(self.ldb_dir)
def annotations(self) -> Dict[str, JSONDecoded]:
return dict(self.db.get_annotation_many(self.annotation_ids))


class PipelineBuilder:
@@ -603,9 +628,8 @@ def __init__(
) -> None:
self.ldb_dir = ldb_dir
if data is None:
self.data = PipelineData(ldb_dir)
else:
self.data = data
raise ValueError("data cannot be None")
self.data: PipelineData = data

def build(
self,
@@ -720,19 +744,42 @@ def apply_queries(
data_object_hashes: Iterable[str],
annotation_hashes: Iterable[str],
op_defs: Iterable[OpDef],
data: Optional[PipelineData] = None,
warn: bool = True,
) -> Iterator[Tuple[str, str]]:
collection = zip(data_object_hashes, annotation_hashes)
return apply_queries_to_collection(ldb_dir, collection, op_defs, warn=warn)
return apply_queries_to_collection(
ldb_dir,
collection,
op_defs,
data=data,
warn=warn,
)


def apply_queries_to_collection(
ldb_dir: Path,
collection: Iterable[Tuple[str, str]],
op_defs: Iterable[OpDef],
data: Optional[PipelineData] = None,
warn: bool = True,
) -> Iterator[Tuple[str, str]]:
"""
Filter the given collection by the operations in `collection_ops`.
"""
return Pipeline.from_defs(ldb_dir, op_defs, warn=warn).run(collection)
from ldb.core import LDBClient

collection = list(collection)
if collection:
data_object_ids, annotation_ids = zip(*collection)
else:
data_object_ids, annotation_ids = (), ()
if data is None:
data = PipelineData(
LDBClient(ldb_dir).db,
data_object_ids,
annotation_ids,
)
return Pipeline.from_defs(ldb_dir, op_defs, data=data, warn=warn).run(
collection,
)
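Net effect of the `PipelineData` change: object metadata and annotations are fetched in bulk through the database client and memoized by `cached_property`, instead of being read lazily through the per-hash file caches. A usage sketch; the instance path and ids are placeholders, not values from this PR:

```python
from ldb.core import LDBClient
from ldb.dataset import PipelineData

client = LDBClient("/tmp/ldb_instance")  # placeholder instance path
collection = [("dobj-1", "annot-1"), ("dobj-2", "annot-2")]  # placeholder ids
data_object_ids, annotation_ids = zip(*collection)

data = PipelineData(client.db, data_object_ids, annotation_ids)
metas = data.data_object_metas  # first access performs the bulk fetch
annotations = data.annotations  # later accesses reuse the cached dict
```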