
configurable date format and handle decimal InvalidOperation #20

Closed · wants to merge 5 commits
1 change: 1 addition & 0 deletions README.md
@@ -89,6 +89,7 @@ Full list of options in `config.json`:
| encryption_type | String | | (Default: 'none') The type of encryption to use. Current supported options are: 'none' and 'KMS'. |
| encryption_key | String | | A reference to the encryption key to use for data encryption. For KMS encryption, this should be the name of the KMS encryption key ID (e.g. '1234abcd-1234-1234-1234-1234abcd1234'). This field is ignored if 'encryption_type' is none or blank. |
| compression | String | | The type of compression to apply before uploading. Supported options are `none` (default), `gzip`, and `lzma`. For gzipped files, the file extension will automatically be changed to `.json.gz` for all files. For `lzma` compression, the file extension will automatically be changed to `.json.xz` for all files. |
| date_format | String | | (Default: "%Y%m%d") Allows customization of the date format used in the `{date}` token in the naming_convention |
| naming_convention | String | | (Default: None) Custom naming convention of the s3 key. Replaces tokens `date`, `stream`, and `timestamp` with the appropriate values. <br><br>Supports "folders" in s3 keys e.g. `folder/folder2/{stream}/export_date={date}/{timestamp}.json`. <br><br>Honors the `s3_key_prefix`, if set, by prepending the "filename". E.g. naming_convention = `folder1/my_file.json` and s3_key_prefix = `prefix_` results in `folder1/prefix_my_file.json` |
| timezone_offset | Integer | | Use an offset of `0` hours if you want the `naming_convention` to use the UTC time zone. Defaults to `null`, in which case the local time zone is used. |
| temp_dir | String | | (Default: platform-dependent) Directory of temporary JSONL files with RECORD messages. |
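Taken together, `date_format` and `naming_convention` control the rendered S3 key. A minimal sketch of the token substitution (the helper name `render_key` is hypothetical; the real logic lives in `get_target_key`):

```python
import datetime

def render_key(naming_convention, stream, date_format='%Y%m%d', timezone=None):
    # Substitute the {stream}, {timestamp}, {date} and {time} tokens,
    # mirroring what get_target_key does with the configured date_format.
    now = datetime.datetime.now(timezone)
    return naming_convention.format(
        stream=stream,
        timestamp=now.strftime('%Y%m%dT%H%M%S'),
        date=now.strftime(date_format),
        time=now.strftime('%H%M%S'))

key = render_key('{stream}/export_date={date}/{timestamp}.json',
                 'users', date_format='%Y-%m-%d')
print(key)  # e.g. users/export_date=2024-01-31/20240131T120000.json
```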
3 changes: 2 additions & 1 deletion config.sample.json
@@ -4,5 +4,6 @@
"s3_bucket": "BUCKET",
"s3_key_prefix": "SOME-PREFIX/",
"compression": "gzip",
"naming_convention": "{stream}-{timestamp}.jsonl"
"naming_convention": "{stream}-{timestamp}.jsonl",
"date_format": "%Y-%m-%d"
}
3 changes: 3 additions & 0 deletions requirements.txt
@@ -1,3 +1,6 @@
jsonschema==3.2.0
boto3==1.18.22
backoff==1.11.1
pytest-cov
moto[s3]
Comment on lines +4 to +5

Owner

Those are already in the [options.extras_require] section L34-35. I keep them apart as they are not needed to run the package, only to run the tests.
My rationale is to keep the target lean if possible.

So I don't know how much of a best practice it is, but I used

pip install --upgrade .[test,lint,dist]

to install what's needed locally. It was my way of following the KISS principle here.

adjust-precision-for-schema==0.3.4
1 change: 1 addition & 0 deletions setup.cfg
@@ -24,6 +24,7 @@ install_requires =
jsonschema==3.2.0
boto3==1.18.22
backoff==1.11.1
adjust-precision-for-schema==0.3.4
include_package_data = True

[options.package_data]
13 changes: 9 additions & 4 deletions target_s3_jsonl/__init__.py
@@ -13,12 +13,13 @@

from jsonschema import Draft4Validator, FormatChecker
from decimal import Decimal
from adjust_precision_for_schema import adjust_decimal_precision_for_schema
Owner
That's a great one. Thanks for adding it.

Owner
Reading the Handle multipleOf overflow fix introduced in jsonschema v4.0.0, I wonder if that update doesn't make adjust_precision_for_schema redundant?
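For context on what adjust_decimal_precision_for_schema guards against: a multipleOf check reduces to a remainder test, and Decimal remainders between values of very different magnitude can exceed the default 28-digit context precision and raise decimal.InvalidOperation. A minimal stdlib sketch (`is_multiple` is a hypothetical stand-in for the validator's check, and the precision of 100 is an arbitrary illustration):

```python
from decimal import Decimal, InvalidOperation, getcontext

def is_multiple(value, multiple_of):
    # A remainder test of the kind a multipleOf validation performs.
    try:
        return (value % multiple_of) == 0
    except InvalidOperation:
        # The quotient's integer part needs more digits than the current
        # context precision allows; widening the precision (which is what
        # adjust-precision-for-schema derives from the schema) avoids it.
        getcontext().prec = 100
        return (value % multiple_of) == 0

# With the default 28-digit precision the remainder first raises,
# then succeeds once the context is widened:
print(is_multiple(Decimal('1e50'), Decimal('1e-15')))  # True
```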


from target_s3_jsonl import s3
from target_s3_jsonl.logger import get_logger

LOGGER = get_logger()

DATE_FORMAT_DEFAULT = "%Y%m%d"

def add_metadata_columns_to_schema(schema_message):
'''Metadata _sdc columns according to the stitch documentation at
@@ -93,7 +94,7 @@ def float_to_decimal(value):
return value


def get_target_key(message, naming_convention=None, timestamp=None, prefix=None, timezone=None):
def get_target_key(message, naming_convention=None, timestamp=None, prefix=None, timezone=None, date_format=DATE_FORMAT_DEFAULT):
'''Creates and returns an S3 key for the message'''
if not naming_convention:
naming_convention = '{stream}-{timestamp}.json'
@@ -102,7 +103,7 @@ def get_target_key(message, naming_convention=None, timestamp=None, prefix=None,
key = naming_convention.format(
stream=message['stream'],
timestamp=timestamp if timestamp is not None else datetime.datetime.now(timezone).strftime('%Y%m%dT%H%M%S'),
date=datetime.datetime.now(timezone).strftime('%Y%m%d'),
date=datetime.datetime.now(timezone).strftime(date_format),
time=datetime.datetime.now(timezone).strftime('%H%M%S'))

# NOTE: Replace dynamic tokens
@@ -143,6 +144,8 @@ def persist_lines(messages, config):
key_properties = {}
validators = {}

date_format = config.get('date_format') or DATE_FORMAT_DEFAULT

naming_convention_default = '{stream}-{timestamp}.json'
naming_convention = config.get('naming_convention') or naming_convention_default
open_func = open
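A side note on the `config.get('date_format') or DATE_FORMAT_DEFAULT` pattern in the hunk above: unlike a two-argument `dict.get`, the `or` also falls back to the default when the key is present but set to `null` (or an empty string) in `config.json`:

```python
DATE_FORMAT_DEFAULT = '%Y%m%d'

config_null = {'date_format': None}      # explicit null in config.json
config_set = {'date_format': '%Y-%m-%d'}

# Two-argument .get() keeps the explicit None...
print(config_null.get('date_format', DATE_FORMAT_DEFAULT))  # None
# ...while `or` falls back to the default in that case too.
print(config_null.get('date_format') or DATE_FORMAT_DEFAULT)  # %Y%m%d
print(config_set.get('date_format') or DATE_FORMAT_DEFAULT)   # %Y-%m-%d
```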
@@ -224,6 +227,7 @@
if 'stream' not in o:
raise Exception("Line is missing required key 'stream': {}".format(message))
stream = o['stream']
adjust_decimal_precision_for_schema(o['schema'])

if config.get('add_metadata_columns'):
schemas[stream] = add_metadata_columns_to_schema(o)
@@ -245,7 +249,8 @@
naming_convention=naming_convention,
timestamp=now_formatted,
prefix=config.get('s3_key_prefix', ''),
timezone=timezone),
timezone=timezone,
date_format=date_format),
'file_name': temp_dir / naming_convention_default.format(stream=stream, timestamp=now_formatted),
'file_data': []}

2 changes: 1 addition & 1 deletion tests/test_init.py
@@ -356,7 +356,7 @@ def test_persist_lines(caplog, config, input_data, input_multi_stream_data, inva
dummy_type = '{"type": "DUMMY", "value": {"currently_syncing": "tap_dummy_test-test_table_one"}}'
output_state, output_file_metadata = persist_lines([dummy_type] + input_multi_stream_data, config)

assert caplog.text == 'WARNING root:__init__.py:255 Unknown message type "{}" in message "{}"'.format(
assert caplog.text == 'WARNING root:__init__.py:260 Unknown message type "{}" in message "{}"'.format(
json.loads(dummy_type)['type'], dummy_type.replace('"', "'")) + '\n'

with raises(NotImplementedError):