diff --git a/cookiecutter/tap-template/{{cookiecutter.tap_id}}/README.md b/cookiecutter/tap-template/{{cookiecutter.tap_id}}/README.md index f727a3de9..37c94d0fd 100644 --- a/cookiecutter/tap-template/{{cookiecutter.tap_id}}/README.md +++ b/cookiecutter/tap-template/{{cookiecutter.tap_id}}/README.md @@ -4,19 +4,39 @@ Built with the [Meltano Tap SDK](https://sdk.meltano.com) for Singer Taps. + + ## Configuration ### Accepted Config Options -- [ ] `Developer TODO:` Provide a list of config options accepted by the tap. + A full list of supported settings and capabilities for this tap is available by running: @@ -33,7 +53,9 @@ environment variable is set either in the terminal context or in the `.env` file ### Source Authentication and Authorization -- [ ] `Developer TODO:` If your tap requires special access on the source system, or any special authentication requirements, provide those here. + ## Usage @@ -49,7 +71,7 @@ You can easily run `{{ cookiecutter.tap_id }}` by itself or in a pipeline using ## Developer Resources -- [ ] `Developer TODO:` As a first step, scan the entire project for the text "`TODO:`" and complete any recommended steps, deleting the "TODO" references once completed. +Follow these instructions to contribute to this project. ### Initialize your Development Environment @@ -78,8 +100,11 @@ poetry run {{cookiecutter.tap_id}} --help _**Note:** This tap will work in any Singer environment and does not require Meltano. Examples here are for convenience and to streamline end-to-end orchestration scenarios._ -Your project comes with a custom `meltano.yml` project file already created. Open the `meltano.yml` and follow any _"TODO"_ items listed in + Next, install Meltano (if you haven't already) and any needed plugins: diff --git a/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/{%if 'GraphQL' == cookiecutter.stream_type %}client.py{%endif%} b/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/{%if 'GraphQL' == cookiecutter.stream_type %}client.py{%endif%} index f96869d44..9034597a9 100644 --- a/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/{%if 'GraphQL' == cookiecutter.stream_type %}client.py{%endif%} +++ b/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/{%if 'GraphQL' == cookiecutter.stream_type %}client.py{%endif%} @@ -43,11 +43,11 @@ class {{ cookiecutter.source_name }}Stream({{ cookiecutter.stream_type }}Stream) return headers def parse_response(self, response: requests.Response) -> Iterable[dict]: - """Parse the response and return an iterator of result rows.""" + """Parse the response and return an iterator of result records.""" # TODO: Parse response body and return a set of records. 
resp_json = response.json() - for row in resp_json.get(""): - yield row + for record in resp_json.get(""): + yield record def post_process(self, row: dict, context: Optional[dict] = None) -> dict: """As needed, append or transform raw data to match expected structure.""" diff --git a/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/{%if 'Other' == cookiecutter.stream_type %}client.py{%endif%} b/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/{%if 'Other' == cookiecutter.stream_type %}client.py{%endif%} index 27d8be064..8d6b118f6 100644 --- a/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/{%if 'Other' == cookiecutter.stream_type %}client.py{%endif%} +++ b/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/{%if 'Other' == cookiecutter.stream_type %}client.py{%endif%} @@ -11,14 +11,14 @@ class {{ cookiecutter.source_name }}Stream(Stream): """Stream class for {{ cookiecutter.source_name }} streams.""" def get_records(self, context: Optional[dict]) -> Iterable[dict]: - """Return a generator of row-type dictionary objects. + """Return a generator of record-type dictionary objects. The optional `context` argument is used to identify a specific slice of the stream if partitioning is required for the stream. Most implementations do not require partitioning and should ignore the `context` argument. """ # TODO: Write logic to extract data from the upstream source. - # rows = mysource.getall() - # for row in rows: - # yield row.to_dict() + # records = mysource.getall() + # for record in records: + # yield record.to_dict() raise NotImplementedError("The method is not yet implemented (TODO)") diff --git a/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/{%if 'REST' == cookiecutter.stream_type %}client.py{%endif%} b/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/{%if 'REST' == cookiecutter.stream_type %}client.py{%endif%} index b6a393749..c70be712f 100644 --- a/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/{%if 'REST' == cookiecutter.stream_type %}client.py{%endif%} +++ b/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/{%if 'REST' == cookiecutter.stream_type %}client.py{%endif%} @@ -134,7 +134,7 @@ class {{ cookiecutter.source_name }}Stream({{ cookiecutter.stream_type }}Stream) return None def parse_response(self, response: requests.Response) -> Iterable[dict]: - """Parse the response and return an iterator of result rows.""" + """Parse the response and return an iterator of result records.""" # TODO: Parse response body and return a set of records. 
yield from extract_jsonpath(self.records_jsonpath, input=response.json()) diff --git a/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/{%if 'SQL' == cookiecutter.stream_type %}client.py{%endif%} b/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/{%if 'SQL' == cookiecutter.stream_type %}client.py{%endif%} index b1e131cc8..6217b9015 100644 --- a/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/{%if 'SQL' == cookiecutter.stream_type %}client.py{%endif%} +++ b/cookiecutter/tap-template/{{cookiecutter.tap_id}}/{{cookiecutter.library_name}}/{%if 'SQL' == cookiecutter.stream_type %}client.py{%endif%} @@ -51,7 +51,7 @@ class {{ cookiecutter.source_name }}Stream(SQLStream): connector_class = {{ cookiecutter.source_name }}Connector def get_records(self, partition: Optional[dict]) -> Iterable[Dict[str, Any]]: - """Return a generator of row-type dictionary objects. + """Return a generator of record-type dictionary objects. Developers may optionally add custom logic before calling the default implementation inherited from the base class. diff --git a/cookiecutter/target-template/{{cookiecutter.target_id}}/README.md b/cookiecutter/target-template/{{cookiecutter.target_id}}/README.md index 0fb9f105e..745d45b1f 100644 --- a/cookiecutter/target-template/{{cookiecutter.target_id}}/README.md +++ b/cookiecutter/target-template/{{cookiecutter.target_id}}/README.md @@ -4,19 +4,39 @@ Build with the [Meltano Target SDK](https://sdk.meltano.com). + + ## Configuration ### Accepted Config Options -- [ ] `Developer TODO:` Provide a list of config options accepted by the target. + A full list of supported settings and capabilities for this target is available by running: @@ -33,7 +53,9 @@ environment variable is set either in the terminal context or in the `.env` file ### Source Authentication and Authorization -- [ ] `Developer TODO:` If your target requires special access on the source system, or any special authentication requirements, provide those here. + ## Usage @@ -50,7 +72,7 @@ tap-carbon-intensity | {{ cookiecutter.target_id }} --config /path/to/{{ cookiec ## Developer Resources -- [ ] `Developer TODO:` As a first step, scan the entire project for the text "`TODO:`" and complete any recommended steps, deleting the "TODO" references once completed. +Follow these instructions to contribute to this project. ### Initialize your Development Environment @@ -79,8 +101,11 @@ poetry run {{cookiecutter.target_id}} --help _**Note:** This target will work in any Singer environment and does not require Meltano. Examples here are for convenience and to streamline end-to-end orchestration scenarios._ -Your project comes with a custom `meltano.yml` project file already created. Open the `meltano.yml` and follow any _"TODO"_ items listed in + Next, install Meltano (if you haven't already) and any needed plugins: diff --git a/docs/batch.md b/docs/batch.md new file mode 100644 index 000000000..77ca8ee7e --- /dev/null +++ b/docs/batch.md @@ -0,0 +1,95 @@ +# Batch Messages + +```{warning} +The `BATCH` message functionality is currently in preview and is subject to change. +You can [open an issue](https://github.com/meltano/sdk/issues) or [join the discussion](https://github.com/meltano/sdk/discussions/963) on GitHub to provide feedback during the preview period. 
+``` + +[The Singer message specification](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md#output) defines the three basic types of messages: `RECORD`, `STATE`, and `SCHEMA`. The `RECORD` message is used to send data from the tap to the target. The `STATE` message is used to send state data from the tap to the target. The `SCHEMA` message is used to send schema data from the tap to the target and, for example, to create tables with the correct column types. + +However, the Singer specification can be extended to support additional types of messages. For example, the [`ACTIVATE_VERSION`](https://sdk.meltano.com/en/latest/capabilities.html#singer_sdk.helpers.capabilities.PluginCapabilities.ACTIVATE_VERSION) message is used to manage hard deletes in the target. + +This library's implementation of the `BATCH` message is used to send records in bulk from the tap to the target, using an intermediate filesystem to store _batch_ files. This is useful, for example: + +- when the tap outputs records at a much higher rate than the target can consume them, creating backpressure +- when the source system can directly export data in bulk (e.g. a database dump) + +Currently only a local filesystem is supported, but other filesystems like AWS S3, FTP, etc. could be supported in the future. + +## The `BATCH` Message + +```json +{ + "type": "BATCH", + "stream": "users", + "encoding": { + "format": "jsonl", + "compression": "gzip" + }, + "manifest": [ + "file://path/to/batch/file/1", + "file://path/to/batch/file/2" + ] +} +``` + +### `encoding` + +The `encoding` field is used to specify the format and compression of the batch files. Currently only the `jsonl` format and `gzip` compression are supported. + +### `manifest` + +The `manifest` field is used to specify the paths to the batch files. The paths are relative to the `root` directory specified in the [`batch_config`](#batch-configuration) storage configuration. + +## Batch configuration + +When local storage is used, targets do not require special configuration to process `BATCH` messages. + +Taps may be configured to specify a storage `root` directory, a file path `prefix`, and an `encoding` for batch files using a configuration like the one below: + + +In `config.json`: + +```js +{ + // ... + "batch_config": { + "encoding": { + "format": "jsonl", + "compression": "gzip", + }, + "storage": { + "root": "file://tests/core/resources", + "prefix": "test-batch-", + } + } +} +``` + +## Custom batch file creation and processing + +### Tap side + +Taps can optionally customize the batch file creation by implementing the [`get_batches`](singer_sdk.Stream.get_batches) method. This method should yield one or more _tuples_, each containing an encoding and a list of batch file paths: + +```python +class MyStream(Stream): + def get_batches(self, batch_config, context=None): + yield ( + ParquetEncoding(compression="snappy"), + [ + "s3://my-bucket/my-batch-file-1.parquet", + "s3://my-bucket/my-batch-file-2.parquet", + ], + ) +``` + +### Target side + +Targets can optionally customize the batch file processing by implementing the [`process_batch_files`](singer_sdk.Sink.process_batch_files) method.
+ +```python +class MySink(Sink): + def process_batch_files(self, encoding, storage, files): + # process the batch files +``` diff --git a/docs/index.rst b/docs/index.rst index 19eed21ef..3b0baa77e 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -64,6 +64,7 @@ Advanced Topics parent_streams partitioning stream_maps + batch porting sinks CONTRIBUTING diff --git a/poetry.lock b/poetry.lock index d1665b2dd..a3b380eda 100644 --- a/poetry.lock +++ b/poetry.lock @@ -6,6 +6,14 @@ category = "main" optional = true python-versions = "*" +[[package]] +name = "appdirs" +version = "1.4.4" +description = "A small Python module for determining appropriate platform-specific dirs, e.g. a \"user data dir\"." +category = "main" +optional = false +python-versions = "*" + [[package]] name = "arrow" version = "1.2.2" @@ -212,7 +220,7 @@ python-versions = ">=3.6" cffi = ">=1.12" [package.extras] -docs = ["sphinx (>=1.6.5,!=1.8.0,!=3.1.0,!=3.1.1)", "sphinx-rtd-theme"] +docs = ["sphinx (>=1.6.5,!=1.8.0,!=3.1.0,!=3.1.1)", "sphinx_rtd_theme"] docstest = ["pyenchant (>=1.6.11)", "sphinxcontrib-spelling (>=4.0.1)", "twine (>=1.12.0)"] pep8test = ["black", "flake8", "flake8-import-order", "pep8-naming"] sdist = ["setuptools-rust (>=0.11.4)"] @@ -307,6 +315,22 @@ python-versions = ">=3.6" [package.dependencies] python-dateutil = ">=2.7" +[[package]] +name = "fs" +version = "2.4.16" +description = "Python's filesystem abstraction layer" +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +appdirs = ">=1.4.3,<1.5.0" +setuptools = "*" +six = ">=1.10,<2.0" + +[package.extras] +scandir = ["scandir (>=1.5,<2.0)"] + [[package]] name = "greenlet" version = "1.1.2" @@ -1262,8 +1286,8 @@ all = ["IPython", "IPython", "Pygments", "Pygments", "attrs", "cmake", "codecov" all-strict = ["IPython (==7.10.0)", "IPython (==7.23.1)", "Pygments (==2.0.0)", "Pygments (==2.4.1)", "attrs (==19.2.0)", "cmake (==3.21.2)", "codecov (==2.0.15)", "colorama (==0.4.1)", "debugpy (==1.0.0)", "debugpy (==1.0.0)", "debugpy (==1.0.0)", "debugpy (==1.3.0)", "debugpy (==1.6.0)", "ipykernel (==5.2.0)", "ipykernel (==6.0.0)", "ipython-genutils (==0.2.0)", "jedi (==0.16)", "jinja2 (==3.0.0)", "jupyter-client (==6.1.5)", "jupyter-client (==7.0.0)", "jupyter-core (==4.7.0)", "nbconvert (==6.0.0)", "ninja (==1.10.2)", "pybind11 (==2.7.1)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==6.2.5)", "pytest-cov (==2.8.1)", "pytest-cov (==2.8.1)", "pytest-cov (==2.9.0)", "pytest-cov (==3.0.0)", "scikit-build (==0.11.1)", "six (==1.11.0)", "typing (==3.7.4)"] colors = ["Pygments", "Pygments", "colorama"] jupyter = ["IPython", "IPython", "attrs", "debugpy", "debugpy", "debugpy", "debugpy", "debugpy", "ipykernel", "ipykernel", "ipython-genutils", "jedi", "jinja2", "jupyter-client", "jupyter-client", "jupyter-core", "nbconvert"] -optional = ["IPython", "IPython", "Pygments", "Pygments", "attrs", "colorama", "debugpy", "debugpy", "debugpy", "debugpy", "debugpy", "ipykernel", "ipykernel", "ipython-genutils", "jedi", "jinja2", "jupyter-client", "jupyter-client", "jupyter-core", "nbconvert", "pyflakes", "tomli"] -optional-strict = ["IPython (==7.10.0)", "IPython (==7.23.1)", "Pygments (==2.0.0)", "Pygments (==2.4.1)", "attrs (==19.2.0)", "colorama (==0.4.1)", "debugpy (==1.0.0)", "debugpy (==1.0.0)", "debugpy (==1.0.0)", "debugpy (==1.3.0)", "debugpy (==1.6.0)", "ipykernel (==5.2.0)", "ipykernel (==6.0.0)", "ipython-genutils (==0.2.0)", "jedi (==0.16)", "jinja2 (==3.0.0)", 
"jupyter-client (==6.1.5)", "jupyter-client (==7.0.0)", "jupyter-core (==4.7.0)", "nbconvert (==6.0.0)", "pyflakes (==2.2.0)", "tomli (==0.2.0)"] +optional = ["IPython", "IPython", "Pygments", "Pygments", "attrs", "colorama", "debugpy", "debugpy", "debugpy", "debugpy", "debugpy", "ipykernel", "ipykernel", "ipython-genutils", "jedi", "jinja2", "jupyter-client", "jupyter-client", "jupyter-core", "nbconvert", "tomli"] +optional-strict = ["IPython (==7.10.0)", "IPython (==7.23.1)", "Pygments (==2.0.0)", "Pygments (==2.4.1)", "attrs (==19.2.0)", "colorama (==0.4.1)", "debugpy (==1.0.0)", "debugpy (==1.0.0)", "debugpy (==1.0.0)", "debugpy (==1.3.0)", "debugpy (==1.6.0)", "ipykernel (==5.2.0)", "ipykernel (==6.0.0)", "ipython-genutils (==0.2.0)", "jedi (==0.16)", "jinja2 (==3.0.0)", "jupyter-client (==6.1.5)", "jupyter-client (==7.0.0)", "jupyter-core (==4.7.0)", "nbconvert (==6.0.0)", "tomli (==0.2.0)"] runtime-strict = ["six (==1.11.0)"] tests = ["cmake", "codecov", "ninja", "pybind11", "pytest", "pytest", "pytest", "pytest", "pytest", "pytest", "pytest-cov", "pytest-cov", "pytest-cov", "pytest-cov", "scikit-build", "typing"] tests-strict = ["cmake (==3.21.2)", "codecov (==2.0.15)", "ninja (==1.10.2)", "pybind11 (==2.7.1)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==6.2.5)", "pytest-cov (==2.8.1)", "pytest-cov (==2.8.1)", "pytest-cov (==2.9.0)", "pytest-cov (==3.0.0)", "scikit-build (==0.11.1)", "typing (==3.7.4)"] @@ -1286,13 +1310,17 @@ docs = ["sphinx", "sphinx-rtd-theme", "sphinx-copybutton", "myst-parser", "sphin [metadata] lock-version = "1.1" python-versions = "<3.11,>=3.7.1" -content-hash = "13b5614e8ae831dfb3f6d9bb07ab8a43c16954be79487c55ddfa58098ee06d91" +content-hash = "61985ae9e6f47d8b374106f977297a42233fcaeb9fbc8dfb6a29f7ee4917b13c" [metadata.files] alabaster = [ {file = "alabaster-0.7.12-py2.py3-none-any.whl", hash = "sha256:446438bdcca0e05bd45ea2de1668c1d9b032e1a9154c2c259092d77031ddd359"}, {file = "alabaster-0.7.12.tar.gz", hash = "sha256:a661d72d58e6ea8a57f7a86e37d86716863ee5e92788398526d58b26a4e4dc02"}, ] +appdirs = [ + {file = "appdirs-1.4.4-py2.py3-none-any.whl", hash = "sha256:a841dacd6b99318a741b166adb07e19ee71a274450e68237b4650ca1055ab128"}, + {file = "appdirs-1.4.4.tar.gz", hash = "sha256:7d5d0167b2b1ba821647616af46a749d1c653740dd0d2415100fe26e27afdf41"}, +] arrow = [ {file = "arrow-1.2.2-py3-none-any.whl", hash = "sha256:d622c46ca681b5b3e3574fcb60a04e5cc81b9625112d5fb2b44220c36c892177"}, {file = "arrow-1.2.2.tar.gz", hash = "sha256:05caf1fd3d9a11a1135b2b6f09887421153b94558e5ef4d090b567b47173ac2b"}, @@ -1544,6 +1572,10 @@ freezegun = [ {file = "freezegun-1.2.2-py3-none-any.whl", hash = "sha256:ea1b963b993cb9ea195adbd893a48d573fda951b0da64f60883d7e988b606c9f"}, {file = "freezegun-1.2.2.tar.gz", hash = "sha256:cd22d1ba06941384410cd967d8a99d5ae2442f57dfafeff2fda5de8dc5c05446"}, ] +fs = [ + {file = "fs-2.4.16-py2.py3-none-any.whl", hash = "sha256:660064febbccda264ae0b6bace80a8d1be9e089e0a5eb2427b7d517f9a91545c"}, + {file = "fs-2.4.16.tar.gz", hash = "sha256:ae97c7d51213f4b70b6a958292530289090de3a7e15841e108fbe144f069d313"}, +] greenlet = [ {file = "greenlet-1.1.2-cp27-cp27m-macosx_10_14_x86_64.whl", hash = "sha256:58df5c2a0e293bf665a51f8a100d3e9956febfbf1d9aaf8c0677cf70218910c6"}, {file = "greenlet-1.1.2-cp27-cp27m-manylinux1_x86_64.whl", hash = "sha256:aec52725173bd3a7b56fe91bc56eccb26fbdff1386ef123abb63c84c5b43b63a"}, diff --git a/pyproject.toml b/pyproject.toml index 759ba85b1..1d4f4ab2c 100644 --- 
a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,7 @@ pipelinewise-singer-python = "1.2.0" backoff = ">=1.8.0,<2.0" pendulum = "^2.1.0" click = "~=8.0" +fs = "^2.4.16" PyJWT = "~=2.4" requests = "^2.25.1" cryptography = ">=3.4.6,<39.0.0" @@ -148,6 +149,7 @@ exclude_lines = [ "if __name__ == .__main__.:", '''class .*\bProtocol\):''', '''@(abc\.)?abstractmethod''', + "if TYPE_CHECKING:", ] fail_under = 82 diff --git a/samples/sample_tap_countries/countries_tap.py b/samples/sample_tap_countries/countries_tap.py index 842686117..f2ff836c2 100644 --- a/samples/sample_tap_countries/countries_tap.py +++ b/samples/sample_tap_countries/countries_tap.py @@ -28,3 +28,7 @@ def discover_streams(self) -> List[Stream]: CountriesStream(tap=self), ContinentsStream(tap=self), ] + + +if __name__ == "__main__": + SampleTapCountries.cli() diff --git a/samples/sample_target_csv/csv_target_sink.py b/samples/sample_target_csv/csv_target_sink.py index f086321d9..57a9aadd3 100644 --- a/samples/sample_target_csv/csv_target_sink.py +++ b/samples/sample_target_csv/csv_target_sink.py @@ -39,7 +39,7 @@ def process_batch(self, context: dict) -> None: ) for record in records_to_drain: if newfile and not records_written: - # Write header row if new file + # Write header line if new file writer.writerow(record.keys()) writer.writerow(record.values()) records_written += 1 diff --git a/samples/sample_target_sqlite/__init__.py b/samples/sample_target_sqlite/__init__.py index d4e4372bd..11843a101 100644 --- a/samples/sample_target_sqlite/__init__.py +++ b/samples/sample_target_sqlite/__init__.py @@ -1,8 +1,11 @@ """A sample implementation for SQLite.""" -from typing import Any, Dict +from __future__ import annotations + +from typing import Any import sqlalchemy +from sqlalchemy.dialects.sqlite import insert from singer_sdk import SQLConnector, SQLSink, SQLTarget from singer_sdk import typing as th @@ -20,7 +23,7 @@ class SQLiteConnector(SQLConnector): allow_column_alter = False allow_merge_upsert = True - def get_sqlalchemy_url(self, config: Dict[str, Any]) -> str: + def get_sqlalchemy_url(self, config: dict[str, Any]) -> str: """Generates a SQLAlchemy URL for SQLite.""" return f"sqlite:///{config[DB_PATH_CONFIG]}" diff --git a/singer_sdk/helpers/_batch.py b/singer_sdk/helpers/_batch.py new file mode 100644 index 000000000..161e0097c --- /dev/null +++ b/singer_sdk/helpers/_batch.py @@ -0,0 +1,244 @@ +"""Batch helpers.""" + +from __future__ import annotations + +import enum +import sys +from contextlib import contextmanager +from dataclasses import asdict, dataclass, field +from typing import IO, TYPE_CHECKING, Any, ClassVar, Generator +from urllib.parse import ParseResult, parse_qs, urlencode, urlparse + +import fs +from singer.messages import Message + +from singer_sdk.helpers._singer import SingerMessageType + +if TYPE_CHECKING: + from fs.base import FS + + if sys.version_info >= (3, 8): + from typing import Literal + else: + from typing_extensions import Literal + + +class BatchFileFormat(str, enum.Enum): + """Batch file format.""" + + JSONL = "jsonl" + """JSON Lines format.""" + + +@dataclass +class BaseBatchFileEncoding: + """Base class for batch file encodings.""" + + registered_encodings: ClassVar[dict[str, type[BaseBatchFileEncoding]]] = {} + __encoding_format__: ClassVar[str] = "OVERRIDE_ME" + + # Base encoding fields + format: str = field(init=False) + """The format of the batch file.""" + + compression: str | None = None + """The compression of the batch file.""" + + def __init_subclass__(cls, **kwargs: Any) -> 
None: + """Register subclasses. + + Args: + **kwargs: Keyword arguments. + """ + super().__init_subclass__(**kwargs) + cls.registered_encodings[cls.__encoding_format__] = cls + + def __post_init__(self) -> None: + """Post-init hook.""" + self.format = self.__encoding_format__ + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> BaseBatchFileEncoding: + """Create an encoding from a dictionary.""" + data = data.copy() + encoding_format = data.pop("format") + encoding_cls = cls.registered_encodings[encoding_format] + return encoding_cls(**data) + + +@dataclass +class JSONLinesEncoding(BaseBatchFileEncoding): + """JSON Lines encoding for batch files.""" + + __encoding_format__ = "jsonl" + + +@dataclass +class SDKBatchMessage(Message): + """Singer batch message in the Meltano SDK flavor.""" + + type: Literal[SingerMessageType.BATCH] = field(init=False) + """The message type.""" + + stream: str + """The stream name.""" + + encoding: BaseBatchFileEncoding + """The file encoding of the batch.""" + + manifest: list[str] = field(default_factory=list) + """The manifest of files in the batch.""" + + def __post_init__(self): + if isinstance(self.encoding, dict): + self.encoding = BaseBatchFileEncoding.from_dict(self.encoding) + + self.type = SingerMessageType.BATCH + + def asdict(self): + """Return a dictionary representation of the message. + + Returns: + A dictionary with the defined message fields. + """ + return asdict(self) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> SDKBatchMessage: + """Create an encoding from a dictionary. + + Args: + data: The dictionary to create the message from. + + Returns: + The created message. + """ + data.pop("type") + return cls(**data) + + +@dataclass +class StorageTarget: + """Storage target.""" + + root: str + """"The root directory of the storage target.""" + + prefix: str | None = None + """"The file prefix.""" + + params: dict = field(default_factory=dict) + """"The storage parameters.""" + + def asdict(self): + """Return a dictionary representation of the message. + + Returns: + A dictionary with the defined message fields. + """ + return asdict(self) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> StorageTarget: + """Create an encoding from a dictionary. + + Args: + data: The dictionary to create the message from. + + Returns: + The created message. + """ + return cls(**data) + + @classmethod + def from_url(cls, url: ParseResult) -> StorageTarget: + """Create a storage target from a URL. + + Args: + url: The URL to create the storage target from. + + Returns: + The created storage target. + """ + new_url = url._replace(path="", query="") + return cls(root=new_url.geturl(), params=parse_qs(url.query)) + + @property + def fs_url(self) -> ParseResult: + """Get the storage target URL. + + Returns: + The storage target URL. + """ + return urlparse(self.root)._replace(query=urlencode(self.params)) + + @contextmanager + def fs(self, **kwargs: Any) -> Generator[FS, None, None]: + """Get a filesystem object for the storage target. + + Args: + kwargs: Additional arguments to pass ``f`.open_fs``. + + Returns: + The filesystem object. + """ + filesystem = fs.open_fs(self.fs_url.geturl(), **kwargs) + yield filesystem + filesystem.close() + + @contextmanager + def open(self, filename: str, mode: str = "rb") -> Generator[IO, None, None]: + """Open a file in the storage target. + + Args: + filename: The filename to open. + mode: The mode to open the file in. + + Returns: + The opened file. 
+ """ + filesystem = fs.open_fs(self.root, writeable=True, create=True) + fo = filesystem.open(filename, mode=mode) + try: + yield fo + finally: + fo.close() + filesystem.close() + + +@dataclass +class BatchConfig: + """Batch configuration.""" + + encoding: BaseBatchFileEncoding + """The encoding of the batch file.""" + + storage: StorageTarget + """The storage target of the batch file.""" + + def __post_init__(self): + if isinstance(self.encoding, dict): + self.encoding = BaseBatchFileEncoding.from_dict(self.encoding) + + if isinstance(self.storage, dict): + self.storage = StorageTarget.from_dict(self.storage) + + def asdict(self): + """Return a dictionary representation of the message. + + Returns: + A dictionary with the defined message fields. + """ + return asdict(self) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> BatchConfig: + """Create an encoding from a dictionary. + + Args: + data: The dictionary to create the message from. + + Returns: + The created message. + """ + return cls(**data) diff --git a/singer_sdk/helpers/_singer.py b/singer_sdk/helpers/_singer.py index 580491d9e..c756f84a6 100644 --- a/singer_sdk/helpers/_singer.py +++ b/singer_sdk/helpers/_singer.py @@ -1,20 +1,34 @@ from __future__ import annotations +import enum import logging from dataclasses import dataclass, fields -from enum import Enum -from typing import Any, Dict, Iterable, Tuple, Union, cast +from typing import TYPE_CHECKING, Any, Dict, Iterable, Tuple, Union, cast from singer.catalog import Catalog as BaseCatalog from singer.catalog import CatalogEntry as BaseCatalogEntry from singer_sdk.helpers._schema import SchemaPlus +if TYPE_CHECKING: + from typing_extensions import TypeAlias + + Breadcrumb = Tuple[str, ...] logger = logging.getLogger(__name__) +class SingerMessageType(str, enum.Enum): + """Singer specification message types.""" + + RECORD = "RECORD" + SCHEMA = "SCHEMA" + STATE = "STATE" + ACTIVATE_VERSION = "ACTIVATE_VERSION" + BATCH = "BATCH" + + class SelectionMask(Dict[Breadcrumb, bool]): """Boolean mask for property selection in schemas and records.""" @@ -35,7 +49,7 @@ def __missing__(self, breadcrumb: Breadcrumb) -> bool: class Metadata: """Base stream or property metadata.""" - class InclusionType(str, Enum): + class InclusionType(str, enum.Enum): """Catalog inclusion types.""" AVAILABLE = "available" @@ -51,8 +65,8 @@ def from_dict(cls, value: dict[str, Any]): """Parse metadata dictionary.""" return cls( **{ - field.name: value.get(field.name.replace("_", "-")) - for field in fields(cls) + object_field.name: value.get(object_field.name.replace("_", "-")) + for object_field in fields(cls) } ) @@ -60,10 +74,10 @@ def to_dict(self) -> dict[str, Any]: """Convert metadata to a JSON-encodeable dictionary.""" result = {} - for field in fields(self): - value = getattr(self, field.name) + for object_field in fields(self): + value = getattr(self, object_field.name) if value is not None: - result[field.name.replace("_", "-")] = value + result[object_field.name.replace("_", "-")] = value return result @@ -78,13 +92,16 @@ class StreamMetadata(Metadata): schema_name: str | None = None -class MetadataMapping(Dict[Breadcrumb, Union[Metadata, StreamMetadata]]): +AnyMetadata: TypeAlias = Union[Metadata, StreamMetadata] + + +class MetadataMapping(Dict[Breadcrumb, AnyMetadata]): """Stream metadata mapping.""" @classmethod def from_iterable(cls, iterable: Iterable[dict[str, Any]]): """Create a metadata mapping from an iterable of metadata dictionaries.""" - mapping = cls() + mapping: 
dict[Breadcrumb, AnyMetadata] = cls() for d in iterable: breadcrumb = tuple(d["breadcrumb"]) metadata = d["metadata"] diff --git a/singer_sdk/helpers/_typing.py b/singer_sdk/helpers/_typing.py index db0791d23..dc8389681 100644 --- a/singer_sdk/helpers/_typing.py +++ b/singer_sdk/helpers/_typing.py @@ -183,7 +183,7 @@ def _warn_unmapped_properties( def conform_record_data_types( # noqa: C901 - stream_name: str, row: Dict[str, Any], schema: dict, logger: logging.Logger + stream_name: str, record: Dict[str, Any], schema: dict, logger: logging.Logger ) -> Dict[str, Any]: """Translate values in record dictionary to singer-compatible data types. @@ -192,7 +192,7 @@ def conform_record_data_types( # noqa: C901 """ rec: Dict[str, Any] = {} unmapped_properties: List[str] = [] - for property_name, elem in row.items(): + for property_name, elem in record.items(): if property_name not in schema["properties"]: unmapped_properties.append(property_name) continue diff --git a/singer_sdk/io_base.py b/singer_sdk/io_base.py index f5da20d19..32f48ae36 100644 --- a/singer_sdk/io_base.py +++ b/singer_sdk/io_base.py @@ -3,7 +3,6 @@ from __future__ import annotations import abc -import enum import json import logging import sys @@ -12,19 +11,11 @@ from typing import Counter as CounterType from singer_sdk.helpers._compat import final +from singer_sdk.helpers._singer import SingerMessageType logger = logging.getLogger(__name__) -class SingerMessageType(str, enum.Enum): - """Singer specification message types.""" - - RECORD = "RECORD" - SCHEMA = "SCHEMA" - STATE = "STATE" - ACTIVATE_VERSION = "ACTIVATE_VERSION" - - class SingerReader(metaclass=abc.ABCMeta): """Interface for all plugins reading Singer messages from stdin.""" @@ -95,6 +86,9 @@ def _process_lines(self, file_input: IO[str]) -> CounterType[str]: elif record_type == SingerMessageType.STATE: self._process_state_message(line_dict) + elif record_type == SingerMessageType.BATCH: + self._process_batch_message(line_dict) + else: self._process_unknown_message(line_dict) @@ -118,6 +112,10 @@ def _process_state_message(self, message_dict: dict) -> None: def _process_activate_version_message(self, message_dict: dict) -> None: ... + @abc.abstractmethod + def _process_batch_message(self, message_dict: dict) -> None: + ... + def _process_unknown_message(self, message_dict: dict) -> None: """Internal method to process unknown message types from a Singer tap. diff --git a/singer_sdk/mapper_base.py b/singer_sdk/mapper_base.py index c09d39255..c77a88966 100644 --- a/singer_sdk/mapper_base.py +++ b/singer_sdk/mapper_base.py @@ -50,6 +50,9 @@ def _process_state_message(self, message_dict: dict) -> None: def _process_activate_version_message(self, message_dict: dict) -> None: self._write_messages(self.map_activate_version_message(message_dict)) + def _process_batch_message(self, message_dict: dict) -> None: + self._write_messages(self.map_batch_message(message_dict)) + @abc.abstractmethod def map_schema_message(self, message_dict: dict) -> Iterable[singer.Message]: """Map a schema message to zero or more new messages. @@ -89,6 +92,20 @@ def map_activate_version_message( """ ... + def map_batch_message( + self, + message_dict: dict, + ) -> Iterable[singer.Message]: + """Map a batch message to zero or more new messages. + + Args: + message_dict: A BATCH message JSON dictionary. + + Raises: + NotImplementedError: if not implemented by subclass. 
+ """ + raise NotImplementedError("BATCH messages are not supported by mappers.") + @classproperty def cli(cls) -> Callable: """Execute standard CLI handler for inline mappers. diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 1b5a50f0f..dfd4afce1 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -1,15 +1,27 @@ """Sink classes load data to a target.""" +from __future__ import annotations + import abc import datetime +import json import time +from gzip import GzipFile +from gzip import open as gzip_open from logging import Logger from types import MappingProxyType -from typing import Any, Dict, List, Mapping, Optional, Union +from typing import IO, Any, Mapping, Sequence +from urllib.parse import urlparse from dateutil import parser from jsonschema import Draft4Validator, FormatChecker +from singer_sdk.helpers._batch import ( + BaseBatchFileEncoding, + BatchConfig, + BatchFileFormat, + StorageTarget, +) from singer_sdk.helpers._compat import final from singer_sdk.helpers._typing import ( DatetimeErrorTreatmentEnum, @@ -34,8 +46,8 @@ def __init__( self, target: PluginBase, stream_name: str, - schema: Dict, - key_properties: Optional[List[str]], + schema: dict, + key_properties: list[str] | None, ) -> None: """Initialize target sink. @@ -47,7 +59,7 @@ def __init__( """ self.logger = target.logger self._config = dict(target.config) - self._pending_batch: Optional[dict] = None + self._pending_batch: dict | None = None self.stream_name = stream_name self.logger.info(f"Initializing target sink for stream '{stream_name}'...") self.schema = schema @@ -55,11 +67,11 @@ def __init__( self._add_sdc_metadata_to_schema() else: self._remove_sdc_metadata_from_schema() - self.records_to_drain: Union[List[dict], Any] = [] - self._context_draining: Optional[dict] = None - self.latest_state: Optional[dict] = None - self._draining_state: Optional[dict] = None - self.drained_state: Optional[dict] = None + self.records_to_drain: list[dict] | Any = [] + self._context_draining: dict | None = None + self.latest_state: dict | None = None + self._draining_state: dict | None = None + self.drained_state: dict | None = None self.key_properties = key_properties or [] # Tally counters @@ -163,6 +175,16 @@ def config(self) -> Mapping[str, Any]: """ return MappingProxyType(self._config) + @property + def batch_config(self) -> BatchConfig | None: + """Get batch configuration. + + Returns: + A frozen (read-only) config dictionary map. + """ + raw = self.config.get("batch_config") + return BatchConfig.from_dict(raw) if raw else None + @property def include_sdc_metadata_properties(self) -> bool: """Check if metadata columns should be added. @@ -260,7 +282,7 @@ def _remove_sdc_metadata_from_record(self, record: dict) -> None: # Record validation - def _validate_and_parse(self, record: Dict) -> Dict: + def _validate_and_parse(self, record: dict) -> dict: """Validate or repair the record, parsing to python-native types as needed. Args: @@ -276,7 +298,7 @@ def _validate_and_parse(self, record: Dict) -> Dict: return record def _parse_timestamps_in_record( - self, record: Dict, schema: Dict, treatment: DatetimeErrorTreatmentEnum + self, record: dict, schema: dict, treatment: DatetimeErrorTreatmentEnum ) -> None: """Parse strings to datetime.datetime values, repairing or erroring on failure. 
@@ -318,7 +340,7 @@ def _after_process_record(self, context: dict) -> None: # SDK developer overrides: - def preprocess_record(self, record: Dict, context: dict) -> dict: + def preprocess_record(self, record: dict, context: dict) -> dict: """Process incoming record and return a modified result. Args: @@ -410,3 +432,40 @@ def clean_up(self) -> None: should not be relied on, it's recommended to use a uuid as well. """ pass + + def process_batch_files( + self, + encoding: BaseBatchFileEncoding, + files: Sequence[str], + ) -> None: + """Process a batch file with the given batch context. + + Args: + encoding: The batch file encoding. + files: The batch files to process. + + Raises: + NotImplementedError: If the batch file encoding is not supported. + """ + file: GzipFile | IO + storage: StorageTarget | None = None + + for path in files: + url = urlparse(path) + + if self.batch_config: + storage = self.batch_config.storage + else: + storage = StorageTarget.from_url(url) + + if encoding.format == BatchFileFormat.JSONL: + with storage.fs(create=False) as fs: + with fs.open(url.path, mode="rb") as file: + if encoding.compression == "gzip": + file = gzip_open(file) + context = {"records": [json.loads(line) for line in file]} + self.process_batch(context) + else: + raise NotImplementedError( + f"Unsupported batch encoding format: {encoding.format}" + ) diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index 5faaf2ca4..5f37a0236 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -1,10 +1,11 @@ """Sink classes load data to SQL targets.""" from textwrap import dedent -from typing import Any, Dict, Iterable, List, Optional, Type +from typing import Any, Dict, Iterable, List, Optional, Type, Union import sqlalchemy from pendulum import now +from sqlalchemy.sql import Executable from sqlalchemy.sql.expression import bindparam from singer_sdk.plugin_base import PluginBase @@ -167,7 +168,7 @@ def generate_insert_statement( self, full_table_name: str, schema: dict, - ) -> str: + ) -> Union[str, Executable]: """Generate an insert statement for the given records. Args: @@ -213,11 +214,11 @@ def bulk_insert_records( full_table_name, schema, ) + if isinstance(insert_sql, str): + insert_sql = sqlalchemy.text(insert_sql) + self.logger.info("Inserting with SQL: %s", insert_sql) - self.connector.connection.execute( - sqlalchemy.text(insert_sql), - records, - ) + self.connector.connection.execute(insert_sql, records) if isinstance(records, list): return len(records) # If list, we can quickly return record count. 
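The default `Sink.process_batch_files` implementation above reads gzipped JSONL files through the new `singer_sdk.helpers._batch` helpers. As a minimal sketch (not part of this changeset) of how those helpers fit together, the snippet below builds a `BatchConfig` from a raw settings dict, writes one gzipped JSONL batch file in the same layout that `Stream.get_batches` produces later in this diff, and reads it back the way `process_batch_files` does. The storage root, file prefix, and record values are illustrative assumptions, not values taken from this diff.

```python
import gzip
import json

from singer_sdk.helpers._batch import BatchConfig

# Hypothetical settings; "root" and "prefix" are placeholders for this sketch.
raw_config = {
    "encoding": {"format": "jsonl", "compression": "gzip"},
    "storage": {"root": "file://batch-sketch-dir", "prefix": "example-"},
}
batch_config = BatchConfig.from_dict(raw_config)

records = [{"id": 1, "name": "alpha"}, {"id": 2, "name": "beta"}]
filename = f"{batch_config.storage.prefix}0001.jsonl.gz"

# Write a batch file, mirroring what Stream.get_batches does for each chunk.
with batch_config.storage.fs(create=True) as filesystem:
    with filesystem.open(filename, mode="wb") as outfile:
        with gzip.GzipFile(fileobj=outfile, mode="wb") as gz:
            gz.writelines((json.dumps(record) + "\n").encode() for record in records)
    manifest = [filesystem.geturl(filename)]  # what a BATCH message manifest would carry

# Read the batch back, mirroring the default Sink.process_batch_files.
with batch_config.storage.fs(create=False) as filesystem:
    with filesystem.open(filename, mode="rb") as infile:
        with gzip.open(infile) as gz:
            loaded = [json.loads(line) for line in gz]

assert loaded == records
```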
diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index c843c0ede..bcc003029 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -5,12 +5,15 @@ import abc import copy import datetime +import gzip +import itertools import json import logging from os import PathLike from pathlib import Path from types import MappingProxyType -from typing import Any, Callable, Generator, Iterable, Mapping, TypeVar, cast +from typing import Any, Callable, Generator, Iterable, Iterator, Mapping, TypeVar, cast +from uuid import uuid4 import pendulum import requests @@ -18,6 +21,11 @@ from singer import RecordMessage, Schema, SchemaMessage, StateMessage from singer_sdk.exceptions import InvalidStreamSortException, MaxRecordsLimitException +from singer_sdk.helpers._batch import ( + BaseBatchFileEncoding, + BatchConfig, + SDKBatchMessage, +) from singer_sdk.helpers._catalog import pop_deselected_record_properties from singer_sdk.helpers._compat import final from singer_sdk.helpers._flattening import get_flattening_options @@ -50,10 +58,32 @@ REPLICATION_LOG_BASED = "LOG_BASED" FactoryType = TypeVar("FactoryType", bound="Stream") +_T = TypeVar("_T") METRICS_LOG_LEVEL_SETTING = "metrics_log_level" +def lazy_chunked_generator( + iterable: Iterable[_T], + chunk_size: int, +) -> Generator[Iterator[_T], None, None]: + """Yield a generator for each chunk of the given iterable. + + Args: + iterable: The iterable to chunk. + chunk_size: The size of each chunk. + + Yields: + A generator for each chunk of the given iterable. + """ + iterator = iter(iterable) + while True: + chunk = list(itertools.islice(iterator, chunk_size)) + if not chunk: + break + yield iter(chunk) + + class Stream(metaclass=abc.ABCMeta): """Abstract base class for tap streams.""" @@ -67,6 +97,10 @@ class Stream(metaclass=abc.ABCMeta): # Internal API cost aggregator _sync_costs: dict[str, int] = {} + # Batch attributes + batch_size: int = 1000 + """Max number of records to write to each batch file.""" + def __init__( self, tap: TapBaseClass, @@ -761,7 +795,7 @@ def _generate_record_messages( pop_deselected_record_properties(record, self.schema, self.mask, self.logger) record = conform_record_data_types( stream_name=self.name, - row=record, + record=record, schema=self.schema, logger=self.logger, ) @@ -787,6 +821,25 @@ def _write_record_message(self, record: dict) -> None: for record_message in self._generate_record_messages(record): singer.write_message(record_message) + def _write_batch_message( + self, + encoding: BaseBatchFileEncoding, + manifest: list[str], + ) -> None: + """Write out a BATCH message. + + Args: + encoding: The encoding to use for the batch. + manifest: A list of filenames for the batch. + """ + singer.write_message( + SDKBatchMessage( + stream=self.name, + encoding=encoding, + manifest=manifest, + ) + ) + @property def _metric_logging_function(self) -> Callable | None: """Return the metrics logging function. @@ -952,16 +1005,48 @@ def finalize_state_progress_markers(self, state: dict | None = None) -> None: # Private sync methods: - def _sync_records( # noqa C901 # too complex - self, context: dict | None = None + def _process_record( + self, + record: dict, + child_context: dict | None = None, + partition_context: dict | None = None, ) -> None: + """Process a record. + + Args: + record: The record to process. + child_context: The child context. + partition_context: The partition context. 
+ """ + partition_context = partition_context or {} + child_context = copy.copy( + self.get_child_context(record=record, context=child_context) + ) + for key, val in partition_context.items(): + # Add state context to records if not already present + if key not in record: + record[key] = val + + # Sync children, except when primary mapper filters out the record + if self.stream_maps[0].get_filter_result(record): + self._sync_children(child_context) + + def _sync_records( + self, + context: dict | None = None, + write_messages: bool = True, + ) -> Generator[dict, Any, Any]: """Sync records, emitting RECORD and STATE messages. Args: context: Stream partition or context dictionary. + write_messages: Whether to write Singer messages to stdout. Raises: InvalidStreamSortException: TODO + + Yields: + Each record from the source. """ record_count = 0 current_context: dict | None @@ -978,44 +1063,47 @@ def _sync_records( # noqa C901 # too complex child_context: dict | None = ( None if current_context is None else copy.copy(current_context) ) + for record_result in self.get_records(current_context): if isinstance(record_result, tuple): # Tuple items should be the record and the child context record, child_context = record_result else: record = record_result - child_context = copy.copy( - self.get_child_context(record=record, context=child_context) - ) - for key, val in (state_partition_context or {}).items(): - # Add state context to records if not already present - if key not in record: - record[key] = val - - # Sync children, except when primary mapper filters out the record - if self.stream_maps[0].get_filter_result(record): - self._sync_children(child_context) + try: + self._process_record( + record, + child_context=child_context, + partition_context=state_partition_context, + ) + except InvalidStreamSortException as ex: + log_sort_error( + log_fn=self.logger.error, + ex=ex, + record_count=record_count + 1, + partition_record_count=partition_record_count + 1, + current_context=current_context, + state_partition_context=state_partition_context, + stream_name=self.name, + ) + raise ex + self._check_max_record_limit(record_count) + if selected: - if (record_count - 1) % self.STATE_MSG_FREQUENCY == 0: + if ( + record_count - 1 + ) % self.STATE_MSG_FREQUENCY == 0 and write_messages: self._write_state_message() - self._write_record_message(record) - try: - self._increment_stream_state(record, context=current_context) - except InvalidStreamSortException as ex: - log_sort_error( - log_fn=self.logger.error, - ex=ex, - record_count=record_count + 1, - partition_record_count=partition_record_count + 1, - current_context=current_context, - state_partition_context=state_partition_context, - stream_name=self.name, - ) - raise ex + if write_messages: + self._write_record_message(record) + self._increment_stream_state(record, context=current_context) + + yield record + + record_count += 1 + partition_record_count += 1 - record_count += 1 - partition_record_count += 1 if current_context == state_partition_context: # Finalize per-partition state only if 1:1 with context finalize_state_progress_markers(state) @@ -1024,8 +1112,25 @@ def _sync_records( # noqa C901 # too complex # Otherwise will be finalized by tap at end of sync. 
finalize_state_progress_markers(self.stream_state) self._write_record_count_log(record_count=record_count, context=context) - # Reset interim bookmarks before emitting final STATE message: - self._write_state_message() + + if write_messages: + # Reset interim bookmarks before emitting final STATE message: + self._write_state_message() + + def _sync_batches( + self, + batch_config: BatchConfig, + context: dict | None = None, + ) -> None: + """Sync batches, emitting BATCH messages. + + Args: + batch_config: The batch configuration. + context: Stream partition or context dictionary. + """ + for encoding, manifest in self.get_batches(batch_config, context): + self._write_batch_message(encoding=encoding, manifest=manifest) + self._write_state_message() # Public methods ("final", not recommended to be overridden) @@ -1051,8 +1156,14 @@ def sync(self, context: dict | None = None) -> None: # Send a SCHEMA message to the downstream target: if self.selected: self._write_schema_message() - # Sync the records themselves: - self._sync_records(context) + + batch_config = self.get_batch_config(self.config) + if batch_config: + self._sync_batches(batch_config, context=context) + else: + # Sync the records themselves: + for _ in self._sync_records(context=context): + pass def _sync_children(self, child_context: dict) -> None: for child_stream in self.child_streams: @@ -1136,9 +1247,9 @@ def get_child_context(self, record: dict, context: dict | None) -> dict: @abc.abstractmethod def get_records(self, context: dict | None) -> Iterable[dict | tuple[dict, dict]]: - """Abstract row generator function. Must be overridden by the child class. + """Abstract record generator function. Must be overridden by the child class. - Each row emitted should be a dictionary of property names to their values. + Each record emitted should be a dictionary of property names to their values. Returns either a record dict or a tuple: (record_dict, child_context) A method which should retrieve data from the source and return records @@ -1161,6 +1272,57 @@ def get_records(self, context: dict | None) -> Iterable[dict | tuple[dict, dict] """ pass + def get_batch_config(self, config: Mapping) -> BatchConfig | None: + """Return the batch config for this stream. + + Args: + config: Tap configuration dictionary. + + Returns: + Batch config for this stream. + """ + raw = config.get("batch_config") + return BatchConfig.from_dict(raw) if raw else None + + def get_batches( + self, + batch_config: BatchConfig, + context: dict | None = None, + ) -> Iterable[tuple[BaseBatchFileEncoding, list[str]]]: + """Batch generator function. + + Developers are encouraged to override this method to customize batching + behavior for databases, bulk APIs, etc. + + Args: + batch_config: Batch config for this stream. + context: Stream partition or context dictionary. + + Yields: + A tuple of (encoding, manifest) for each batch. + """ + sync_id = f"{self.tap_name}--{self.name}-{uuid4()}" + prefix = batch_config.storage.prefix or "" + + for i, chunk in enumerate( + lazy_chunked_generator( + self._sync_records(context, write_messages=False), + self.batch_size, + ), + start=1, + ): + filename = f"{prefix}{sync_id}-{i}.json.gz" + with batch_config.storage.fs() as fs: + with fs.open(filename, "wb") as f: + # TODO: Determine compression from config. 
+ with gzip.GzipFile(fileobj=f, mode="wb") as gz: + gz.writelines( + (json.dumps(record) + "\n").encode() for record in chunk + ) + file_url = fs.geturl(filename) + + yield batch_config.encoding, [file_url] + def post_process(self, row: dict, context: dict | None = None) -> dict | None: """As needed, append or transform raw data to match expected structure. diff --git a/singer_sdk/streams/rest.py b/singer_sdk/streams/rest.py index f69a0de4c..a1b52cb10 100644 --- a/singer_sdk/streams/rest.py +++ b/singer_sdk/streams/rest.py @@ -483,9 +483,9 @@ def timeout(self) -> int: # Records iterator def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]: - """Return a generator of row-type dictionary objects. + """Return a generator of record-type dictionary objects. - Each row emitted should be a dictionary of property names to their values. + Each record emitted should be a dictionary of property names to their values. Args: context: Stream partition or context dictionary. @@ -501,7 +501,7 @@ def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]: yield transformed_record def parse_response(self, response: requests.Response) -> Iterable[dict]: - """Parse the response and return an iterator of result rows. + """Parse the response and return an iterator of result records. Args: response: A raw `requests.Response`_ object. diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index 0a05499ef..deb41ea33 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -1020,7 +1020,7 @@ def fully_qualified_name(self) -> str: # Get records from stream def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]: - """Return a generator of row-type dictionary objects. + """Return a generator of record-type dictionary objects. If the stream has a replication_key value defined, records will be sorted by the incremental key. If the stream also has an available starting bookmark, the @@ -1056,8 +1056,8 @@ def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]: ) ) - for row in self.connector.connection.execute(query): - yield dict(row) + for record in self.connector.connection.execute(query): + yield dict(record) __all__ = ["SQLStream", "SQLConnector"] diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 1b58e88d6..0b21b04c5 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -14,6 +14,7 @@ from singer_sdk.cli import common_options from singer_sdk.exceptions import RecordsWithoutSchemaException +from singer_sdk.helpers._batch import BaseBatchFileEncoding from singer_sdk.helpers._classproperty import classproperty from singer_sdk.helpers._compat import final from singer_sdk.helpers.capabilities import CapabilitiesEnum, PluginCapabilities @@ -267,6 +268,7 @@ def _process_lines(self, file_input: IO[str]) -> Counter[str]: self.logger.info( f"Target '{self.name}' completed reading {line_count} lines of input " f"({counter[SingerMessageType.RECORD]} records, " + f"({counter[SingerMessageType.BATCH]} batch manifests, " f"{counter[SingerMessageType.STATE]} state messages)." ) @@ -401,6 +403,20 @@ def _process_activate_version_message(self, message_dict: dict) -> None: sink = self.get_sink(stream_name) sink.activate_version(message_dict["version"]) + def _process_batch_message(self, message_dict: dict) -> None: + """Handle the optional BATCH message extension. 
+ + Args: + message_dict: TODO + """ + sink = self.get_sink(message_dict["stream"]) + + encoding = BaseBatchFileEncoding.from_dict(message_dict["encoding"]) + sink.process_batch_files( + encoding, + message_dict["manifest"], + ) + # Sink drain methods @final diff --git a/tests/core/resources/batch.1.jsonl.gz b/tests/core/resources/batch.1.jsonl.gz new file mode 100644 index 000000000..393bd2953 Binary files /dev/null and b/tests/core/resources/batch.1.jsonl.gz differ diff --git a/tests/core/resources/batch.2.jsonl.gz b/tests/core/resources/batch.2.jsonl.gz new file mode 100644 index 000000000..c06fcd59d Binary files /dev/null and b/tests/core/resources/batch.2.jsonl.gz differ diff --git a/tests/core/test_batch.py b/tests/core/test_batch.py new file mode 100644 index 000000000..740f6d281 --- /dev/null +++ b/tests/core/test_batch.py @@ -0,0 +1,40 @@ +from dataclasses import asdict +from urllib.parse import urlparse + +import pytest + +from singer_sdk.helpers._batch import ( + BaseBatchFileEncoding, + JSONLinesEncoding, + StorageTarget, +) + + +@pytest.mark.parametrize( + "encoding,expected", + [ + (JSONLinesEncoding("gzip"), {"compression": "gzip", "format": "jsonl"}), + (JSONLinesEncoding(), {"compression": None, "format": "jsonl"}), + ], + ids=["jsonl-compression-gzip", "jsonl-compression-none"], +) +def test_encoding_as_dict(encoding: BaseBatchFileEncoding, expected: dict) -> None: + """Test encoding as dict.""" + assert asdict(encoding) == expected + + +def test_storage_get_url(): + storage = StorageTarget("file://root_dir") + + with storage.fs(create=True) as fs: + url = fs.geturl("prefix--file.jsonl.gz") + assert url.startswith("file://") + assert url.replace("\\", "/").endswith("root_dir/prefix--file.jsonl.gz") + + +def test_storage_from_url(): + url = urlparse("s3://bucket/path/to/file?region=us-east-1") + target = StorageTarget.from_url(url) + assert target.root == "s3://bucket" + assert target.prefix is None + assert target.params == {"region": ["us-east-1"]} diff --git a/tests/core/test_countries_sync.py b/tests/core/test_countries_sync.py index 39abc58fe..9161e2c06 100644 --- a/tests/core/test_countries_sync.py +++ b/tests/core/test_countries_sync.py @@ -1,7 +1,12 @@ """Test sample sync.""" import copy +import io +import json import logging +from contextlib import redirect_stdout +from re import I +from typing import Counter from samples.sample_tap_countries.countries_tap import SampleTapCountries from singer_sdk.helpers._catalog import ( @@ -82,3 +87,47 @@ def test_with_catalog_entry(): logger=logging.getLogger(), ) assert new_schema == stream.schema + + +def test_batch_mode(monkeypatch, outdir): + """Test batch mode.""" + tap = SampleTapCountries( + config={ + "batch_config": { + "encoding": { + "format": "jsonl", + "compression": "gzip", + }, + "storage": { + "root": outdir, + "prefix": "pytest-countries-", + }, + } + } + ) + + buf = io.StringIO() + with redirect_stdout(buf): + tap.sync_all() + + buf.seek(0) + lines = buf.read().splitlines() + messages = [json.loads(line) for line in lines] + + def tally_messages(messages: list) -> Counter: + """Tally messages.""" + return Counter( + (message["type"], message["stream"]) + if message["type"] != "STATE" + else (message["type"],) + for message in messages + ) + + counter = tally_messages(messages) + assert counter["SCHEMA", "continents"] == 1 + assert counter["BATCH", "continents"] == 1 + + assert counter["SCHEMA", "countries"] == 1 + assert counter["BATCH", "countries"] == 1 + + assert counter[("STATE",)] == 2 diff --git 
a/tests/core/test_record_typing.py b/tests/core/test_record_typing.py index d883ee914..d9cc7beb9 100644 --- a/tests/core/test_record_typing.py +++ b/tests/core/test_record_typing.py @@ -15,7 +15,7 @@ @pytest.mark.parametrize( - "row,schema,expected_row", + "record,schema,expected_row", [ ( {"updatedAt": pendulum.parse("2021-08-25T20:05:28+00:00")}, @@ -34,12 +34,12 @@ ), ], ) -def test_conform_record_data_types(row: Dict[str, Any], schema: dict, expected_row): +def test_conform_record_data_types(record: Dict[str, Any], schema: dict, expected_row): stream_name = "test-stream" # TODO: mock this out logger = logging.getLogger() - actual = conform_record_data_types(stream_name, row, schema, logger) - print(row["updatedAt"].isoformat()) + actual = conform_record_data_types(stream_name, record, schema, logger) + print(record["updatedAt"].isoformat()) assert actual == expected_row diff --git a/tests/core/test_singer_messages.py b/tests/core/test_singer_messages.py new file mode 100644 index 000000000..3858731a9 --- /dev/null +++ b/tests/core/test_singer_messages.py @@ -0,0 +1,40 @@ +from dataclasses import asdict + +import pytest + +from singer_sdk.helpers._batch import JSONLinesEncoding, SDKBatchMessage +from singer_sdk.helpers._singer import SingerMessageType + + +@pytest.mark.parametrize( + "message,expected", + [ + ( + SDKBatchMessage( + stream="test_stream", + encoding=JSONLinesEncoding("gzip"), + manifest=[ + "path/to/file1.jsonl.gz", + "path/to/file2.jsonl.gz", + ], + ), + { + "type": SingerMessageType.BATCH, + "stream": "test_stream", + "encoding": {"compression": "gzip", "format": "jsonl"}, + "manifest": [ + "path/to/file1.jsonl.gz", + "path/to/file2.jsonl.gz", + ], + }, + ) + ], + ids=["batch-message-jsonl"], +) +def test_batch_message_as_dict(message, expected): + """Test batch message as dict.""" + + dumped = message.asdict() + assert dumped == expected + + assert message.from_dict(dumped) == message diff --git a/tests/core/test_sqlite.py b/tests/core/test_sqlite.py index b85ff7d67..b6a24fd77 100644 --- a/tests/core/test_sqlite.py +++ b/tests/core/test_sqlite.py @@ -1,6 +1,7 @@ """Typing tests.""" import json +import sqlite3 from copy import deepcopy from io import StringIO from pathlib import Path @@ -109,6 +110,14 @@ def sqlite_sample_target_soft_delete(sqlite_target_test_config): return SQLiteTarget(conf) +@pytest.fixture +def sqlite_sample_target_batch(sqlite_target_test_config): + """Get a sample target object with hard_delete disabled.""" + conf = sqlite_target_test_config + + return SQLiteTarget(conf) + + def _discover_and_select_all(tap: SQLTap) -> None: """Discover catalog and auto-select all streams.""" for catalog_entry in tap.catalog_dict["streams"]: @@ -388,6 +397,52 @@ def test_sqlite_column_morph(sqlite_sample_target: SQLTarget): ) +def test_sqlite_process_batch_message( + sqlite_target_test_config: dict, + sqlite_sample_target_batch: SQLiteTarget, +): + """Test handling the batch message for the SQLite target. 
+ + Test performs the following actions: + + - Sends a batch message for a table that doesn't exist (which should + have no effect) + """ + schema_message = { + "type": "SCHEMA", + "stream": "users", + "key_properties": ["id"], + "schema": { + "required": ["id"], + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": ["null", "string"]}, + }, + }, + } + batch_message = { + "type": "BATCH", + "stream": "users", + "encoding": {"format": "jsonl", "compression": "gzip"}, + "manifest": [ + "file://tests/core/resources/batch.1.jsonl.gz", + "file://tests/core/resources/batch.2.jsonl.gz", + ], + } + tap_output = "\n".join([json.dumps(schema_message), json.dumps(batch_message)]) + + target_sync_test( + sqlite_sample_target_batch, + input=StringIO(tap_output), + finalize=True, + ) + db = sqlite3.connect(sqlite_target_test_config["path_to_db"]) + cursor = db.cursor() + cursor.execute("SELECT COUNT(*) as count FROM users") + assert cursor.fetchone()[0] == 4 + + def test_sqlite_column_no_morph(sqlite_sample_target: SQLTarget): """End-to-end-to-end test for SQLite tap and target. diff --git a/tests/core/test_streams.py b/tests/core/test_streams.py index 5bbd4b466..8945ca996 100644 --- a/tests/core/test_streams.py +++ b/tests/core/test_streams.py @@ -339,9 +339,9 @@ def test_jsonpath_rest_stream( RestTestStream.records_jsonpath = path stream = RestTestStream(tap) - rows = stream.parse_response(fake_response) + records = stream.parse_response(fake_response) - assert list(rows) == result + assert list(records) == result def test_jsonpath_graphql_stream_default(tap: SimpleTestTap): @@ -359,9 +359,9 @@ def test_jsonpath_graphql_stream_default(tap: SimpleTestTap): fake_response._content = str.encode(content) stream = GraphqlTestStream(tap) - rows = stream.parse_response(fake_response) + records = stream.parse_response(fake_response) - assert list(rows) == [{"id": 1, "value": "abc"}, {"id": 2, "value": "def"}] + assert list(records) == [{"id": 1, "value": "abc"}, {"id": 2, "value": "def"}] def test_jsonpath_graphql_stream_override(tap: SimpleTestTap): @@ -382,9 +382,9 @@ def records_jsonpath(cls): stream = GraphQLJSONPathOverride(tap) - rows = stream.parse_response(fake_response) + records = stream.parse_response(fake_response) - assert list(rows) == [{"id": 1, "value": "abc"}, {"id": 2, "value": "def"}] + assert list(records) == [{"id": 1, "value": "abc"}, {"id": 2, "value": "def"}] @pytest.mark.parametrize(