From 268768f78bd7ea7b2df8ca0722faa72d4d4614c5 Mon Sep 17 00:00:00 2001 From: David Scharf Date: Mon, 16 Dec 2024 21:16:40 +0100 Subject: [PATCH] convert add_limit to pipe step based limiting (#2131) * convert add_limit to step based limiting * prevent late arriving items to be forwarded from limit add some convenience methods for pipe step management * added a few more tests for limit * add more limit functions from branch * remove rate-limiting * fix limiting bug and update docs * revert back to inserting validator step at the same position if replaced * make time limit tests more lenient for mac os tests * tmp * add test for testing incremental with limit * improve limit tests with parallelized case * add backfill example with sql_database * fix linting * remove extra file * only wrap iterators on demand * move items transform steps into extra file --- dlt/extract/exceptions.py | 1 - dlt/extract/hints.py | 3 +- dlt/extract/incremental/__init__.py | 3 +- dlt/extract/items.py | 119 +----------- dlt/extract/items_transform.py | 179 ++++++++++++++++++ dlt/extract/pipe.py | 20 +- dlt/extract/pipe_iterator.py | 8 +- dlt/extract/resource.py | 109 ++++------- dlt/extract/utils.py | 11 ++ dlt/extract/validation.py | 3 +- dlt/sources/helpers/transform.py | 2 +- docs/examples/backfill_in_chunks/__init__.py | 0 .../backfill_in_chunks/backfill_in_chunks.py | 85 +++++++++ docs/website/docs/general-usage/resource.md | 17 +- docs/website/docs/general-usage/source.md | 14 +- tests/extract/test_extract_pipe.py | 3 +- tests/extract/test_incremental.py | 58 +++++- tests/extract/test_sources.py | 78 +++++++- tests/extract/test_validation.py | 2 +- tests/extract/utils.py | 2 +- 20 files changed, 505 insertions(+), 212 deletions(-) create mode 100644 dlt/extract/items_transform.py create mode 100644 docs/examples/backfill_in_chunks/__init__.py create mode 100644 docs/examples/backfill_in_chunks/backfill_in_chunks.py diff --git a/dlt/extract/exceptions.py b/dlt/extract/exceptions.py index f4d2b1f302..e832833428 100644 --- a/dlt/extract/exceptions.py +++ b/dlt/extract/exceptions.py @@ -3,7 +3,6 @@ from dlt.common.exceptions import DltException from dlt.common.utils import get_callable_name -from dlt.extract.items import ValidateItem, TDataItems class ExtractorException(DltException): diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 000e5c4cdb..22a0062acf 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -37,7 +37,8 @@ InconsistentTableTemplate, ) from dlt.extract.incremental import Incremental, TIncrementalConfig -from dlt.extract.items import TFunHintTemplate, TTableHintTemplate, TableNameMeta, ValidateItem +from dlt.extract.items import TFunHintTemplate, TTableHintTemplate, TableNameMeta +from dlt.extract.items_transform import ValidateItem from dlt.extract.utils import ensure_table_schema_columns, ensure_table_schema_columns_hint from dlt.extract.validation import create_item_validator diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index 5e7bae49c6..ce06292864 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -44,7 +44,8 @@ IncrementalArgs, TIncrementalRange, ) -from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform +from dlt.extract.items import SupportsPipe, TTableHintTemplate +from dlt.extract.items_transform import ItemTransform from dlt.extract.incremental.transform import ( JsonIncremental, ArrowIncremental, diff --git a/dlt/extract/items.py b/dlt/extract/items.py index 
888787e6b7..ad7447c163 100644 --- a/dlt/extract/items.py +++ b/dlt/extract/items.py @@ -1,21 +1,16 @@ -import inspect from abc import ABC, abstractmethod from typing import ( Any, Callable, - ClassVar, - Generic, Iterator, Iterable, Literal, Optional, Protocol, - TypeVar, Union, Awaitable, TYPE_CHECKING, NamedTuple, - Generator, ) from concurrent.futures import Future @@ -28,7 +23,6 @@ TDynHintType, ) - TDecompositionStrategy = Literal["none", "scc"] TDeferredDataItems = Callable[[], TDataItems] TAwaitableDataItems = Awaitable[TDataItems] @@ -113,6 +107,10 @@ def gen(self) -> TPipeStep: """A data generating step""" ... + def replace_gen(self, gen: TPipeStep) -> None: + """Replaces data generating step. Assumes that you know what are you doing""" + ... + def __getitem__(self, i: int) -> TPipeStep: """Get pipe step at index""" ... @@ -129,112 +127,3 @@ def has_parent(self) -> bool: def close(self) -> None: """Closes pipe generator""" ... - - -ItemTransformFunctionWithMeta = Callable[[TDataItem, str], TAny] -ItemTransformFunctionNoMeta = Callable[[TDataItem], TAny] -ItemTransformFunc = Union[ItemTransformFunctionWithMeta[TAny], ItemTransformFunctionNoMeta[TAny]] - - -class ItemTransform(ABC, Generic[TAny]): - _f_meta: ItemTransformFunctionWithMeta[TAny] = None - _f: ItemTransformFunctionNoMeta[TAny] = None - - placement_affinity: ClassVar[float] = 0 - """Tell how strongly an item sticks to start (-1) or end (+1) of pipe.""" - - def __init__(self, transform_f: ItemTransformFunc[TAny]) -> None: - # inspect the signature - sig = inspect.signature(transform_f) - # TODO: use TypeGuard here to get rid of type ignore - if len(sig.parameters) == 1: - self._f = transform_f # type: ignore - else: # TODO: do better check - self._f_meta = transform_f # type: ignore - - def bind(self: "ItemTransform[TAny]", pipe: SupportsPipe) -> "ItemTransform[TAny]": - return self - - @abstractmethod - def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: - """Transforms `item` (a list of TDataItem or a single TDataItem) and returns or yields TDataItems. 
Returns None to consume item (filter out)""" - pass - - -class FilterItem(ItemTransform[bool]): - # mypy needs those to type correctly - _f_meta: ItemTransformFunctionWithMeta[bool] - _f: ItemTransformFunctionNoMeta[bool] - - def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: - if isinstance(item, list): - # preserve empty lists - if len(item) == 0: - return item - - if self._f_meta: - item = [i for i in item if self._f_meta(i, meta)] - else: - item = [i for i in item if self._f(i)] - if not item: - # item was fully consumed by the filter - return None - return item - else: - if self._f_meta: - return item if self._f_meta(item, meta) else None - else: - return item if self._f(item) else None - - -class MapItem(ItemTransform[TDataItem]): - # mypy needs those to type correctly - _f_meta: ItemTransformFunctionWithMeta[TDataItem] - _f: ItemTransformFunctionNoMeta[TDataItem] - - def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: - if isinstance(item, list): - if self._f_meta: - return [self._f_meta(i, meta) for i in item] - else: - return [self._f(i) for i in item] - else: - if self._f_meta: - return self._f_meta(item, meta) - else: - return self._f(item) - - -class YieldMapItem(ItemTransform[Iterator[TDataItem]]): - # mypy needs those to type correctly - _f_meta: ItemTransformFunctionWithMeta[TDataItem] - _f: ItemTransformFunctionNoMeta[TDataItem] - - def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: - if isinstance(item, list): - for i in item: - if self._f_meta: - yield from self._f_meta(i, meta) - else: - yield from self._f(i) - else: - if self._f_meta: - yield from self._f_meta(item, meta) - else: - yield from self._f(item) - - -class ValidateItem(ItemTransform[TDataItem]): - """Base class for validators of data items. - - Subclass should implement the `__call__` method to either return the data item(s) or raise `extract.exceptions.ValidationError`. - See `PydanticValidator` for possible implementation. 
- """ - - placement_affinity: ClassVar[float] = 0.9 # stick to end but less than incremental - - table_name: str - - def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]: - self.table_name = pipe.name - return self diff --git a/dlt/extract/items_transform.py b/dlt/extract/items_transform.py new file mode 100644 index 0000000000..12375640bc --- /dev/null +++ b/dlt/extract/items_transform.py @@ -0,0 +1,179 @@ +import inspect +import time + +from abc import ABC, abstractmethod +from typing import ( + Any, + Callable, + ClassVar, + Generic, + Iterator, + Optional, + Union, +) +from concurrent.futures import Future + +from dlt.common.typing import ( + TAny, + TDataItem, + TDataItems, +) + +from dlt.extract.utils import ( + wrap_iterator, +) + +from dlt.extract.items import SupportsPipe + + +ItemTransformFunctionWithMeta = Callable[[TDataItem, str], TAny] +ItemTransformFunctionNoMeta = Callable[[TDataItem], TAny] +ItemTransformFunc = Union[ItemTransformFunctionWithMeta[TAny], ItemTransformFunctionNoMeta[TAny]] + + +class ItemTransform(ABC, Generic[TAny]): + _f_meta: ItemTransformFunctionWithMeta[TAny] = None + _f: ItemTransformFunctionNoMeta[TAny] = None + + placement_affinity: ClassVar[float] = 0 + """Tell how strongly an item sticks to start (-1) or end (+1) of pipe.""" + + def __init__(self, transform_f: ItemTransformFunc[TAny]) -> None: + # inspect the signature + sig = inspect.signature(transform_f) + # TODO: use TypeGuard here to get rid of type ignore + if len(sig.parameters) == 1: + self._f = transform_f # type: ignore + else: # TODO: do better check + self._f_meta = transform_f # type: ignore + + def bind(self: "ItemTransform[TAny]", pipe: SupportsPipe) -> "ItemTransform[TAny]": + return self + + @abstractmethod + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + """Transforms `item` (a list of TDataItem or a single TDataItem) and returns or yields TDataItems. 
Returns None to consume item (filter out)""" + pass + + +class FilterItem(ItemTransform[bool]): + # mypy needs those to type correctly + _f_meta: ItemTransformFunctionWithMeta[bool] + _f: ItemTransformFunctionNoMeta[bool] + + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + if isinstance(item, list): + # preserve empty lists + if len(item) == 0: + return item + + if self._f_meta: + item = [i for i in item if self._f_meta(i, meta)] + else: + item = [i for i in item if self._f(i)] + if not item: + # item was fully consumed by the filter + return None + return item + else: + if self._f_meta: + return item if self._f_meta(item, meta) else None + else: + return item if self._f(item) else None + + +class MapItem(ItemTransform[TDataItem]): + # mypy needs those to type correctly + _f_meta: ItemTransformFunctionWithMeta[TDataItem] + _f: ItemTransformFunctionNoMeta[TDataItem] + + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + if isinstance(item, list): + if self._f_meta: + return [self._f_meta(i, meta) for i in item] + else: + return [self._f(i) for i in item] + else: + if self._f_meta: + return self._f_meta(item, meta) + else: + return self._f(item) + + +class YieldMapItem(ItemTransform[Iterator[TDataItem]]): + # mypy needs those to type correctly + _f_meta: ItemTransformFunctionWithMeta[TDataItem] + _f: ItemTransformFunctionNoMeta[TDataItem] + + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + if isinstance(item, list): + for i in item: + if self._f_meta: + yield from self._f_meta(i, meta) + else: + yield from self._f(i) + else: + if self._f_meta: + yield from self._f_meta(item, meta) + else: + yield from self._f(item) + + +class ValidateItem(ItemTransform[TDataItem]): + """Base class for validators of data items. + + Subclass should implement the `__call__` method to either return the data item(s) or raise `extract.exceptions.ValidationError`. + See `PydanticValidator` for possible implementation. 
+ """ + + placement_affinity: ClassVar[float] = 0.9 # stick to end but less than incremental + + table_name: str + + def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]: + self.table_name = pipe.name + return self + + +class LimitItem(ItemTransform[TDataItem]): + placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental + + def __init__(self, max_items: Optional[int], max_time: Optional[float]) -> None: + self.max_items = max_items if max_items is not None else -1 + self.max_time = max_time + + def bind(self, pipe: SupportsPipe) -> "LimitItem": + # we also wrap iterators to make them stoppable + if isinstance(pipe.gen, Iterator): + pipe.replace_gen(wrap_iterator(pipe.gen)) + + self.gen = pipe.gen + self.count = 0 + self.exhausted = False + self.start_time = time.time() + + return self + + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + self.count += 1 + + # detect when the limit is reached, max time or yield count + if ( + (self.count == self.max_items) + or (self.max_time and time.time() - self.start_time > self.max_time) + or self.max_items == 0 + ): + self.exhausted = True + if inspect.isgenerator(self.gen): + self.gen.close() + + # if max items is not 0, we return the last item + # otherwise never return anything + if self.max_items != 0: + return item + + # do not return any late arriving items + if self.exhausted: + return None + + return item diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 02b52c4623..e70365b4f4 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -27,12 +27,12 @@ UnclosablePipe, ) from dlt.extract.items import ( - ItemTransform, ResolvablePipeItem, SupportsPipe, TPipeStep, TPipedDataItems, ) +from dlt.extract.items_transform import ItemTransform from dlt.extract.utils import ( check_compat_transformer, simulate_func_call, @@ -122,7 +122,23 @@ def steps(self) -> List[TPipeStep]: def find(self, *step_type: AnyType) -> int: """Finds a step with object of type `step_type`""" - return next((i for i, v in enumerate(self._steps) if isinstance(v, step_type)), -1) + found = self.find_all(step_type) + return found[0] if found else -1 + + def find_all(self, *step_type: AnyType) -> List[int]: + """Finds all steps with object of type `step_type`""" + return [i for i, v in enumerate(self._steps) if isinstance(v, step_type)] + + def get_by_type(self, *step_type: AnyType) -> TPipeStep: + """Gets first step found with object of type `step_type`""" + return next((v for v in self._steps if isinstance(v, step_type)), None) + + def remove_by_type(self, *step_type: AnyType) -> int: + """Deletes first step found with object of type `step_type`, returns previous index""" + step_index = self.find(*step_type) + if step_index >= 0: + self.remove_step(step_index) + return step_index def __getitem__(self, i: int) -> TPipeStep: return self._steps[i] diff --git a/dlt/extract/pipe_iterator.py b/dlt/extract/pipe_iterator.py index 465040f9f4..38641c0626 100644 --- a/dlt/extract/pipe_iterator.py +++ b/dlt/extract/pipe_iterator.py @@ -24,7 +24,11 @@ ) from dlt.common.configuration.container import Container from dlt.common.exceptions import PipelineException -from dlt.common.pipeline import unset_current_pipe_name, set_current_pipe_name +from dlt.common.pipeline import ( + unset_current_pipe_name, + set_current_pipe_name, + get_current_pipe_name, +) from dlt.common.utils import get_callable_name from dlt.extract.exceptions import ( @@ -180,7 +184,6 @@ def __next__(self) -> PipeItem: item = pipe_item.item # if 
item is iterator, then add it as a new source if isinstance(item, Iterator): - # print(f"adding iterable {item}") self._sources.append( SourcePipeItem(item, pipe_item.step, pipe_item.pipe, pipe_item.meta) ) @@ -291,7 +294,6 @@ def _get_source_item(self) -> ResolvablePipeItem: first_evaluated_index = self._current_source_index # always go round robin if None was returned or item is to be run as future self._current_source_index = (self._current_source_index - 1) % sources_count - except StopIteration: # remove empty iterator and try another source self._sources.pop(self._current_source_index) diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 42e3905162..366e6e1a88 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -2,7 +2,7 @@ from functools import partial from typing import ( AsyncIterable, - AsyncIterator, + cast, ClassVar, Callable, Iterable, @@ -34,13 +34,16 @@ from dlt.extract.items import ( DataItemWithMeta, - ItemTransformFunc, - ItemTransformFunctionWithMeta, TableNameMeta, +) +from dlt.extract.items_transform import ( FilterItem, MapItem, YieldMapItem, ValidateItem, + LimitItem, + ItemTransformFunc, + ItemTransformFunctionWithMeta, ) from dlt.extract.pipe_iterator import ManagedPipeIterator from dlt.extract.pipe import Pipe, TPipeStep @@ -214,29 +217,22 @@ def requires_args(self) -> bool: return True @property - def incremental(self) -> IncrementalResourceWrapper: + def incremental(self) -> Optional[IncrementalResourceWrapper]: """Gets incremental transform if it is in the pipe""" - incremental: IncrementalResourceWrapper = None - step_no = self._pipe.find(IncrementalResourceWrapper, Incremental) - if step_no >= 0: - incremental = self._pipe.steps[step_no] # type: ignore - return incremental + return cast( + Optional[IncrementalResourceWrapper], + self._pipe.get_by_type(IncrementalResourceWrapper, Incremental), + ) @property def validator(self) -> Optional[ValidateItem]: """Gets validator transform if it is in the pipe""" - validator: ValidateItem = None - step_no = self._pipe.find(ValidateItem) - if step_no >= 0: - validator = self._pipe.steps[step_no] # type: ignore[assignment] - return validator + return cast(Optional[ValidateItem], self._pipe.get_by_type(ValidateItem)) @validator.setter def validator(self, validator: Optional[ValidateItem]) -> None: """Add/remove or replace the validator in pipe""" - step_no = self._pipe.find(ValidateItem) - if step_no >= 0: - self._pipe.remove_step(step_no) + step_no = self._pipe.remove_by_type(ValidateItem) if validator: self.add_step(validator, insert_at=step_no if step_no >= 0 else None) @@ -347,72 +343,37 @@ def add_filter( self._pipe.insert_step(FilterItem(item_filter), insert_at) return self - def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # noqa: A003 + def add_limit( + self: TDltResourceImpl, + max_items: Optional[int] = None, + max_time: Optional[float] = None, + ) -> TDltResourceImpl: # noqa: A003 """Adds a limit `max_items` to the resource pipe. - This mutates the encapsulated generator to stop after `max_items` items are yielded. This is useful for testing and debugging. + This mutates the encapsulated generator to stop after `max_items` items are yielded. This is useful for testing and debugging. - Notes: - 1. Transformers won't be limited. They should process all the data they receive fully to avoid inconsistencies in generated datasets. - 2. Each yielded item may contain several records. 
`add_limit` only limits the "number of yields", not the total number of records. - 3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic. + Notes: + 1. Transformers won't be limited. They should process all the data they receive fully to avoid inconsistencies in generated datasets. + 2. Each yielded item may contain several records. `add_limit` only limits the "number of yields", not the total number of records. + 3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic. Args: - max_items (int): The maximum number of items to yield - Returns: - "DltResource": returns self + max_items (int): The maximum number of items to yield, set to None for no limit + max_time (float): The maximum number of seconds for this generator to run after it was opened, set to None for no limit + Returns: + "DltResource": returns self """ - # make sure max_items is a number, to allow "None" as value for unlimited - if max_items is None: - max_items = -1 - - def _gen_wrap(gen: TPipeStep) -> TPipeStep: - """Wrap a generator to take the first `max_items` records""" - - # zero items should produce empty generator - if max_items == 0: - return - - count = 0 - is_async_gen = False - if callable(gen): - gen = gen() # type: ignore - - # wrap async gen already here - if isinstance(gen, AsyncIterator): - gen = wrap_async_iterator(gen) - is_async_gen = True - - try: - for i in gen: # type: ignore # TODO: help me fix this later - yield i - if i is not None: - count += 1 - # async gen yields awaitable so we must count one awaitable more - # so the previous one is evaluated and yielded. - # new awaitable will be cancelled - if count == max_items + int(is_async_gen): - return - finally: - if inspect.isgenerator(gen): - gen.close() - return - - # transformers should be limited by their input, so we only limit non-transformers - if not self.is_transformer: - gen = self._pipe.gen - # wrap gen directly - if inspect.isgenerator(gen): - self._pipe.replace_gen(_gen_wrap(gen)) - else: - # keep function as function to not evaluate generators before pipe starts - self._pipe.replace_gen(partial(_gen_wrap, gen)) - else: + if self.is_transformer: logger.warning( f"Setting add_limit to a transformer {self.name} has no effect. Set the limit on" " the top level resource." 
) + else: + # remove existing limit if any + self._pipe.remove_by_type(LimitItem) + self.add_step(LimitItem(max_items=max_items, max_time=max_time)) + return self def parallelize(self: TDltResourceImpl) -> TDltResourceImpl: @@ -445,9 +406,7 @@ def add_step( return self def _remove_incremental_step(self) -> None: - step_no = self._pipe.find(Incremental, IncrementalResourceWrapper) - if step_no >= 0: - self._pipe.remove_step(step_no) + self._pipe.remove_by_type(Incremental, IncrementalResourceWrapper) def set_incremental( self, diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index 68570d0995..0bcd13155e 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -183,6 +183,17 @@ def check_compat_transformer(name: str, f: AnyFun, sig: inspect.Signature) -> in return meta_arg +def wrap_iterator(gen: Iterator[TDataItems]) -> Iterator[TDataItems]: + """Wraps an iterator into a generator""" + if inspect.isgenerator(gen): + return gen + + def wrapped_gen() -> Iterator[TDataItems]: + yield from gen + + return wrapped_gen() + + def wrap_async_iterator( gen: AsyncIterator[TDataItems], ) -> Generator[Awaitable[TDataItems], None, None]: diff --git a/dlt/extract/validation.py b/dlt/extract/validation.py index 4cd321b88c..d9fe70a90b 100644 --- a/dlt/extract/validation.py +++ b/dlt/extract/validation.py @@ -8,7 +8,8 @@ from dlt.common.typing import TDataItems from dlt.common.schema.typing import TAnySchemaColumns, TSchemaContract, TSchemaEvolutionMode -from dlt.extract.items import TTableHintTemplate, ValidateItem +from dlt.extract.items import TTableHintTemplate +from dlt.extract.items_transform import ValidateItem _TPydanticModel = TypeVar("_TPydanticModel", bound=PydanticBaseModel) diff --git a/dlt/sources/helpers/transform.py b/dlt/sources/helpers/transform.py index 32843e2aa2..45738fe4fb 100644 --- a/dlt/sources/helpers/transform.py +++ b/dlt/sources/helpers/transform.py @@ -2,7 +2,7 @@ from typing import Any, Dict, Sequence, Union from dlt.common.typing import TDataItem -from dlt.extract.items import ItemTransformFunctionNoMeta +from dlt.extract.items_transform import ItemTransformFunctionNoMeta import jsonpath_ng diff --git a/docs/examples/backfill_in_chunks/__init__.py b/docs/examples/backfill_in_chunks/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/examples/backfill_in_chunks/backfill_in_chunks.py b/docs/examples/backfill_in_chunks/backfill_in_chunks.py new file mode 100644 index 0000000000..a758d67f7b --- /dev/null +++ b/docs/examples/backfill_in_chunks/backfill_in_chunks.py @@ -0,0 +1,85 @@ +""" +--- +title: Backfilling in chunks +description: Learn how to backfill in chunks of defined size +keywords: [incremental loading, backfilling, chunks,example] +--- + +In this example, you'll find a Python script that will load from a sql_database source in chunks of defined size. This is useful for backfilling in multiple pipeline runs as +opposed to backfilling in one very large pipeline run which may fail due to memory issues on ephemeral storage or just take a very long time to complete without seeing any +progress in the destination. 
+ +We'll learn how to: + +- Connect to a mysql database with the sql_database source +- Select one table to load and apply incremental loading hints as well as the primary key +- Set the chunk size and limit the number of chunks to load in one pipeline run +- Create a pipeline and backfill the table in the defined chunks +- Use the datasets accessor to inspect and assert the load progress + +""" + +import pandas as pd + +import dlt +from dlt.sources.sql_database import sql_database + + +if __name__ == "__main__": + # NOTE: this is a live table in the rfam database, so the number of final rows may change + TOTAL_TABLE_ROWS = 4178 + RFAM_CONNECTION_STRING = "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam" + + # create sql database source that only loads the family table in chunks of 1000 rows + source = sql_database(RFAM_CONNECTION_STRING, table_names=["family"], chunk_size=1000) + + # we apply some hints to the table, we know the rfam_id is unique and that we can order + # and load incrementally on the created datetime column + source.family.apply_hints( + primary_key="rfam_id", + incremental=dlt.sources.incremental( + cursor_path="created", initial_value=None, row_order="asc" + ), + ) + + # with limit we can limit the number of chunks to load, with a chunk size of 1000 and a limit of 1 + # we will load 1000 rows per pipeline run + source.add_limit(1) + + # create pipeline + pipeline = dlt.pipeline( + pipeline_name="rfam", destination="duckdb", dataset_name="rfam_data", dev_mode=True + ) + + def _assert_unique_row_count(df: pd.DataFrame, num_rows: int) -> None: + """Assert that a dataframe has the correct number of unique rows""" + # NOTE: this check is dependent on reading the full table back from the destination into memory, + # so it is only useful for testing before you do a large backfill. + assert len(df) == num_rows + assert len(set(df.rfam_id.tolist())) == num_rows + + # after the first run, the family table in the destination should contain the first 1000 rows + pipeline.run(source) + _assert_unique_row_count(pipeline.dataset().family.df(), 1000) + + # after the second run, the family table in the destination should contain 1999 rows + # there is some overlap on the incremental to prevent skipping rows + pipeline.run(source) + _assert_unique_row_count(pipeline.dataset().family.df(), 1999) + + # ... + pipeline.run(source) + _assert_unique_row_count(pipeline.dataset().family.df(), 2998) + + # ... + pipeline.run(source) + _assert_unique_row_count(pipeline.dataset().family.df(), 3997) + + # the final run will load all the rows until the end of the table + pipeline.run(source) + _assert_unique_row_count(pipeline.dataset().family.df(), TOTAL_TABLE_ROWS) + + # NOTE: in a production environment you will likely: + # * be using much larger chunk sizes and limits + # * run the pipeline in a loop to load all the rows + # * and programmatically check if the table is fully loaded and abort the loop if this is the case. diff --git a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md index 199eaf9b5d..b8d51caf75 100644 --- a/docs/website/docs/general-usage/resource.md +++ b/docs/website/docs/general-usage/resource.md @@ -405,11 +405,26 @@ dlt.pipeline(destination="duckdb").run(my_resource().add_limit(10)) The code above will extract `15*10=150` records. This is happening because in each iteration, 15 records are yielded, and we're limiting the number of iterations to 10. 
::: -Some constraints of `add_limit` include: +Alternatively, you can also apply a time limit to the resource. The code below will run the extraction for 10 seconds and extract however many items are yielded in that time. In combination with incrementals, this can be useful for batched loading or for loading on machines that have a runtime limit. + +```py +dlt.pipeline(destination="duckdb").run(my_resource().add_limit(max_time=10)) +``` + +You can also apply a combination of both limits. In this case, the extraction will stop as soon as either limit is reached. + +```py +dlt.pipeline(destination="duckdb").run(my_resource().add_limit(max_items=10, max_time=10)) +``` + + +Some notes about `add_limit`: 1. `add_limit` does not skip any items. It closes the iterator/generator that produces data after the limit is reached. 2. You cannot limit transformers. They should process all the data they receive fully to avoid inconsistencies in generated datasets. 3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic. +4. Calling `add_limit` on a resource will replace any previously set limits. +5. For time-limited resources, the timer starts when the first item is processed. When resources are processed sequentially (FIFO mode), each resource's time limit also applies sequentially. In the default round-robin mode, the time limits will usually run concurrently. :::tip If you are parameterizing the value of `add_limit` and sometimes need it to be disabled, you can set `None` or `-1` to disable the limiting. diff --git a/docs/website/docs/general-usage/source.md b/docs/website/docs/general-usage/source.md index 87c07a3e44..9c6c2aac13 100644 --- a/docs/website/docs/general-usage/source.md +++ b/docs/website/docs/general-usage/source.md @@ -107,8 +107,20 @@ load_info = pipeline.run(pipedrive_source().add_limit(10)) print(load_info) ``` +You can also apply a time limit to the source: + +```py +pipeline.run(pipedrive_source().add_limit(max_time=10)) +``` + +Or limit by both; the limit that is reached first will stop the extraction: + +```py +pipeline.run(pipedrive_source().add_limit(max_items=10, max_time=10)) +``` + :::note -Note that `add_limit` **does not limit the number of records** but rather the "number of yields". `dlt` will close the iterator/generator that produces data after the limit is reached. +Note that `add_limit` **does not limit the number of records** but rather the "number of yields". `dlt` will close the iterator/generator that produces data after the limit is reached. Please read more about `add_limit` on the resource page. ::: Find more on sampling data [here](resource.md#sample-from-large-data). 
diff --git a/tests/extract/test_extract_pipe.py b/tests/extract/test_extract_pipe.py index d40639a594..659888269a 100644 --- a/tests/extract/test_extract_pipe.py +++ b/tests/extract/test_extract_pipe.py @@ -10,7 +10,8 @@ from dlt.common import sleep from dlt.common.typing import TDataItems from dlt.extract.exceptions import CreatePipeException, ResourceExtractionError, UnclosablePipe -from dlt.extract.items import DataItemWithMeta, FilterItem, MapItem, YieldMapItem +from dlt.extract.items import DataItemWithMeta +from dlt.extract.items_transform import FilterItem, MapItem, YieldMapItem from dlt.extract.pipe import Pipe from dlt.extract.pipe_iterator import PipeIterator, ManagedPipeIterator, PipeItem diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index d63dac93f2..9ad7d28e88 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -7,6 +7,7 @@ from time import sleep from typing import Any, Optional, Literal, Sequence, Dict, Iterable from unittest import mock +import itertools import duckdb import pyarrow as pa @@ -35,7 +36,7 @@ IncrementalPrimaryKeyMissing, ) from dlt.extract.incremental.lag import apply_lag -from dlt.extract.items import ValidateItem +from dlt.extract.items_transform import ValidateItem from dlt.extract.resource import DltResource from dlt.pipeline.exceptions import PipelineStepFailed from dlt.sources.helpers.transform import take_first @@ -3960,3 +3961,58 @@ def some_data( # Includes values 5-10 inclusive assert items == expected_items + + +@pytest.mark.parametrize("offset_by_last_value", [True, False]) +def test_incremental_and_limit(offset_by_last_value: bool): + resource_called = 0 + + # here we check incremental combined with limit, once when the last value cannot be used + # to offset the source, and once when it can. 
+ + @dlt.resource( + table_name="items", + ) + def resource( + incremental=dlt.sources.incremental(cursor_path="id", initial_value=-1, row_order="asc") + ): + range_iterator = ( + range(incremental.start_value + 1, 1000) if offset_by_last_value else range(1000) + ) + for i in range_iterator: + nonlocal resource_called + resource_called += 1 + yield { + "id": i, + "value": str(i), + } + + resource.add_limit(10) + + p = dlt.pipeline(pipeline_name="incremental_limit", destination="duckdb", dev_mode=True) + + p.run(resource()) + + # check we have the right number of items + assert len(p.dataset().items.df()) == 10 + assert resource_called == 10 + # check that we have items 0-9 + assert p.dataset().items.df().id.tolist() == list(range(10)) + + # run the next ten + p.run(resource()) + + # check we have the right number of items + assert len(p.dataset().items.df()) == 20 + assert resource_called == 20 if offset_by_last_value else 30 + # check that we have items 0-19 + assert p.dataset().items.df().id.tolist() == list(range(20)) + + # run the next batch + p.run(resource()) + + # check we have the right number of items + assert len(p.dataset().items.df()) == 30 + assert resource_called == 30 if offset_by_last_value else 60 + # check that we have items 0-29 + assert p.dataset().items.df().id.tolist() == list(range(30)) diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index 3d021d5d10..86646e6369 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -1,4 +1,6 @@ import itertools +import time + from typing import Iterator import pytest @@ -837,7 +839,7 @@ def test_limit_infinite_counter() -> None: @pytest.mark.parametrize("limit", (None, -1, 0, 10)) def test_limit_edge_cases(limit: int) -> None: - r = dlt.resource(range(20), name="infinity").add_limit(limit) # type: ignore + r = dlt.resource(range(20), name="resource").add_limit(limit) # type: ignore @dlt.resource() async def r_async(): @@ -845,22 +847,62 @@ async def r_async(): await asyncio.sleep(0.01) yield i + @dlt.resource(parallelized=True) + def parallelized_resource(): + for i in range(20): + yield i + sync_list = list(r) async_list = list(r_async().add_limit(limit)) + parallelized_list = list(parallelized_resource().add_limit(limit)) + + # all lists should be the same + assert sync_list == async_list == parallelized_list if limit == 10: assert sync_list == list(range(10)) - # we have edge cases where the async list will have one extra item - # possibly due to timing issues, maybe some other implementation problem - assert (async_list == list(range(10))) or (async_list == list(range(11))) elif limit in [None, -1]: - assert sync_list == async_list == list(range(20)) + assert sync_list == list(range(20)) elif limit == 0: - assert sync_list == async_list == [] + assert sync_list == [] else: raise AssertionError(f"Unexpected limit: {limit}") +def test_various_limit_setups() -> None: + # basic test + r = dlt.resource([1, 2, 3, 4, 5], name="test").add_limit(3) + assert list(r) == [1, 2, 3] + + # yield map test + r = ( + dlt.resource([1, 2, 3, 4, 5], name="test") + .add_map(lambda i: str(i) * i, 1) + .add_yield_map(lambda i: (yield from i)) + .add_limit(3) + ) + # limit is applied at the end + assert list(r) == ["1", "2", "2"] # "3" ,"3" ,"3" ,"4" ,"4" ,"4" ,"4", ...] 
+ + # nested lists test (limit only applied to yields, not actual items) + r = dlt.resource([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]], name="test").add_limit(3) + assert list(r) == [1, 2, 3, 4, 5, 6, 7, 8, 9] + + # transformer test + r = dlt.resource([1, 2, 3, 4, 5], name="test").add_limit(4) + t = dlt.transformer(lambda i: i * 2, name="test") + assert list(r) == [1, 2, 3, 4] + assert list(r | t) == [2, 4, 6, 8] + + # adding a limit to a transformer is disregarded + t = t.add_limit(2) + assert list(r | t) == [2, 4, 6, 8] + + # limits are fully replaced (a more generous limit applied later takes precedence) + r = dlt.resource([1, 2, 3, 4, 5], name="test").add_limit(3).add_limit(4) + assert list(r) == [1, 2, 3, 4] + + def test_limit_source() -> None: def mul_c(item): yield from "A" * (item + 2) @@ -876,6 +918,30 @@ def infinite_source(): assert list(infinite_source().add_limit(2)) == ["A", "A", 0, "A", "A", "A", 1] * 3 +def test_limit_max_time() -> None: + @dlt.resource() + def r(): + for i in range(100): + time.sleep(0.1) + yield i + + @dlt.resource() + async def r_async(): + for i in range(100): + await asyncio.sleep(0.1) + yield i + + sync_list = list(r().add_limit(max_time=1)) + async_list = list(r_async().add_limit(max_time=1)) + + # we should have extracted 10 items within 1 second; the sleep is included in the resource + # we allow for some variance in the number of items, as the sleep is not super precise + # on macOS we sometimes even get just 4 items... + allowed_results = [list(range(i)) for i in [12, 11, 10, 9, 8, 7, 6, 5, 4]] + assert sync_list in allowed_results + assert async_list in allowed_results + + def test_source_state() -> None: @dlt.source def test_source(expected_state): diff --git a/tests/extract/test_validation.py b/tests/extract/test_validation.py index 138589bb06..3800f333f6 100644 --- a/tests/extract/test_validation.py +++ b/tests/extract/test_validation.py @@ -10,7 +10,7 @@ from dlt.common.libs.pydantic import BaseModel from dlt.extract import DltResource -from dlt.extract.items import ValidateItem +from dlt.extract.items_transform import ValidateItem from dlt.extract.validation import PydanticValidator from dlt.extract.exceptions import ResourceExtractionError from dlt.pipeline.exceptions import PipelineStepFailed diff --git a/tests/extract/utils.py b/tests/extract/utils.py index 7364ef7243..f1de3de093 100644 --- a/tests/extract/utils.py +++ b/tests/extract/utils.py @@ -6,7 +6,7 @@ from dlt.common.typing import TDataItem, TDataItems from dlt.extract.extract import ExtractStorage -from dlt.extract.items import ItemTransform +from dlt.extract.items_transform import ItemTransform from tests.utils import TestDataItemFormat
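A minimal usage sketch of the step-based limiting this patch introduces, using only the public API shown above (`add_limit(max_items=..., max_time=...)` on a resource); the resource, pipeline name, and sleep interval are illustrative, not part of the diff.

```py
import time

import dlt


@dlt.resource(name="numbers")
def numbers():
    # a slow, effectively unbounded generator to exercise both limits
    for i in range(10_000):
        time.sleep(0.01)
        yield {"id": i}


# stop after 5 yields or 2 seconds, whichever is reached first;
# calling add_limit again would replace these settings
limited = numbers().add_limit(max_items=5, max_time=2)

pipeline = dlt.pipeline(pipeline_name="limit_demo", destination="duckdb", dev_mode=True)
print(pipeline.run(limited))
```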