Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: S3 batch storage #1087

Merged
merged 28 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1894f39
Added PyFilesystem/s3fs as an optional dependency
jamielxcarter Oct 14, 2022
3713157
Test s3 protocol
jamielxcarter Oct 19, 2022
af2844e
Add dummy credentials to storage test
jamielxcarter Oct 19, 2022
d95cea3
Add fs-s3fs to support nox tests
jamielxcarter Oct 19, 2022
a771504
Merge branch 'main' into s3-batch-storage
jamielxcarter Oct 19, 2022
83cddcd
Shortening s3fs extras name to s3
jamielxcarter Oct 19, 2022
4cc2ed0
Include s3 extra instead of fs-s3fs directly
jamielxcarter Oct 19, 2022
55f465e
importlib-metadata 5.0.0 deprecated endpoint.
jamielxcarter Oct 19, 2022
75b187e
Merge branch 'main' into s3-batch-storage
jamielxcarter Oct 19, 2022
deec62b
Add s3 extra to cookiecutter toml, update docs
jamielxcarter Oct 28, 2022
5620359
Merge branch 'main' into s3-batch-storage
jamielxcarter Oct 28, 2022
419ba79
Mypy python 3.8 fix
jamielxcarter Nov 2, 2022
82f5fe8
Merge branch 'main' into s3-batch-storage
jamielxcarter Nov 2, 2022
ab4df4b
Sync versions
jamielxcarter Nov 7, 2022
607b815
Merge branch 'main' into s3-batch-storage
jamielxcarter Nov 7, 2022
4fd0e76
Add S3 extra to target cookiecutter
jamielxcarter Nov 8, 2022
9d610dd
Update SDK minor version in cookiecutter
jamielxcarter Nov 8, 2022
4e416b5
Merge branch 'main' into s3-batch-storage
jamielxcarter Nov 8, 2022
ae4c8e9
Merge branch 'main' into s3-batch-storage
jamielxcarter Nov 9, 2022
e2dcbb7
re-lock dependencies
jamielxcarter Nov 9, 2022
62bd670
Merge branch 'main' into s3-batch-storage
jamielxcarter Nov 9, 2022
54c5b46
Merge branch 'main' into s3-batch-storage
jamielxcarter Nov 10, 2022
1883033
Merge branch 'main' into s3-batch-storage
jamielxcarter Nov 14, 2022
e4fede2
Merge branch 'main' into s3-batch-storage
jamielxcarter Nov 14, 2022
5773a5b
Merge branch 'main' into s3-batch-storage
jamielxcarter Nov 16, 2022
febb1ea
Make explicit extra for s3 dependencies
jamielxcarter Nov 16, 2022
4c33cf8
Merge branch 'main' into s3-batch-storage
jamielxcarter Nov 16, 2022
ef9cfcd
Merge branch 'main' into s3-batch-storage
jamielxcarter Nov 16, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ license = "Apache 2.0"
[tool.poetry.dependencies]
python = "<3.11,>=3.7.1"
requests = "^2.25.1"
singer-sdk = "^0.14.0"
singer-sdk = { version="^0.14.0"}
fs-s3fs = { version = "^1.1.1", optional = true}

[tool.poetry.dev-dependencies]
pytest = "^6.2.5"
Expand All @@ -24,6 +25,9 @@ mypy = "^0.910"
types-requests = "^2.26.1"
isort = "^5.10.1"

[tool.poetry.extras]
s3 = ["fs-s3fs"]

[tool.isort]
profile = "black"
multi_line_output = 3 # Vertical Hanging Indent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ license = "Apache 2.0"
[tool.poetry.dependencies]
python = "<3.11,>=3.7.1"
requests = "^2.25.1"
singer-sdk = "^0.14.0"
singer-sdk = { version="^0.14.0"}
fs-s3fs = { version = "^1.1.1", optional = true}

[tool.poetry.dev-dependencies]
pytest = "^6.2.5"
Expand All @@ -24,6 +25,9 @@ mypy = "^0.910"
types-requests = "^2.26.1"
isort = "^5.10.1"

[tool.poetry.extras]
s3 = ["fs-s3fs"]

