Skip to content

Commit

Permalink
Refactor dataset diff and compute metric (#381)
Browse files Browse the repository at this point in the history
  • Loading branch information
changhiskhan authored Dec 18, 2022
1 parent 8997527 commit 0efbd0a
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 25 deletions.
4 changes: 4 additions & 0 deletions cpp/include/lance/arrow/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ class LanceDataset : public ::arrow::dataset::Dataset {
static ::arrow::Result<std::shared_ptr<LanceDataset>> Make(
const std::string& uri, std::optional<uint64_t> version = std::nullopt);

/// Create a new LanceDataset at a given version.
///
/// The result is a separate LanceDataset instance built by `Make()` from this
/// dataset's filesystem and base URI; this object itself is not modified (the
/// method is const).
///
/// \param version the version to open. NOTE(review): `std::nullopt` is
///        forwarded to `Make()` unchanged, which presumably selects the
///        latest version -- confirm against `Make()`.
/// \return the dataset opened at the requested version, or the error status
///         reported by `Make()`.
::arrow::Result<std::shared_ptr<LanceDataset>> Checkout(
std::optional<uint64_t> version = std::nullopt) const;

/// Get all the dataset versions.
::arrow::Result<std::vector<DatasetVersion>> versions() const;

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/lance/arrow/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,10 @@ ::arrow::Result<std::shared_ptr<LanceDataset>> LanceDataset::Make(
return std::shared_ptr<LanceDataset>(new LanceDataset(std::move(impl)));
}

::arrow::Result<std::shared_ptr<LanceDataset>> LanceDataset::Checkout(
    std::optional<uint64_t> version) const {
  // Re-open the same base URI on the same filesystem; only the requested
  // version differs from this dataset. Any failure from Make() is returned
  // to the caller unchanged.
  auto reopened = LanceDataset::Make(impl_->fs, impl_->base_uri, version);
  return reopened;
}

::arrow::Result<std::vector<DatasetVersion>> LanceDataset::versions() const {
std::vector<DatasetVersion> versions;
::arrow::fs::FileSelector selector;
Expand Down
34 changes: 31 additions & 3 deletions python/lance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
import os
from datetime import datetime
from pathlib import Path
from typing import Optional, Union
from typing import Optional, Union, Callable

import pandas
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.fs
from pyarrow._dataset import Dataset

from . import version

Expand All @@ -35,9 +36,9 @@
)
from lance.types import register_extension_types

__all__ = ["dataset", "write_dataset", "LanceFileFormat", "__version__"]
__all__ = ["dataset", "write_dataset", "LanceFileFormat", "__version__", "diff", "compute_metric"]

from .util.versioning import get_version_asof
from .util.versioning import get_version_asof, LanceDiff, compute_metric

register_extension_types()

Expand Down Expand Up @@ -90,6 +91,33 @@ def dataset(
return _get_versioned_dataset(filesystem, uri, version)


def diff(dataset: FileSystemDataset, v1: int, v2: Optional[int] = None) -> LanceDiff:
    """
    Get the difference from v1 to v2 of this dataset

    Parameters
    ----------
    dataset: FileSystemDataset
        The dataset we want to get the diff for
    v1: int
        Start version. A negative value is not a version number: it is used
        as a Python-style index into ``dataset.versions()`` and replaced by
        the "version" field of the entry found there.
    v2: int, default None
        End version. If not specified, use the current version in the given
        dataset. A negative value is resolved the same way as for v1.

    Returns
    -------
    LanceDiff
        Diff whose start is ``dataset`` checked out at v1 and whose end is
        ``dataset`` itself (v2 omitted) or ``dataset`` checked out at v2.

    Raises
    ------
    ValueError
        If the resolved v1 is greater than the resolved v2.
    """
    # NOTE(review): indexing versions() with -1 selects its last entry; if
    # versions() is ordered oldest-first that is the latest version, not the
    # second-to-latest -- confirm the intended meaning of negative values.
    history = None
    if v1 < 0:
        history = dataset.versions()
        v1 = history[v1]["version"]

    # v2 is overwritten below, so record now whether the caller omitted it.
    end_is_current = v2 is None
    if end_is_current:
        v2 = dataset.version["version"]
    elif v2 < 0:
        # Reuse the listing fetched for v1 instead of listing versions twice.
        if history is None:
            history = dataset.versions()
        v2 = history[v2]["version"]

    if v1 > v2:
        raise ValueError("v2 must not be less than v1")

    start = dataset.checkout(v1)
    # When v2 was omitted the given dataset already is the end state, so it is
    # used directly instead of being re-opened at its own version.
    end = dataset if end_is_current else dataset.checkout(v2)
    return LanceDiff(start, end)


def _is_plain_dataset(filesystem: pa.fs.FileSystem, uri: str):
    """
    Tell whether ``uri`` lacks a ``_latest.manifest`` entry.

    Parameters
    ----------
    filesystem: pa.fs.FileSystem
        Filesystem used to look up the manifest path.
    uri: str
        Base location of the dataset; the manifest name is joined onto it.

    Returns
    -------
    bool
        True when the filesystem reports the manifest path as not found.
    """
    manifest_path = os.path.join(uri, "_latest.manifest")
    manifest_info = filesystem.get_file_info(manifest_path)
    manifest_missing = manifest_info.type == pa.fs.FileType.NotFound
    return manifest_missing
Expand Down
23 changes: 23 additions & 0 deletions python/lance/_lib.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Dict, List, Optional, Union

import pandas as pd
import pyarrow

from cython.operator cimport dereference as deref
from lance.util.versioning import LanceDiff, compute_metric
from libc.stdint cimport int64_t, uint64_t
from libc.time cimport time_t
from libcpp cimport bool
Expand Down Expand Up @@ -227,6 +229,8 @@ cdef extern from "lance/arrow/dataset.h" namespace "lance::arrow" nogil:
optional[uint64_t] version,
)

CResult[shared_ptr[CLanceDataset]] Checkout(optional[uint64_t] version)

CDatasetVersion version() const;

CResult[CDatasetVersion] latest_version() const;
Expand Down Expand Up @@ -263,6 +267,23 @@ cdef class FileSystemDataset(Dataset):
Dataset.init(self, sp)
self.lance_dataset = <CLanceDataset *> sp.get()

def checkout(self, version: Optional[int]) -> FileSystemDataset:
    """
    Return this dataset opened at ``version``.

    ``None`` is replaced by 0 before delegating to ``_checkout``.
    NOTE(review): whether version 0 is meant to stand for "latest" cannot be
    determined from this method -- confirm against the C++ ``Checkout``.
    """
    return self._checkout(0 if version is None else version)

def _checkout(self, uint64_t version) -> FileSystemDataset:
    """
    Open this dataset at ``version`` through the C++ ``Checkout`` call.

    The version is wrapped in a C++ ``optional`` and passed to ``Checkout`` on
    the underlying C++ dataset. The returned pointer is cast to the base
    dataset type and wrapped as a new ``FileSystemDataset``.
    ``GetResultValue`` presumably raises when the C++ call fails -- confirm.
    """
    cdef:
        optional[uint64_t] requested_version
        shared_ptr[CLanceDataset] checked_out
        shared_ptr[CDataset] as_base_dataset

    requested_version = optional[uint64_t](version)

    checked_out = GetResultValue(self.lance_dataset.Checkout(requested_version))
    as_base_dataset = static_pointer_cast[CDataset, CLanceDataset](checked_out)
    return FileSystemDataset.wrap(move(as_base_dataset))

@property
def uri(self) -> str:
    """The URI reported by the underlying C++ dataset, decoded as UTF-8."""
    encoded_uri = self.lance_dataset.uri()
    return encoded_uri.decode("UTF-8")
Expand Down Expand Up @@ -363,6 +384,7 @@ cdef class FileSystemDataset(Dataset):
)
return FileSystemDataset.wrap(static_pointer_cast[CDataset, CLanceDataset](dataset))


def _lance_dataset_write(
Dataset data,
object base_dir not None,
Expand Down Expand Up @@ -441,3 +463,4 @@ def _lance_dataset_make(
c_dataset = GetResultValue(CLanceDataset.Make(c_filesystem, base_dir, c_version))
c_base_dataset = static_pointer_cast[CDataset, CLanceDataset](c_dataset)
return FileSystemDataset.wrap(move(c_base_dataset))

14 changes: 6 additions & 8 deletions python/lance/tests/util/test_versioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
ColumnDiff,
LanceDiff,
RowDiff,
compute_metric,
diff,
get_version_asof,
)

Expand Down Expand Up @@ -65,12 +63,12 @@ def _get_test_timestamps(naive):

def test_compute_metric(tmp_path):
base_dir = tmp_path / "test"
_create_dataset(base_dir)
ds = _create_dataset(base_dir)

def func(dataset):
return dataset.to_table().to_pandas().max().to_frame().T

metrics = compute_metric(base_dir, func)
metrics = lance.compute_metric(ds, func)
assert "version" in metrics


Expand All @@ -80,14 +78,14 @@ def _create_dataset(base_dir):
table2 = pa.Table.from_pylist([{"a": 100, "b": 200}])
lance.write_dataset(table2, base_dir, mode="append")
table3 = pa.Table.from_pylist([{"a": 100, "c": 100, "d": 200}])
lance.dataset(base_dir).merge(table3, left_on="a", right_on="a")
return lance.dataset(base_dir).merge(table3, left_on="a", right_on="a")


def test_diff(tmp_path):
base_dir = tmp_path / "test"
_create_dataset(base_dir)
ds = _create_dataset(base_dir)

d = diff(base_dir, 1, 2)
d = lance.diff(ds, 1, 2)
assert isinstance(d, LanceDiff)

rows = d.rows_added(key="a")
Expand All @@ -97,7 +95,7 @@ def test_diff(tmp_path):
assert isinstance(tbl, pa.Table)
assert len(tbl) == 1

cols = d.columns_added()
cols = lance.diff(ds, 2, 3).columns_added()
assert isinstance(cols, ColumnDiff)
assert len(cols.schema) == 2
tbl = cols.head()
Expand Down
42 changes: 28 additions & 14 deletions python/lance/util/versioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
# limitations under the License.
from __future__ import annotations

import functools
import itertools
from datetime import datetime, timezone
from functools import cached_property
from pathlib import Path
from typing import Callable, Union

import duckdb
Expand Down Expand Up @@ -70,37 +70,51 @@ def get_version_asof(ds: FileSystemDataset, ts: [datetime, pd.Timestamp, str]) -


def compute_metric(
    ds: FileSystemDataset,
    metric_func: Callable[[FileSystemDataset], pd.DataFrame],
    versions: list = None,
    with_version: Union[bool, str] = True,
) -> pd.DataFrame:
    """
    Compare metrics across versions of a dataset

    Parameters
    ----------
    ds: FileSystemDataset
        The base dataset we want to compute metrics across versions for.
    metric_func: Callable[[FileSystemDataset], pd.DataFrame]
        Function to compute metrics DataFrame from a given dataset version.
        When the version column is added it is written into the DataFrame
        this function returned (in place).
    versions: list, default None
        All versions if not specified. Entries may be version numbers or
        dicts carrying the number under the "version" key.
    with_version: bool or str, default True
        If bool then controls whether to add the version as auxiliary output.
        If str then assumed to be the name of the auxiliary output column.

    Returns
    -------
    pd.DataFrame
        The per-version metric frames concatenated in the order of
        ``versions``.

    Raises
    ------
    ValueError
        If the version column is requested and ``metric_func`` already
        produced a column with that name.
    """
    if versions is None:
        versions = ds.versions()

    # A string both enables the version column and names it; any other value
    # is treated as the on/off switch, so with_version=False adds no column.
    if isinstance(with_version, str):
        add_version, vcol_name = True, with_version
    else:
        add_version, vcol_name = bool(with_version), "version"

    results = []
    for v in versions:
        if isinstance(v, dict):
            v = v["version"]
        vdf = metric_func(ds.checkout(v))
        if add_version:
            if vcol_name in vdf:
                raise ValueError(f"{vcol_name} already in output df")
            vdf[vcol_name] = v
        results.append(vdf)
    return pd.concat(results)


def diff(uri, v1: int, v2: int) -> LanceDiff:
def _compute_metric(version, uri, func, vcol_name):
import lance

return LanceDiff(lance.dataset(uri, version=v1), lance.dataset(uri, version=v2))
vdf = func(lance.dataset(uri, version=version))
if vcol_name in vdf:
raise ValueError(f"{vcol_name} already in output df")
vdf[vcol_name] = version
return vdf


class LanceDiff:
Expand All @@ -126,7 +140,7 @@ def columns_added(self) -> ColumnDiff:
"""
v2_fields = _flat_schema(self.v2.schema)
v1_fields = set([f.name for f in _flat_schema(self.v1.schema)])
new_fields = [f for f in v2_fields if f not in v1_fields]
new_fields = [f for f in v2_fields if f.name not in v1_fields]
return ColumnDiff(self.v2, new_fields)


Expand Down Expand Up @@ -179,7 +193,7 @@ def head(self, n: int = 10, columns: list[str] = None) -> pa.Table:
v1 = self.ds_start
v2 = self.ds_end
if columns is None:
columns = ["*"]
columns = ["v2.*"]
qry = self._query(columns, limit=n)
return duckdb.query(qry).to_arrow_table()

Expand Down

0 comments on commit 0efbd0a

Please sign in to comment.