From e029e30bd9bde5b777e8c67f3f47cc276b72c824 Mon Sep 17 00:00:00 2001
From: Ken Payne
Date: Fri, 5 May 2023 20:53:23 +0100
Subject: [PATCH] refactor: break out default batch file writer into separate class (#1668)

---
 docs/classes/singer_sdk.batch.BaseBatcher.rst |   8 ++
 .../singer_sdk.batch.JSONLinesBatcher.rst     |   8 ++
 docs/reference.rst                            |  10 ++
 singer_sdk/batch.py                           | 110 ++++++++++++++++++
 singer_sdk/helpers/_batch.py                  |   8 ++
 singer_sdk/streams/core.py                    |  61 ++--------
 6 files changed, 153 insertions(+), 52 deletions(-)
 create mode 100644 docs/classes/singer_sdk.batch.BaseBatcher.rst
 create mode 100644 docs/classes/singer_sdk.batch.JSONLinesBatcher.rst
 create mode 100644 singer_sdk/batch.py

diff --git a/docs/classes/singer_sdk.batch.BaseBatcher.rst b/docs/classes/singer_sdk.batch.BaseBatcher.rst
new file mode 100644
index 000000000..4b2588355
--- /dev/null
+++ b/docs/classes/singer_sdk.batch.BaseBatcher.rst
@@ -0,0 +1,8 @@
+singer_sdk.batch.BaseBatcher
+============================
+
+.. currentmodule:: singer_sdk.batch
+
+.. autoclass:: BaseBatcher
+   :members:
+   :special-members: __init__, __call__
\ No newline at end of file
diff --git a/docs/classes/singer_sdk.batch.JSONLinesBatcher.rst b/docs/classes/singer_sdk.batch.JSONLinesBatcher.rst
new file mode 100644
index 000000000..e03fa7e07
--- /dev/null
+++ b/docs/classes/singer_sdk.batch.JSONLinesBatcher.rst
@@ -0,0 +1,8 @@
+singer_sdk.batch.JSONLinesBatcher
+=================================
+
+.. currentmodule:: singer_sdk.batch
+
+.. autoclass:: JSONLinesBatcher
+   :members:
+   :special-members: __init__, __call__
\ No newline at end of file
diff --git a/docs/reference.rst b/docs/reference.rst
index 276a96d80..0c8d8dff3 100644
--- a/docs/reference.rst
+++ b/docs/reference.rst
@@ -130,3 +130,13 @@ Pagination
     pagination.BaseOffsetPaginator
     pagination.LegacyPaginatedStreamProtocol
     pagination.LegacyStreamPaginator
+
+Batch
+-----
+
+.. autosummary::
+    :toctree: classes
+    :template: class.rst
+
+    batch.BaseBatcher
+    batch.JSONLinesBatcher
diff --git a/singer_sdk/batch.py b/singer_sdk/batch.py
new file mode 100644
index 000000000..c321ebdd1
--- /dev/null
+++ b/singer_sdk/batch.py
@@ -0,0 +1,110 @@
+"""Batching utilities for Singer SDK."""
+from __future__ import annotations
+
+import gzip
+import itertools
+import json
+import typing as t
+from abc import ABC, abstractmethod
+from uuid import uuid4
+
+if t.TYPE_CHECKING:
+    from singer_sdk.helpers._batch import BatchConfig
+
+_T = t.TypeVar("_T")
+
+
+def lazy_chunked_generator(
+    iterable: t.Iterable[_T],
+    chunk_size: int,
+) -> t.Generator[t.Iterator[_T], None, None]:
+    """Yield a generator for each chunk of the given iterable.
+
+    Args:
+        iterable: The iterable to chunk.
+        chunk_size: The size of each chunk.
+
+    Yields:
+        A generator for each chunk of the given iterable.
+    """
+    iterator = iter(iterable)
+    while True:
+        chunk = list(itertools.islice(iterator, chunk_size))
+        if not chunk:
+            break
+        yield iter(chunk)
+
+
+class BaseBatcher(ABC):
+    """Base Record Batcher."""
+
+    def __init__(
+        self,
+        tap_name: str,
+        stream_name: str,
+        batch_config: BatchConfig,
+    ) -> None:
+        """Initialize the batcher.
+
+        Args:
+            tap_name: The name of the tap.
+            stream_name: The name of the stream.
+            batch_config: The batch configuration.
+        """
+        self.tap_name = tap_name
+        self.stream_name = stream_name
+        self.batch_config = batch_config
+
+    @abstractmethod
+    def get_batches(
+        self,
+        records: t.Iterator[dict],
+    ) -> t.Iterator[list[str]]:
+        """Yield manifest of batches.
+
+        Args:
+            records: The records to batch.
+
+        Raises:
+            NotImplementedError: If the method is not implemented.
+        """
+        raise NotImplementedError
+
+
+class JSONLinesBatcher(BaseBatcher):
+    """JSON Lines Record Batcher."""
+
+    def get_batches(
+        self,
+        records: t.Iterator[dict],
+    ) -> t.Iterator[list[str]]:
+        """Yield manifest of batches.
+
+        Args:
+            records: The records to batch.
+
+        Yields:
+            A list of file paths (called a manifest).
+        """
+        sync_id = f"{self.tap_name}--{self.stream_name}-{uuid4()}"
+        prefix = self.batch_config.storage.prefix or ""
+
+        for i, chunk in enumerate(
+            lazy_chunked_generator(
+                records,
+                self.batch_config.batch_size,
+            ),
+            start=1,
+        ):
+            filename = f"{prefix}{sync_id}-{i}.json.gz"
+            with self.batch_config.storage.fs() as fs:
+                # TODO: Determine compression from config.
+                with fs.open(filename, "wb") as f, gzip.GzipFile(
+                    fileobj=f,
+                    mode="wb",
+                ) as gz:
+                    gz.writelines(
+                        (json.dumps(record) + "\n").encode() for record in chunk
+                    )
+                file_url = fs.geturl(filename)
+            yield [file_url]
diff --git a/singer_sdk/helpers/_batch.py b/singer_sdk/helpers/_batch.py
index 41ce7de43..62447ddb3 100644
--- a/singer_sdk/helpers/_batch.py
+++ b/singer_sdk/helpers/_batch.py
@@ -16,6 +16,8 @@
 if t.TYPE_CHECKING:
     from fs.base import FS
 
+DEFAULT_BATCH_SIZE = 10000
+
 
 class BatchFileFormat(str, enum.Enum):
     """Batch file format."""
@@ -209,6 +211,9 @@ class BatchConfig:
     storage: StorageTarget
     """The storage target of the batch file."""
 
+    batch_size: int = DEFAULT_BATCH_SIZE
+    """The max number of records in a batch."""
+
     def __post_init__(self):
         if isinstance(self.encoding, dict):
             self.encoding = BaseBatchFileEncoding.from_dict(self.encoding)
@@ -216,6 +221,9 @@ def __post_init__(self):
         if isinstance(self.storage, dict):
             self.storage = StorageTarget.from_dict(self.storage)
 
+        if self.batch_size is None:
+            self.batch_size = DEFAULT_BATCH_SIZE
+
     def asdict(self):
         """Return a dictionary representation of the message.
 
diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py
index 3b7e4120f..e6cfeefc2 100644
--- a/singer_sdk/streams/core.py
+++ b/singer_sdk/streams/core.py
@@ -5,19 +5,17 @@
 import abc
 import copy
 import datetime
-import gzip
-import itertools
 import json
 import typing as t
 from os import PathLike
 from pathlib import Path
 from types import MappingProxyType
-from uuid import uuid4
 
 import pendulum
 
 import singer_sdk._singerlib as singer
 from singer_sdk import metrics
+from singer_sdk.batch import JSONLinesBatcher
 from singer_sdk.exceptions import (
     AbortedSyncFailedException,
     AbortedSyncPausedException,
@@ -63,28 +61,6 @@
 REPLICATION_LOG_BASED = "LOG_BASED"
 
 FactoryType = t.TypeVar("FactoryType", bound="Stream")
-_T = t.TypeVar("_T")
-
-
-def lazy_chunked_generator(
-    iterable: t.Iterable[_T],
-    chunk_size: int,
-) -> t.Generator[t.Iterator[_T], None, None]:
-    """Yield a generator for each chunk of the given iterable.
-
-    Args:
-        iterable: The iterable to chunk.
-        chunk_size: The size of each chunk.
-
-    Yields:
-        A generator for each chunk of the given iterable.
-    """
-    iterator = iter(iterable)
-    while True:
-        chunk = list(itertools.islice(iterator, chunk_size))
-        if not chunk:
-            break
-        yield iter(chunk)
 
 
 class Stream(metaclass=abc.ABCMeta):
@@ -124,10 +100,6 @@ class Stream(metaclass=abc.ABCMeta):
     # Internal API cost aggregator
     _sync_costs: dict[str, int] = {}
 
-    # Batch attributes
-    batch_size: int = 1000
-    """Max number of records to write to each batch file."""
-
     def __init__(
         self,
         tap: Tap,
@@ -1341,29 +1313,14 @@ def get_batches(
         Yields:
             A tuple of (encoding, manifest) for each batch.
         """
-        sync_id = f"{self.tap_name}--{self.name}-{uuid4()}"
-        prefix = batch_config.storage.prefix or ""
-
-        for i, chunk in enumerate(
-            lazy_chunked_generator(
-                self._sync_records(context, write_messages=False),
-                self.batch_size,
-            ),
-            start=1,
-        ):
-            filename = f"{prefix}{sync_id}-{i}.json.gz"
-            with batch_config.storage.fs() as fs:
-                # TODO: Determine compression from config.
-                with fs.open(filename, "wb") as f, gzip.GzipFile(
-                    fileobj=f,
-                    mode="wb",
-                ) as gz:
-                    gz.writelines(
-                        (json.dumps(record) + "\n").encode() for record in chunk
-                    )
-                file_url = fs.geturl(filename)
-
-            yield batch_config.encoding, [file_url]
+        batcher = JSONLinesBatcher(
+            tap_name=self.tap_name,
+            stream_name=self.name,
+            batch_config=batch_config,
+        )
+        records = self._sync_records(context, write_messages=False)
+        for manifest in batcher.get_batches(records=records):
+            yield batch_config.encoding, manifest
 
     def post_process(
         self,
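
Usage note: below is a minimal sketch of driving the new JSONLinesBatcher directly, outside of a Stream. The tap name, stream name, records, and the "file:///tmp/batches" storage root are hypothetical example values; BatchConfig and StorageTarget are the helpers this patch extends with batch_size, and JSONLinesEncoding is assumed to be the existing JSONL encoding helper in singer_sdk.helpers._batch.

    from singer_sdk.batch import JSONLinesBatcher
    from singer_sdk.helpers._batch import (
        BatchConfig,
        JSONLinesEncoding,
        StorageTarget,
    )

    # Hypothetical config: gzipped JSONL batch files written under
    # /tmp/batches (the directory is assumed to exist already).
    config = BatchConfig(
        encoding=JSONLinesEncoding(compression="gzip"),
        storage=StorageTarget(root="file:///tmp/batches"),
        batch_size=2,  # deliberately small so three records span two files
    )

    batcher = JSONLinesBatcher(
        tap_name="tap-example",  # hypothetical tap name
        stream_name="users",     # hypothetical stream name
        batch_config=config,
    )

    records = iter([{"id": 1}, {"id": 2}, {"id": 3}])

    # Each yielded manifest is a list of file URLs for one batch file;
    # with batch_size=2, the three records produce two .json.gz files.
    for manifest in batcher.get_batches(records=records):
        print(manifest)

Because Stream.get_batches now only pairs each manifest with batch_config.encoding, alternative formats can be added later as sibling BaseBatcher subclasses without touching the stream class.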