[tool.isort]
profile = "black"
multi_line_output = 3 # Vertical Hanging Indent
Expand Down
21 changes: 19 additions & 2 deletions docs/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ This library's implementation of the `BATCH` message is used to send records in
- when the tap outputs records at a much higher rate than the target can consume them, creating backpressure
- when the source system can directly export data in bulk (e.g. a database dump)

Currently only a local filesystem is supported, but other filesystems like AWS S3, FTP, etc. could be supported in the future.
Currently only a local filesystem or AWS S3 are supported, but other filesystems like FTP, etc. could be supported in the future.

## The `BATCH` Message

Local
```json
{
"type": "BATCH",
Expand All @@ -33,6 +34,22 @@ Currently only a local filesystem is supported, but other filesystems like AWS S
}
```

AWS S3
```json
{
"type": "BATCH",
"stream": "users",
"encoding": {
"format": "jsonl",
"compression": "gzip"
},
"manifest": [
"s3://path/to/batch/file/1",
"s3://path/to/batch/file/2"
]
}
```

### `encoding`

The `encoding` field is used to specify the format and compression of the batch files. Currently only `jsonl` and `gzip` are supported, respectively.
Expand All @@ -43,7 +60,7 @@ The `manifest` field is used to specify the paths to the batch files. The paths

## Batch configuration

When local storage is used, targets do no require special configuration to process `BATCH` messages.
When local storage is used, targets do no require special configuration to process `BATCH` messages. Use of AWS S3 assumes S3/AWS credentials are already discoverable via the underlying S3 libraries (`AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` / `AWS_DEFAULT_REGION`)

Taps may be configured to specify a root storage `root` directory, file path `prefix`, and `encoding` for batch files using a configuration like the below:

Expand Down
3 changes: 2 additions & 1 deletion noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ def mypy(session: Session) -> None:
@session(python=python_versions)
def tests(session: Session) -> None:
"""Execute pytest tests and compute coverage."""
session.install(".")

session.install(".[s3]")
session.install(*test_dependencies)

# temp fix until pyarrow is supported on python 3.11
Expand Down
287 changes: 194 additions & 93 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ sphinx-copybutton = {version = ">=0.3.1,<0.6.0", optional = true}
myst-parser = {version = ">=0.17.2,<0.19.0", optional = true}
sphinx-autobuild = {version = "^2021.3.14", optional = true}

# File storage dependencies installed as optional 'filesystem' extras
fs-s3fs = { version = "^1.1.1", optional = true}

[tool.poetry.extras]
docs = [
"sphinx",
Expand All @@ -76,6 +79,7 @@ docs = [
"myst-parser",
"sphinx-autobuild",
]
s3 = ["fs-s3fs"]
# Mark these as extras so that installs pass on python 3.11
samples = [
"pyarrow",
Expand Down
3 changes: 2 additions & 1 deletion singer_sdk/helpers/_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import sys
from types import ModuleType
from typing import cast

if sys.version_info >= (3, 9):
import importlib.resources as importlib_resources
Expand All @@ -21,4 +22,4 @@ def get_package_files(package: str | ModuleType) -> Traversable:
Returns:
The file as a Traversable object.
"""
return importlib_resources.files(package)
return cast(Traversable, importlib_resources.files(package))
15 changes: 15 additions & 0 deletions tests/core/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ def test_storage_get_url():
assert url.replace("\\", "/").endswith("root_dir/prefix--file.jsonl.gz")


def test_storage_get_s3_url():
storage = StorageTarget("s3://testing123:testing123@test_bucket")

with storage.fs(create=True) as fs:
url = fs.geturl("prefix--file.jsonl.gz")
assert url.startswith(
"https://s3.amazonaws.com/test_bucket/prefix--file.jsonl.gz"
)


@pytest.mark.parametrize(
"file_url,root",
[
Expand All @@ -40,6 +50,11 @@ def test_storage_get_url():
"file:///Users/sdk/path/to",
id="local",
),
pytest.param(
"s3://test_bucket/prefix--file.jsonl.gz",
"s3://test_bucket",
id="s3",
),
],
)
def test_storage_from_url(file_url: str, root: str):
Expand Down