From f5593c64ef53b20c436ac95c2b64267e53713986 Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Thu, 4 May 2023 12:44:12 +0100 Subject: [PATCH 1/9] refactor default batch file writer --- singer_sdk/batch.py | 93 ++++++++++++++++++++++++++++++++++++ singer_sdk/helpers/_batch.py | 8 ++++ singer_sdk/streams/core.py | 57 ++++------------------ 3 files changed, 110 insertions(+), 48 deletions(-) create mode 100644 singer_sdk/batch.py diff --git a/singer_sdk/batch.py b/singer_sdk/batch.py new file mode 100644 index 000000000..3edf509a4 --- /dev/null +++ b/singer_sdk/batch.py @@ -0,0 +1,93 @@ +import gzip +import itertools +import json +import typing as t +from abc import ABC +from uuid import uuid4 + +from singer_sdk.helpers._batch import BatchConfig + +_T = t.TypeVar("_T") + + +def lazy_chunked_generator( + iterable: t.Iterable[_T], + chunk_size: int, +) -> t.Generator[t.Iterator[_T], None, None]: + """Yield a generator for each chunk of the given iterable. + + Args: + iterable: The iterable to chunk. + chunk_size: The size of each chunk. + + Yields: + A generator for each chunk of the given iterable. + """ + iterator = iter(iterable) + while True: + chunk = list(itertools.islice(iterator, chunk_size)) + if not chunk: + break + yield iter(chunk) + + +class BaseBatcher(ABC): + """Base Record Batcher.""" + + def __init__( + self, tap_name: str, stream_name: str, batch_config: BatchConfig + ) -> None: + self.tap_name = tap_name + self.stream_name = stream_name + self.batch_config = batch_config + + def get_batches( + self, records: t.Generator[dict, t.Any, t.Any] + ) -> t.Generator[list[str]]: + """Yield manifest of batches. + + Args: + records: The records to batch. + + Raises: + NotImplementedError: If the method is not implemented. + """ + raise NotImplementedError() + + +class JSONLinesBatcher(BaseBatcher): + """JSON Lines Record Batcher.""" + + def get_batches( + self, records: t.Generator[dict, t.Any, t.Any] + ) -> t.Generator[list[str], t.Any, t.Any]: + """Yield manifest of batches. + + Args: + records: The records to batch. + + Yields: + A list of file paths (called a manifest). + """ + sync_id = f"{self.tap_name}--{self.stream_name}-{uuid4()}" + prefix = self.batch_config.storage.prefix or "" + + for i, chunk in enumerate( + lazy_chunked_generator( + records, + self.batch_config.batch_size, + ), + start=1, + ): + filename = f"{prefix}{sync_id}-{i}.json.gz" + with self.batch_config.storage.fs() as fs: + # TODO: Determine compression from config. + with fs.open(filename, "wb") as f, gzip.GzipFile( + fileobj=f, + mode="wb", + ) as gz: + gz.writelines( + (json.dumps(record) + "\n").encode() for record in chunk + ) + file_url = fs.geturl(filename) + yield [file_url] diff --git a/singer_sdk/helpers/_batch.py b/singer_sdk/helpers/_batch.py index 41ce7de43..2c7eb51ac 100644 --- a/singer_sdk/helpers/_batch.py +++ b/singer_sdk/helpers/_batch.py @@ -16,6 +16,8 @@ if t.TYPE_CHECKING: from fs.base import FS +DEFAULT_BATCH_SIZE = 10000 + class BatchFileFormat(str, enum.Enum): """Batch file format.""" @@ -209,6 +211,9 @@ class BatchConfig: storage: StorageTarget """The storage target of the batch file.""" + batch_size: int + """The max number of records in a batch.""" + def __post_init__(self): if isinstance(self.encoding, dict): self.encoding = BaseBatchFileEncoding.from_dict(self.encoding) @@ -216,6 +221,9 @@ def __post_init__(self): if isinstance(self.storage, dict): self.storage = StorageTarget.from_dict(self.storage) + if self.batch_size is None: + self.batch_size = DEFAULT_BATCH_SIZE + def asdict(self): """Return a dictionary representation of the message. diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 3b7e4120f..e89e422f7 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -5,19 +5,17 @@ import abc import copy import datetime -import gzip -import itertools import json import typing as t from os import PathLike from pathlib import Path from types import MappingProxyType -from uuid import uuid4 import pendulum import singer_sdk._singerlib as singer from singer_sdk import metrics +from singer_sdk.batch import JSONLinesBatcher from singer_sdk.exceptions import ( AbortedSyncFailedException, AbortedSyncPausedException, @@ -63,28 +61,6 @@ REPLICATION_LOG_BASED = "LOG_BASED" FactoryType = t.TypeVar("FactoryType", bound="Stream") -_T = t.TypeVar("_T") - - -def lazy_chunked_generator( - iterable: t.Iterable[_T], - chunk_size: int, -) -> t.Generator[t.Iterator[_T], None, None]: - """Yield a generator for each chunk of the given iterable. - - Args: - iterable: The iterable to chunk. - chunk_size: The size of each chunk. - - Yields: - A generator for each chunk of the given iterable. - """ - iterator = iter(iterable) - while True: - chunk = list(itertools.islice(iterator, chunk_size)) - if not chunk: - break - yield iter(chunk) class Stream(metaclass=abc.ABCMeta): @@ -1341,29 +1317,14 @@ def get_batches( Yields: A tuple of (encoding, manifest) for each batch. """ - sync_id = f"{self.tap_name}--{self.name}-{uuid4()}" - prefix = batch_config.storage.prefix or "" - - for i, chunk in enumerate( - lazy_chunked_generator( - self._sync_records(context, write_messages=False), - self.batch_size, - ), - start=1, - ): - filename = f"{prefix}{sync_id}-{i}.json.gz" - with batch_config.storage.fs() as fs: - # TODO: Determine compression from config. - with fs.open(filename, "wb") as f, gzip.GzipFile( - fileobj=f, - mode="wb", - ) as gz: - gz.writelines( - (json.dumps(record) + "\n").encode() for record in chunk - ) - file_url = fs.geturl(filename) - - yield batch_config.encoding, [file_url] + batcher = JSONLinesBatcher( + tap_name=self.tap_name, + stream_name=self.name, + batch_config=batch_config, + ) + records = self._sync_records(context, write_messages=False) + for manifest in batcher.get_batches(records=records): + yield batch_config.encoding, manifest def post_process( self, From c91e0a4d87d3214eda9621715bc14867c34268d4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 4 May 2023 11:45:38 +0000 Subject: [PATCH 2/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- singer_sdk/batch.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/singer_sdk/batch.py b/singer_sdk/batch.py index 3edf509a4..d8bd86de9 100644 --- a/singer_sdk/batch.py +++ b/singer_sdk/batch.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import gzip import itertools import json @@ -35,14 +37,18 @@ class BaseBatcher(ABC): """Base Record Batcher.""" def __init__( - self, tap_name: str, stream_name: str, batch_config: BatchConfig + self, + tap_name: str, + stream_name: str, + batch_config: BatchConfig, ) -> None: self.tap_name = tap_name self.stream_name = stream_name self.batch_config = batch_config def get_batches( - self, records: t.Generator[dict, t.Any, t.Any] + self, + records: t.Generator[dict, t.Any, t.Any], ) -> t.Generator[list[str]]: """Yield manifest of batches. @@ -52,14 +58,15 @@ def get_batches( Raises: NotImplementedError: If the method is not implemented. """ - raise NotImplementedError() + raise NotImplementedError class JSONLinesBatcher(BaseBatcher): """JSON Lines Record Batcher.""" def get_batches( - self, records: t.Generator[dict, t.Any, t.Any] + self, + records: t.Generator[dict, t.Any, t.Any], ) -> t.Generator[list[str], t.Any, t.Any]: """Yield manifest of batches. From 1eb584be21350804110347e7a88abd82bd8343d4 Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Thu, 4 May 2023 12:50:01 +0100 Subject: [PATCH 3/9] precommit changes --- singer_sdk/batch.py | 9 +++++++-- singer_sdk/streams/core.py | 4 ---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/singer_sdk/batch.py b/singer_sdk/batch.py index 3edf509a4..7e802c031 100644 --- a/singer_sdk/batch.py +++ b/singer_sdk/batch.py @@ -1,11 +1,14 @@ +"""Batching utilities for Singer SDK.""" + import gzip import itertools import json import typing as t -from abc import ABC +from abc import ABC, abstractmethod from uuid import uuid4 -from singer_sdk.helpers._batch import BatchConfig +if t.TYPE_CHECKING: + from singer_sdk.helpers._batch import BatchConfig _T = t.TypeVar("_T") @@ -37,10 +40,12 @@ class BaseBatcher(ABC): def __init__( self, tap_name: str, stream_name: str, batch_config: BatchConfig ) -> None: + """Initialize the batcher.""" self.tap_name = tap_name self.stream_name = stream_name self.batch_config = batch_config + @abstractmethod def get_batches( self, records: t.Generator[dict, t.Any, t.Any] ) -> t.Generator[list[str]]: diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index e89e422f7..e6cfeefc2 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -100,10 +100,6 @@ class Stream(metaclass=abc.ABCMeta): # Internal API cost aggregator _sync_costs: dict[str, int] = {} - # Batch attributes - batch_size: int = 1000 - """Max number of records to write to each batch file.""" - def __init__( self, tap: Tap, From 7a7131d9aaf8318a2acbbf4e1f3d10b437978b7a Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Thu, 4 May 2023 12:52:08 +0100 Subject: [PATCH 4/9] more precommit changes --- singer_sdk/batch.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/singer_sdk/batch.py b/singer_sdk/batch.py index 03307e019..73ea42e4a 100644 --- a/singer_sdk/batch.py +++ b/singer_sdk/batch.py @@ -44,7 +44,13 @@ def __init__( stream_name: str, batch_config: BatchConfig, ) -> None: - """Initialize the batcher.""" + """Initialize the batcher. + + Args: + tap_name: The name of the tap. + stream_name: The name of the stream. + batch_config: The batch configuration. + """ self.tap_name = tap_name self.stream_name = stream_name self.batch_config = batch_config From c2b16e7f2fc74414911109aca6a0c33c13d41c45 Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Thu, 4 May 2023 12:55:38 +0100 Subject: [PATCH 5/9] add default batch size --- singer_sdk/helpers/_batch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer_sdk/helpers/_batch.py b/singer_sdk/helpers/_batch.py index 2c7eb51ac..f8b974634 100644 --- a/singer_sdk/helpers/_batch.py +++ b/singer_sdk/helpers/_batch.py @@ -211,7 +211,7 @@ class BatchConfig: storage: StorageTarget """The storage target of the batch file.""" - batch_size: int + batch_size: int | None = DEFAULT_BATCH_SIZE """The max number of records in a batch.""" def __post_init__(self): From 0286adc8cb9f740da70d7f5e6b075ab73b76b716 Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Thu, 4 May 2023 13:11:58 +0100 Subject: [PATCH 6/9] mypy --- singer_sdk/batch.py | 4 ++-- singer_sdk/helpers/_batch.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/singer_sdk/batch.py b/singer_sdk/batch.py index 73ea42e4a..abafaa784 100644 --- a/singer_sdk/batch.py +++ b/singer_sdk/batch.py @@ -76,8 +76,8 @@ class JSONLinesBatcher(BaseBatcher): def get_batches( self, - records: t.Generator[dict, t.Any, t.Any], - ) -> t.Generator[list[str], t.Any, t.Any]: + records: t.Iterator[dict], + ) -> t.Iterator[list[str]]: """Yield manifest of batches. Args: diff --git a/singer_sdk/helpers/_batch.py b/singer_sdk/helpers/_batch.py index f8b974634..62447ddb3 100644 --- a/singer_sdk/helpers/_batch.py +++ b/singer_sdk/helpers/_batch.py @@ -211,7 +211,7 @@ class BatchConfig: storage: StorageTarget """The storage target of the batch file.""" - batch_size: int | None = DEFAULT_BATCH_SIZE + batch_size: int = DEFAULT_BATCH_SIZE """The max number of records in a batch.""" def __post_init__(self): From dfe34394a0b3d7ea36dd6e2087a4a597ec6f0acf Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Thu, 4 May 2023 13:15:37 +0100 Subject: [PATCH 7/9] more mypy --- singer_sdk/batch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/singer_sdk/batch.py b/singer_sdk/batch.py index abafaa784..c321ebdd1 100644 --- a/singer_sdk/batch.py +++ b/singer_sdk/batch.py @@ -58,8 +58,8 @@ def __init__( @abstractmethod def get_batches( self, - records: t.Generator[dict, t.Any, t.Any], - ) -> t.Generator[list[str]]: + records: t.Iterator[dict], + ) -> t.Iterator[list[str]]: """Yield manifest of batches. Args: From 7434f1bab2fde70d3dcbaff2cd81c5deea9ec3fe Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Fri, 5 May 2023 18:48:13 +0100 Subject: [PATCH 8/9] document jsonlines batcher --- docs/classes/singer_sdk.batch.JSONLinesBatcher.rst | 10 ++++++++++ docs/reference.rst | 9 +++++++++ 2 files changed, 19 insertions(+) create mode 100644 docs/classes/singer_sdk.batch.JSONLinesBatcher.rst diff --git a/docs/classes/singer_sdk.batch.JSONLinesBatcher.rst b/docs/classes/singer_sdk.batch.JSONLinesBatcher.rst new file mode 100644 index 000000000..d605b2e74 --- /dev/null +++ b/docs/classes/singer_sdk.batch.JSONLinesBatcher.rst @@ -0,0 +1,10 @@ +singer_sdk.batch.JSONLinesBatcher +================================= + +.. currentmodule:: singer_sdk.batch + +.. autoclass:: JSONLinesBatcher + :members: + :show-inheritance: + :inherited-members: + :special-members: __init__ diff --git a/docs/reference.rst b/docs/reference.rst index 276a96d80..b9cf55fa9 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -130,3 +130,12 @@ Pagination pagination.BaseOffsetPaginator pagination.LegacyPaginatedStreamProtocol pagination.LegacyStreamPaginator + +Batch +----- + +.. autosummary:: + :toctree: classes + :template: class.rst + + batch.JSONLineBatcher From 37d1262d720f0311b1b9be85799900bca8133604 Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Fri, 5 May 2023 18:53:13 +0100 Subject: [PATCH 9/9] autodocs --- docs/classes/singer_sdk.batch.BaseBatcher.rst | 8 ++++++++ docs/classes/singer_sdk.batch.JSONLinesBatcher.rst | 6 ++---- docs/reference.rst | 3 ++- 3 files changed, 12 insertions(+), 5 deletions(-) create mode 100644 docs/classes/singer_sdk.batch.BaseBatcher.rst diff --git a/docs/classes/singer_sdk.batch.BaseBatcher.rst b/docs/classes/singer_sdk.batch.BaseBatcher.rst new file mode 100644 index 000000000..4b2588355 --- /dev/null +++ b/docs/classes/singer_sdk.batch.BaseBatcher.rst @@ -0,0 +1,8 @@ +singer_sdk.batch.BaseBatcher +============================ + +.. currentmodule:: singer_sdk.batch + +.. autoclass:: BaseBatcher + :members: + :special-members: __init__, __call__ \ No newline at end of file diff --git a/docs/classes/singer_sdk.batch.JSONLinesBatcher.rst b/docs/classes/singer_sdk.batch.JSONLinesBatcher.rst index d605b2e74..e03fa7e07 100644 --- a/docs/classes/singer_sdk.batch.JSONLinesBatcher.rst +++ b/docs/classes/singer_sdk.batch.JSONLinesBatcher.rst @@ -1,10 +1,8 @@ -singer_sdk.batch.JSONLinesBatcher +singer_sdk.batch.JSONLinesBatcher ================================= .. currentmodule:: singer_sdk.batch .. autoclass:: JSONLinesBatcher :members: - :show-inheritance: - :inherited-members: - :special-members: __init__ + :special-members: __init__, __call__ \ No newline at end of file diff --git a/docs/reference.rst b/docs/reference.rst index b9cf55fa9..0c8d8dff3 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -138,4 +138,5 @@ Batch :toctree: classes :template: class.rst - batch.JSONLineBatcher + batch.BaseBatcher + batch.JSONLinesBatcher