Skip to content

Commit

Permalink
feat: S3 batch storage (#1087)
Browse files Browse the repository at this point in the history
* Added PyFilesystem/s3fs as an optional dependency

* Test s3 protocol

* Add dummy credentials to storage test

* Add fs-s3fs to support nox tests

* Shortening s3fs extras name to s3

* Include s3 extra instead of fs-s3fs directly

* importlib-metadata 5.0.0 deprecated endpoint.

No longer works with python 3.7. Going back down to 4.13.0

* Add s3 extra to cookiecutter toml, update docs

* Mypy python 3.8 fix

From ambiguous Any return type to explicity Traversable return type.

* Sync versions

* Add S3 extra to target cookiecutter

* Update SDK minor version in cookiecutter

* re-lock dependencies

* Make explicit extra for s3 dependencies
  • Loading branch information
jamielxcarter authored Nov 17, 2022
1 parent fe67247 commit ca27fd5
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ license = "Apache 2.0"
[tool.poetry.dependencies]
python = "<3.11,>=3.7.1"
requests = "^2.25.1"
singer-sdk = "^0.14.0"
singer-sdk = { version="^0.14.0"}
fs-s3fs = { version = "^1.1.1", optional = true}

[tool.poetry.dev-dependencies]
pytest = "^6.2.5"
Expand All @@ -24,6 +25,9 @@ mypy = "^0.910"
types-requests = "^2.26.1"
isort = "^5.10.1"

[tool.poetry.extras]
s3 = ["fs-s3fs"]

[tool.isort]
profile = "black"
multi_line_output = 3 # Vertical Hanging Indent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ license = "Apache 2.0"
[tool.poetry.dependencies]
python = "<3.11,>=3.7.1"
requests = "^2.25.1"
singer-sdk = "^0.14.0"
singer-sdk = { version="^0.14.0"}
fs-s3fs = { version = "^1.1.1", optional = true}

[tool.poetry.dev-dependencies]
pytest = "^6.2.5"
Expand All @@ -24,6 +25,9 @@ mypy = "^0.910"
types-requests = "^2.26.1"
isort = "^5.10.1"

[tool.poetry.extras]
s3 = ["fs-s3fs"]

[tool.isort]
profile = "black"
multi_line_output = 3 # Vertical Hanging Indent
Expand Down
21 changes: 19 additions & 2 deletions docs/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ This library's implementation of the `BATCH` message is used to send records in
- when the tap outputs records at a much higher rate than the target can consume them, creating backpressure
- when the source system can directly export data in bulk (e.g. a database dump)

Currently only a local filesystem is supported, but other filesystems like AWS S3, FTP, etc. could be supported in the future.
Currently only a local filesystem or AWS S3 are supported, but other filesystems like FTP, etc. could be supported in the future.

## The `BATCH` Message

Local
```json
{
"type": "BATCH",
Expand All @@ -33,6 +34,22 @@ Currently only a local filesystem is supported, but other filesystems like AWS S
}
```

AWS S3
```json
{
"type": "BATCH",
"stream": "users",
"encoding": {
"format": "jsonl",
"compression": "gzip"
},
"manifest": [
"s3://path/to/batch/file/1",
"s3://path/to/batch/file/2"
]
}
```

### `encoding`

The `encoding` field is used to specify the format and compression of the batch files. Currently only `jsonl` and `gzip` are supported, respectively.
Expand All @@ -43,7 +60,7 @@ The `manifest` field is used to specify the paths to the batch files. The paths

## Batch configuration

When local storage is used, targets do no require special configuration to process `BATCH` messages.
When local storage is used, targets do no require special configuration to process `BATCH` messages. Use of AWS S3 assumes S3/AWS credentials are already discoverable via the underlying S3 libraries (`AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` / `AWS_DEFAULT_REGION`)

Taps may be configured to specify a root storage `root` directory, file path `prefix`, and `encoding` for batch files using a configuration like the below:

Expand Down
3 changes: 2 additions & 1 deletion noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ def mypy(session: Session) -> None:
@session(python=python_versions)
def tests(session: Session) -> None:
"""Execute pytest tests and compute coverage."""
session.install(".")

session.install(".[s3]")
session.install(*test_dependencies)

# temp fix until pyarrow is supported on python 3.11
Expand Down
259 changes: 172 additions & 87 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ sphinx-copybutton = {version = ">=0.3.1,<0.6.0", optional = true}
myst-parser = {version = ">=0.17.2,<0.19.0", optional = true}
sphinx-autobuild = {version = "^2021.3.14", optional = true}

# File storage dependencies installed as optional 'filesystem' extras
fs-s3fs = { version = "^1.1.1", optional = true}

[tool.poetry.extras]
docs = [
"sphinx",
Expand All @@ -76,6 +79,7 @@ docs = [
"myst-parser",
"sphinx-autobuild",
]
s3 = ["fs-s3fs"]
# Mark these as extras so that installs pass on python 3.11
samples = [
"pyarrow",
Expand Down
3 changes: 2 additions & 1 deletion singer_sdk/helpers/_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import sys
from types import ModuleType
from typing import cast

if sys.version_info >= (3, 9):
import importlib.resources as importlib_resources
Expand All @@ -21,4 +22,4 @@ def get_package_files(package: str | ModuleType) -> Traversable:
Returns:
The file as a Traversable object.
"""
return importlib_resources.files(package)
return cast(Traversable, importlib_resources.files(package))
15 changes: 15 additions & 0 deletions tests/core/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ def test_storage_get_url():
assert url.replace("\\", "/").endswith("root_dir/prefix--file.jsonl.gz")


def test_storage_get_s3_url():
storage = StorageTarget("s3://testing123:testing123@test_bucket")

with storage.fs(create=True) as fs:
url = fs.geturl("prefix--file.jsonl.gz")
assert url.startswith(
"https://s3.amazonaws.com/test_bucket/prefix--file.jsonl.gz"
)


@pytest.mark.parametrize(
"file_url,root",
[
Expand All @@ -40,6 +50,11 @@ def test_storage_get_url():
"file:///Users/sdk/path/to",
id="local",
),
pytest.param(
"s3://test_bucket/prefix--file.jsonl.gz",
"s3://test_bucket",
id="s3",
),
],
)
def test_storage_from_url(file_url: str, root: str):
Expand Down

0 comments on commit ca27fd5

Please sign in to comment.