Merge branch 'main' into kgpayne/issue967
Ken Payne authored Sep 15, 2022
2 parents 7f7b6ee + 5bab2d8 commit 9e98a97
Showing 32 changed files with 1,005 additions and 120 deletions.
35 changes: 30 additions & 5 deletions cookiecutter/tap-template/{{cookiecutter.tap_id}}/README.md
@@ -4,19 +4,39 @@

Built with the [Meltano Tap SDK](https://sdk.meltano.com) for Singer Taps.

<!--
Developer TODO: Update the below as needed to correctly describe the install procedure. For instance, if you do not have a PyPi repo, or if you want users to directly install from your git repo, you can modify this step as appropriate.
## Installation
- [ ] `Developer TODO:` Update the below as needed to correctly describe the install procedure. For instance, if you do not have a PyPi repo, or if you want users to directly install from your git repo, you can modify this step as appropriate.
Install from PyPi:
```bash
pipx install {{ cookiecutter.tap_id }}
```
Install from GitHub:
```bash
pipx install git+https://github.com/ORG_NAME/{{ cookiecutter.tap_id }}.git@main
```
-->

## Configuration

### Accepted Config Options

- [ ] `Developer TODO:` Provide a list of config options accepted by the tap.
<!--
Developer TODO: Provide a list of config options accepted by the tap.
This section can be created by copy-pasting the CLI output from:
```
{{ cookiecutter.tap_id }} --about --format=markdown
```
-->

A full list of supported settings and capabilities for this
tap is available by running:
@@ -33,7 +53,9 @@ environment variable is set either in the terminal context or in the `.env` file

### Source Authentication and Authorization

- [ ] `Developer TODO:` If your tap requires special access on the source system, or any special authentication requirements, provide those here.
<!--
Developer TODO: If your tap requires special access on the source system, or any special authentication requirements, provide those here.
-->

## Usage

@@ -49,7 +71,7 @@ You can easily run `{{ cookiecutter.tap_id }}` by itself or in a pipeline using

## Developer Resources

- [ ] `Developer TODO:` As a first step, scan the entire project for the text "`TODO:`" and complete any recommended steps, deleting the "TODO" references once completed.
Follow these instructions to contribute to this project.

### Initialize your Development Environment

@@ -78,8 +100,11 @@ poetry run {{cookiecutter.tap_id}} --help
_**Note:** This tap will work in any Singer environment and does not require Meltano.
Examples here are for convenience and to streamline end-to-end orchestration scenarios._

Your project comes with a custom `meltano.yml` project file already created. Open the `meltano.yml` and follow any _"TODO"_ items listed in
<!--
Developer TODO:
Your project comes with a custom `meltano.yml` project file already created. Open the `meltano.yml` and follow any "TODO" items listed in
the file.
-->

Next, install Meltano (if you haven't already) and any needed plugins:

@@ -43,11 +43,11 @@ class {{ cookiecutter.source_name }}Stream({{ cookiecutter.stream_type }}Stream)
         return headers

     def parse_response(self, response: requests.Response) -> Iterable[dict]:
-        """Parse the response and return an iterator of result rows."""
+        """Parse the response and return an iterator of result records."""
         # TODO: Parse response body and return a set of records.
         resp_json = response.json()
-        for row in resp_json.get("<TODO>"):
-            yield row
+        for record in resp_json.get("<TODO>"):
+            yield record

     def post_process(self, row: dict, context: Optional[dict] = None) -> dict:
         """As needed, append or transform raw data to match expected structure."""
@@ -11,14 +11,14 @@ class {{ cookiecutter.source_name }}Stream(Stream):
     """Stream class for {{ cookiecutter.source_name }} streams."""

     def get_records(self, context: Optional[dict]) -> Iterable[dict]:
-        """Return a generator of row-type dictionary objects.
+        """Return a generator of record-type dictionary objects.

         The optional `context` argument is used to identify a specific slice of the
         stream if partitioning is required for the stream. Most implementations do not
         require partitioning and should ignore the `context` argument.
         """
         # TODO: Write logic to extract data from the upstream source.
-        # rows = mysource.getall()
-        # for row in rows:
-        #     yield row.to_dict()
+        # records = mysource.getall()
+        # for record in records:
+        #     yield record.to_dict()
         raise NotImplementedError("The method is not yet implemented (TODO)")
@@ -134,7 +134,7 @@ class {{ cookiecutter.source_name }}Stream({{ cookiecutter.stream_type }}Stream)
         return None

     def parse_response(self, response: requests.Response) -> Iterable[dict]:
-        """Parse the response and return an iterator of result rows."""
+        """Parse the response and return an iterator of result records."""
         # TODO: Parse response body and return a set of records.
         yield from extract_jsonpath(self.records_jsonpath, input=response.json())

@@ -51,7 +51,7 @@ class {{ cookiecutter.source_name }}Stream(SQLStream):
     connector_class = {{ cookiecutter.source_name }}Connector

     def get_records(self, partition: Optional[dict]) -> Iterable[Dict[str, Any]]:
-        """Return a generator of row-type dictionary objects.
+        """Return a generator of record-type dictionary objects.

         Developers may optionally add custom logic before calling the default
         implementation inherited from the base class.
35 changes: 30 additions & 5 deletions cookiecutter/target-template/{{cookiecutter.target_id}}/README.md
@@ -4,19 +4,39 @@

Built with the [Meltano Target SDK](https://sdk.meltano.com).

<!--
Developer TODO: Update the below as needed to correctly describe the install procedure. For instance, if you do not have a PyPi repo, or if you want users to directly install from your git repo, you can modify this step as appropriate.
## Installation
- [ ] `Developer TODO:` Update the below as needed to correctly describe the install procedure. For instance, if you do not have a PyPi repo, or if you want users to directly install from your git repo, you can modify this step as appropriate.
Install from PyPi:
```bash
pipx install {{ cookiecutter.target_id }}
```
Install from GitHub:
```bash
pipx install git+https://github.com/ORG_NAME/{{ cookiecutter.target_id }}.git@main
```
-->

## Configuration

### Accepted Config Options

- [ ] `Developer TODO:` Provide a list of config options accepted by the target.
<!--
Developer TODO: Provide a list of config options accepted by the target.
This section can be created by copy-pasting the CLI output from:
```
{{ cookiecutter.target_id }} --about --format=markdown
```
-->

A full list of supported settings and capabilities for this
target is available by running:
@@ -33,7 +53,9 @@ environment variable is set either in the terminal context or in the `.env` file

### Source Authentication and Authorization

- [ ] `Developer TODO:` If your target requires special access on the source system, or any special authentication requirements, provide those here.
<!--
Developer TODO: If your target requires special access on the destination system, or any special authentication requirements, provide those here.
-->

## Usage

@@ -50,7 +72,7 @@ tap-carbon-intensity | {{ cookiecutter.target_id }} --config /path/to/{{ cookiec

## Developer Resources

- [ ] `Developer TODO:` As a first step, scan the entire project for the text "`TODO:`" and complete any recommended steps, deleting the "TODO" references once completed.
Follow these instructions to contribute to this project.

### Initialize your Development Environment

@@ -79,8 +101,11 @@ poetry run {{cookiecutter.target_id}} --help
_**Note:** This target will work in any Singer environment and does not require Meltano.
Examples here are for convenience and to streamline end-to-end orchestration scenarios._

Your project comes with a custom `meltano.yml` project file already created. Open the `meltano.yml` and follow any _"TODO"_ items listed in
<!--
Developer TODO:
Your project comes with a custom `meltano.yml` project file already created. Open the `meltano.yml` and follow any "TODO" items listed in
the file.
-->

Next, install Meltano (if you haven't already) and any needed plugins:

95 changes: 95 additions & 0 deletions docs/batch.md
@@ -0,0 +1,95 @@
# Batch Messages

```{warning}
The `BATCH` message functionality is currently in preview and is subject to change.
You can [open an issue](https://github.com/meltano/sdk/issues) or [join the discussion](https://github.com/meltano/sdk/discussions/963) on GitHub to provide feedback during the preview period.
```

[The Singer message specification](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md#output) defines three basic message types: `RECORD`, `STATE`, and `SCHEMA`. The `RECORD` message sends data from the tap to the target. The `STATE` message sends state data from the tap to the target. The `SCHEMA` message sends schema data from the tap to the target so that the target can, for example, create tables with the correct column types.

However, the Singer specification can be extended to support additional types of messages. For example, the [`ACTIVATE_VERSION`](https://sdk.meltano.com/en/latest/capabilities.html#singer_sdk.helpers.capabilities.PluginCapabilities.ACTIVATE_VERSION) message is used to manage hard deletes in the target.

This library's implementation of the `BATCH` message sends records in bulk from the tap to the target, using an intermediate filesystem to store _batch_ files. This is useful, for example:

- when the tap outputs records at a much higher rate than the target can consume them, creating backpressure
- when the source system can directly export data in bulk (e.g. a database dump)

Currently only a local filesystem is supported, but other filesystems like AWS S3, FTP, etc. could be supported in the future.

## The `BATCH` Message

```json
{
  "type": "BATCH",
  "stream": "users",
  "encoding": {
    "format": "jsonl",
    "compression": "gzip"
  },
  "manifest": [
    "file://path/to/batch/file/1",
    "file://path/to/batch/file/2"
  ]
}
```

### `encoding`

The `encoding` field specifies the format and compression of the batch files. Currently, `jsonl` is the only supported format and `gzip` is the only supported compression.
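
As a rough illustration of what this encoding means on disk, the sketch below writes and reads a gzip-compressed JSON Lines file using only the Python standard library. The helper names `write_jsonl_gz` and `read_jsonl_gz` are hypothetical and are not part of the SDK; the SDK's own reader and writer may differ.

```python
import gzip
import json
from pathlib import Path
from typing import Iterable, Iterator


def write_jsonl_gz(path: Path, records: Iterable[dict]) -> None:
    """Write one JSON document per line into a gzip-compressed file."""
    with gzip.open(path, "wt", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")


def read_jsonl_gz(path: Path) -> Iterator[dict]:
    """Yield one dict per non-empty line of a gzip-compressed JSONL file."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                yield json.loads(line)
```

Both helpers stream line by line, so a large batch file never has to be held in memory as a whole.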

### `manifest`

The `manifest` field is used to specify the paths to the batch files. The paths are relative to the `root` directory specified in the [`batch_config`](#batch-configuration) storage configuration.
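
To make the message layout concrete, here is a minimal sketch of how a consumer might parse one line of tap output into the fields described above. `BatchMessage` and `parse_batch_message` are hypothetical names chosen for illustration; they are not SDK classes or functions.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass
class BatchMessage:
    """Hypothetical container for the fields of a BATCH message."""

    stream: str
    format: str
    compression: str
    manifest: List[str]


def parse_batch_message(line: str) -> BatchMessage:
    """Parse one line of tap output and check that it is a BATCH message."""
    message = json.loads(line)
    if message.get("type") != "BATCH":
        raise ValueError(f"Expected a BATCH message, got {message.get('type')!r}")
    encoding = message["encoding"]
    return BatchMessage(
        stream=message["stream"],
        format=encoding["format"],
        compression=encoding["compression"],
        manifest=list(message["manifest"]),
    )
```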

## Batch configuration

When local storage is used, targets do not require special configuration to process `BATCH` messages.

Taps may be configured to specify a storage `root` directory, a file path `prefix`, and an `encoding` for batch files using a configuration like the following:


In `config.json`:

```js
{
  // ...
  "batch_config": {
    "encoding": {
      "format": "jsonl",
      "compression": "gzip"
    },
    "storage": {
      "root": "file://tests/core/resources",
      "prefix": "test-batch-"
    }
  }
}
```
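
The configuration above can be checked before use. The following is a minimal sketch, assuming the key names shown in the example and the currently supported `jsonl` and `gzip` values; `read_batch_config` is a hypothetical helper, and treating `prefix` as optional is an assumption rather than documented SDK behavior.

```python
SUPPORTED_FORMATS = {"jsonl"}
SUPPORTED_COMPRESSIONS = {"gzip"}


def read_batch_config(config: dict) -> dict:
    """Return a flat view of `batch_config` after basic validation (hypothetical helper)."""
    batch_config = config.get("batch_config")
    if batch_config is None:
        raise KeyError("config has no 'batch_config' section")

    encoding = batch_config["encoding"]
    if encoding["format"] not in SUPPORTED_FORMATS:
        raise ValueError(f"Unsupported batch format: {encoding['format']!r}")
    if encoding["compression"] not in SUPPORTED_COMPRESSIONS:
        raise ValueError(f"Unsupported batch compression: {encoding['compression']!r}")

    storage = batch_config["storage"]
    return {
        "format": encoding["format"],
        "compression": encoding["compression"],
        "root": storage["root"],
        # Assumption: an absent prefix is treated as an empty string.
        "prefix": storage.get("prefix", ""),
    }
```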

## Custom batch file creation and processing

### Tap side

Taps can optionally customize batch file creation by implementing the [`get_batches`](singer_sdk.Stream.get_batches) method. This method should return a _tuple_ of an encoding and a list of batch files:

```python
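from singer_sdk.streams import Stream


class MyStream(Stream):
    def get_batches(self, records):
        # `ParquetEncoding` is an illustrative custom encoding class;
        # it is not defined in this snippet.
        return (
            ParquetEncoding(compression="snappy"),
            [
                "s3://my-bucket/my-batch-file-1.parquet",
                "s3://my-bucket/my-batch-file-2.parquet",
            ],
        )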
```
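
One plausible way to end up with several batch files is to split the record stream into fixed-size chunks and write each chunk to its own file. The sketch below shows only the chunking step, using the standard library; `chunk_records` and its `batch_size` parameter are hypothetical and do not describe how the SDK itself sizes batches.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def chunk_records(records: Iterable[dict], batch_size: int) -> Iterator[List[dict]]:
    """Yield consecutive lists of at most `batch_size` records."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(records)
    while True:
        chunk = list(islice(iterator, batch_size))
        if not chunk:
            return
        yield chunk
```

Because the function consumes the input lazily, it works with generators and only keeps one chunk in memory at a time.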

### Target side

Targets can optionally customize batch file processing by implementing the [`process_batch_files`](singer_sdk.Sink.process_batch_files) method.

```python
from singer_sdk.sinks import Sink

class MySink(Sink):
    def process_batch_files(self, encoding, storage, files):
        # Process the batch files here.
        ...
```
1 change: 1 addition & 0 deletions docs/index.rst
@@ -64,6 +64,7 @@ Advanced Topics
 parent_streams
 partitioning
 stream_maps
+batch
 porting
 sinks
 CONTRIBUTING
40 changes: 36 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
