Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDK: Add initial Destination abstraction and tests #4719

Merged
merged 12 commits into from
Jul 13, 2021
1 change: 0 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def __init__(self, spec_string):


class Connector(ABC):

# can be overridden to change an input config
def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
"""
Expand Down
1 change: 1 addition & 0 deletions airbyte-cdk/python/airbyte_cdk/destinations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .destination import Destination
99 changes: 96 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/destinations/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,103 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
import argparse
import io
import sys
from abc import abstractmethod, ABC
from typing import List, Mapping, Iterable, Any


from airbyte_cdk import AirbyteLogger
from airbyte_cdk.connector import Connector
from airbyte_cdk.models import ConfiguredAirbyteCatalog, AirbyteMessage, Type


class Destination(Connector, ABC):
logger = AirbyteLogger()

@abstractmethod
def write(
self,
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:
"""Implement to define how the connector writes data to the destination"""

def _run_spec(self) -> AirbyteMessage:
return AirbyteMessage(type=Type.SPEC, spec=self.spec(self.logger))

def _run_check(self, config_path: str) -> AirbyteMessage:
config = self.read_config(config_path=config_path)
check_result = self.check(self.logger, config)
return AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result)

def _parse_input_stream(self, input_stream: io.TextIOWrapper) -> Iterable[AirbyteMessage]:
""" Reads from stdin, converting to Airbyte messages"""
for line in input_stream:
try:
yield AirbyteMessage.parse_raw(line)
except Exception:
sherifnada marked this conversation as resolved.
Show resolved Hide resolved
self.logger.info(f"ignoring input which can't be serialized as Airbyte Message: {line}")
sherifnada marked this conversation as resolved.
Show resolved Hide resolved

def _run_write(self, config_path: str, configured_catalog_path: str, input_stream: io.TextIOWrapper) -> Iterable[AirbyteMessage]:
config = self.read_config(config_path=config_path)
catalog = ConfiguredAirbyteCatalog.parse_file(configured_catalog_path)
input_messages = self._parse_input_stream(input_stream)
self.logger.info("Begin writing to the destination...")
yield from self.write(config=config, configured_catalog=catalog, input_messages=input_messages)
self.logger.info("Writing complete.")

def parse_args(self, args: List[str]) -> argparse.Namespace:
"""
:param args: commandline arguments
:return:
"""

parent_parser = argparse.ArgumentParser(add_help=False)
main_parser = argparse.ArgumentParser()
subparsers = main_parser.add_subparsers(title="commands", dest="command")

# spec
subparsers.add_parser("spec", help="outputs the json configuration specification", parents=[parent_parser])

# check
check_parser = subparsers.add_parser("check", help="checks the config can be used to connect", parents=[parent_parser])
required_check_parser = check_parser.add_argument_group("required named arguments")
required_check_parser.add_argument("--config", type=str, required=True, help="path to the json configuration file")

# write
write_parser = subparsers.add_parser("write", help="Writes data to the destination", parents=[parent_parser])
write_required = write_parser.add_argument_group("required named arguments")
write_required.add_argument("--config", type=str, required=True, help="path to the JSON configuration file")
write_required.add_argument("--catalog", type=str, required=True, help="path to the configured catalog JSON file")

parsed_args = main_parser.parse_args(args)
cmd = parsed_args.command
if not cmd:
raise Exception("No command entered. ")
elif cmd not in ["spec", "check", "write"]:
# This is technically dead code since parse_args() would fail if this was the case
# But it's non-obvious enough to warrant placing it here anyways
raise Exception(f"Unknown command entered: {cmd}")

return parsed_args

def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
cmd = parsed_args.command
if cmd == 'spec':
yield self._run_spec()
elif cmd == 'check':
yield self._run_check(config_path=parsed_args.config)
elif cmd == 'write':
# Wrap in UTF-8 to override any other input encodings
wrapped_stdin = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
yield from self._run_write(config_path=parsed_args.config, configured_catalog_path=parsed_args.catalog, input_stream=wrapped_stdin)
else:
raise Exception(f"Unrecognized command: {cmd}")

class Destination(Connector):
pass # TODO
def run(self, args: List[str]):
parsed_args = self.parse_args(args)
output_messages = self.run_cmd(parsed_args)
for message in output_messages:
print(message.json(exclude_unset=True))
234 changes: 234 additions & 0 deletions airbyte-cdk/python/unit_tests/destinations/test_destination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
import argparse
import io
import json
from os import PathLike
from typing import Any, Mapping, List, Union, Dict, Iterable, Iterator
from unittest.mock import ANY

import pytest

from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteRecordMessage, AirbyteStateMessage, AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, Type, \
ConnectorSpecification, Status, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, AirbyteStream, SyncMode, DestinationSyncMode


@pytest.fixture
def destination(mocker) -> Destination:
sherifnada marked this conversation as resolved.
Show resolved Hide resolved
# Wipe the internal list of abstract methods to allow instantiating the abstract class without implementing its abstract methods
mocker.patch('airbyte_cdk.destinations.Destination.__abstractmethods__', set())
# Mypy yells at us because we're init'ing an abstract class
return Destination() # type: ignore


class TestArgParsing:
@pytest.mark.parametrize(
('arg_list', 'expected_output'),
[
(['spec'], {'command': 'spec'}),
(['check', '--config', 'bogus_path/'], {'command': 'check', 'config': 'bogus_path/'}),
(['write', '--config', 'config_path1', '--catalog', 'catalog_path1'],
{'command': 'write', 'config': 'config_path1', 'catalog': 'catalog_path1'}),
]
)
def test_successful_parse(self, arg_list: List[str], expected_output: Mapping[str, Any], destination: Destination):
parsed_args = vars(destination.parse_args(arg_list))
assert parsed_args == expected_output, f"Expected parsing {arg_list} to return parsed args {expected_output} but instead found {parsed_args}"

@pytest.mark.parametrize(
('arg_list'),
[
# Invalid commands
([]),
(['not-a-real-command']),
(['']),
# Incorrect parameters
(['spec', '--config', 'path']),
(['check']),
(['check', '--catalog', 'path']),
(['check', 'path'])
]
)
def test_failed_parse(self, arg_list: List[str], destination: Destination):
# We use BaseException because it encompasses SystemExit (raised by failed parsing) and other exceptions (raised by additional semantic
# checks)
with pytest.raises(BaseException):
destination.parse_args(arg_list)


def _state(state: Dict[str, Any]) -> AirbyteStateMessage:
return AirbyteStateMessage(data=state)


def _record(stream: str, data: Dict[str, Any]) -> AirbyteRecordMessage:
return AirbyteRecordMessage(stream=stream, data=data, emitted_at=0)


def _spec(schema: Dict[str, Any]) -> ConnectorSpecification:
return ConnectorSpecification(connectionSpecification=schema)


def write_file(path: PathLike, content: Union[str, Mapping]):
content = json.dumps(content) if isinstance(content, Mapping) else content
with open(path, 'w') as f:
f.write(content)


def _wrapped(
msg: Union[AirbyteRecordMessage, AirbyteStateMessage, AirbyteCatalog, ConnectorSpecification, AirbyteConnectionStatus]
) -> AirbyteMessage:
if isinstance(msg, AirbyteRecordMessage):
return AirbyteMessage(type=Type.RECORD, record=msg)
elif isinstance(msg, AirbyteStateMessage):
return AirbyteMessage(type=Type.STATE, state=msg)
elif isinstance(msg, AirbyteCatalog):
return AirbyteMessage(type=Type.CATALOG, catalog=msg)
elif isinstance(msg, AirbyteConnectionStatus):
return AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=msg)
elif isinstance(msg, ConnectorSpecification):
return AirbyteMessage(type=Type.SPEC, spec=msg)
else:
raise Exception(f"Invalid Airbyte Message: {msg}")


class OrderedIterableMatcher(Iterable):
"""
A class whose purpose is to verify equality of one iterable object against another
in an ordered fashion
"""

def attempt_consume(self, iterator):
try:
return next(iterator)
except StopIteration:
return None

def __iter__(self):
return iter(self.iterable)

def __init__(self, iterable: Iterable):
self.iterable = iterable

def __eq__(self, other):
if not isinstance(other, Iterable):
return False

other_iter = other if isinstance(other, Iterator) else iter(other)
self_iter = self if isinstance(self, Iterator) else iter(self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
other_iter = other if isinstance(other, Iterator) else iter(other)
self_iter = self if isinstance(self, Iterator) else iter(self)
other_iter = iter(other)
self_iter = iter(self)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in general this is overkill,
just use

return list(self) == list(other)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

self_next = other_next = 'not none'
while self_next is not None and other_next is not None:
other_next = self.attempt_consume(other_iter)
self_next = self.attempt_consume(self_iter)
if self_next != other_next:
return False

return True


class TestRun:
def test_run_spec(self, mocker, destination: Destination):
args = {'command': 'spec'}
parsed_args = argparse.Namespace(**args)

expected_spec = ConnectorSpecification(connectionSpecification={'json_schema': {'prop': 'value'}})
mocker.patch.object(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see my proposal above, with it you can do:

destination.spec.return_value = expected_spec

destination,
'spec',
return_value=expected_spec,
autospec=True
)

spec_message = next(iter(destination.run_cmd(parsed_args)))
Copy link
Contributor

@keu keu Jul 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
spec_message = next(iter(destination.run_cmd(parsed_args)))
spec_message = next(destination.run_cmd(parsed_args))

is there any reason for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

run_cmd returns an Iterable so MyPy is not convinced next works without iter


# Mypy doesn't understand magicmock so it thinks spec doesn't have call_count attr
assert destination.spec.call_count == 1 # type: ignore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert destination.spec.call_count == 1 # type: ignore
destination.spec.assert_called_once() # type: ignore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

np, done, but is there a benefit to this?


# verify the output of spec was returned
assert _wrapped(expected_spec) == spec_message

def test_run_check(self, mocker, destination: Destination, tmp_path):
file_path = tmp_path / 'config.json'
dummy_config = {'user': 'sherif'}
write_file(file_path, dummy_config)
args = {'command': 'check', 'config': file_path}

parsed_args = argparse.Namespace(**args)
destination.run_cmd(parsed_args)

expected_check_result = AirbyteConnectionStatus(status=Status.SUCCEEDED)
mocker.patch.object(
destination,
'check',
return_value=expected_check_result,
autospec=True
)

returned_check_result = next(iter(destination.run_cmd(parsed_args)))
# verify method call with the correct params
# Affirm to Mypy that this is indeed a method on this mock
assert destination.check.call_count == 1 # type: ignore
# Affirm to Mypy that this is indeed a method on this mock
destination.check.assert_called_with(logger=ANY, config=dummy_config) # type: ignore

# verify output was correct
assert _wrapped(expected_check_result) == returned_check_result

def test_run_write(self, mocker, destination: Destination, tmp_path, monkeypatch):
config_path, dummy_config = tmp_path / 'config.json', {'user': 'sherif'}
write_file(config_path, dummy_config)

dummy_catalog = ConfiguredAirbyteCatalog(streams=[
ConfiguredAirbyteStream(
stream=AirbyteStream(
name='mystream',
json_schema={'type': 'object'}
),
sync_mode=SyncMode.full_refresh,
destination_sync_mode=DestinationSyncMode.overwrite
)
])
catalog_path = tmp_path / 'catalog.json'
write_file(catalog_path, dummy_catalog.json(exclude_unset=True))

args = {'command': 'write', 'config': config_path, 'catalog': catalog_path}
parsed_args = argparse.Namespace(**args)

destination.run_cmd(parsed_args)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure why this is here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because I forgot to remove it :P


expected_write_result = [_wrapped(_state({'k1': 'v1'})), _wrapped(_state({'k2': 'v2'}))]
mocker.patch.object(
destination,
'write',
return_value=(x for x in expected_write_result), # convert the list to generator to mimic real usage
sherifnada marked this conversation as resolved.
Show resolved Hide resolved
autospec=True
)
# mock input is a record followed by some state messages
mocked_input: List[AirbyteMessage] = [_wrapped(_record('s1', {'k1': 'v1'})), *expected_write_result]
mocked_stdin_string = "\n".join([record.json(exclude_unset=True) for record in mocked_input])
mocked_stdin_string += "\n add this non-serializable string to verify the destination does not break on malformed input"
mocked_stdin = io.TextIOWrapper(io.BytesIO(bytes(mocked_stdin_string, 'utf-8')))

monkeypatch.setattr('sys.stdin', mocked_stdin)

returned_write_result = list(destination.run_cmd(parsed_args))
# verify method call with the correct params
# Affirm to Mypy that call_count is indeed a method on this mock
assert destination.write.call_count == 1 # type: ignore
# Affirm to Mypy that call_count is indeed a method on this mock
destination.write.assert_called_with( # type: ignore
config=dummy_config,
configured_catalog=dummy_catalog,
# Stdin is internally consumed as a generator so we use a custom matcher
# that iterates over two iterables to check equality
input_messages=OrderedIterableMatcher(mocked_input)
)

# verify output was correct
assert expected_write_result == list(returned_write_result)
sherifnada marked this conversation as resolved.
Show resolved Hide resolved

@pytest.mark.parametrize(
'args',
[{}, {'command': "fake"}]
)
def test_run_cmd_with_incorrect_args_fails(self, args, destination: Destination):
with pytest.raises(Exception):
list(destination.run_cmd(parsed_args=argparse.Namespace(**args)))
2 changes: 1 addition & 1 deletion docs/integrations/sources/stripe.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,4 @@ If you would like to test Airbyte using test data on Stripe, `sk_test_` and `rk_
| 0.1.11 | 2021-05-30 | [3744](https://github.com/airbytehq/airbyte/pull/3744) | Fix types in schema |
| 0.1.10 | 2021-05-28 | [3728](https://github.com/airbytehq/airbyte/pull/3728) | Update data types to be number instead of int |
| 0.1.9 | 2021-05-13 | [3367](https://github.com/airbytehq/airbyte/pull/3367) | Add acceptance tests for connected accounts |
| 0.1.8 | 2021-05-11 | [3566](https://github.com/airbytehq/airbyte/pull/3368) | Bump CDK connectors |
| 0.1.8 | 2021-05-11 | [3566](https://github.com/airbytehq/airbyte/pull/3368) | Bump CDK connectors |