diff --git a/samples/sample_target_csv/csv_target_sink.py b/samples/sample_target_csv/csv_target_sink.py index f06c3163d..4f02bf0bb 100644 --- a/samples/sample_target_csv/csv_target_sink.py +++ b/samples/sample_target_csv/csv_target_sink.py @@ -41,6 +41,7 @@ def process_batch(self, context: dict) -> None: delimiter="\t", quotechar='"', quoting=csv.QUOTE_NONNUMERIC, + escapechar="\\", ) for record in records_to_drain: if newfile and not records_written: diff --git a/samples/sample_target_parquet/parquet_target_sink.py b/samples/sample_target_parquet/parquet_target_sink.py index 40691c13c..e98dca2b1 100644 --- a/samples/sample_target_parquet/parquet_target_sink.py +++ b/samples/sample_target_parquet/parquet_target_sink.py @@ -36,7 +36,8 @@ def _json_schema_to_arrow_fields(schema: dict[str, t.Any]) -> pa.StructType: for name, property_schema in schema.get("properties", {}).items(): field = pa.field(name, _json_type_to_arrow_field(property_schema)) fields.append(field) - return fields + + return fields if fields else [pa.field("dummy", pa.string())] def _json_type_to_arrow_field( # noqa: PLR0911 @@ -99,22 +100,3 @@ def process_batch(self, context: dict) -> None: table = pa.Table.from_pylist(records_to_drain, schema=schema) writer.write_table(table) writer.close() - - @staticmethod - def translate_data_type(singer_type: str | dict) -> t.Any: - """Translate from singer_type to a native type.""" - if singer_type in ["decimal", "float", "double"]: - return pa.decimal128 - if singer_type in ["date-time"]: - return pa.datetime - if singer_type in ["date"]: - return pa.date64 - return pa.string - - def _get_parquet_schema(self) -> list[tuple[str, t.Any]]: - col_list: list[tuple[str, t.Any]] = [] - for prop in self.schema["properties"]: - col_list.append( - (prop["name"], self.translate_data_type(prop["type"])), - ) - return col_list diff --git a/singer_sdk/testing/__init__.py b/singer_sdk/testing/__init__.py index 3ca07d964..24ce4ac9f 100644 --- a/singer_sdk/testing/__init__.py +++ b/singer_sdk/testing/__init__.py @@ -3,7 +3,7 @@ from __future__ import annotations from .config import SuiteConfig -from .factory import get_tap_test_class, get_target_test_class, get_test_class +from .factory import get_tap_test_class, get_target_test_class from .legacy import ( _get_tap_catalog, _select_all, @@ -19,7 +19,6 @@ __all__ = [ "get_tap_test_class", "get_target_test_class", - "get_test_class", "_get_tap_catalog", "_select_all", "get_standard_tap_tests", diff --git a/singer_sdk/testing/factory.py b/singer_sdk/testing/factory.py index 32c9e6469..99c9058c4 100644 --- a/singer_sdk/testing/factory.py +++ b/singer_sdk/testing/factory.py @@ -19,134 +19,331 @@ from singer_sdk import Tap, Target -def get_test_class( # noqa: C901 - test_runner: TapTestRunner | TargetTestRunner, - test_suites: list, - suite_config: SuiteConfig | None, -) -> object: - """Construct a valid pytest test class from given suites. - - Args: - test_runner: A Tap or Target test runner instance. - test_suites: A list of Test Suits to apply. - suite_config: SuiteConfig instance to pass to tests. - - Returns: - A test class usable by pytest. - """ - - class BaseTestClass: - """Base test class.""" - - params: dict = {} - param_ids: dict = {} - - @pytest.fixture - def config(self) -> SuiteConfig: - return suite_config or SuiteConfig() - - @pytest.fixture - def resource(self) -> t.Any: # noqa: ANN401, PT004 - yield # noqa: PT022 - - @pytest.fixture(scope="class") - def runner(self) -> TapTestRunner | TargetTestRunner: - # Populate runner class with cached records for use in tests - test_runner.sync_all() - return test_runner - - for suite in test_suites: - # make sure given runner is of type TapTestRunner - expected_runner_class = ( - TapTestRunner - if suite.kind in {"tap", "tap_stream", "tap_stream_attribute"} - else TargetTestRunner +class BaseTestClass: + """Base test class.""" + + params: dict = {} + param_ids: dict = {} + + +class TapTestClassFactory: + """Factory for Tap Test Classes.""" + + def __init__( + self, + tap_class: type[Tap], + *, + config: dict | None = None, + ): + """Initialize TapTestClassFactory. + + Args: + tap_class: Tap class to be tested. + config: Tap configuration for testing. + """ + self.tap_class = tap_class + self.config = config + + def new_test_class( + self, + *, + include_tap_tests: bool = True, + include_stream_tests: bool = True, + include_stream_attribute_tests: bool = True, + custom_suites: list | None = None, + suite_config: SuiteConfig | None = None, + **kwargs: t.Any, + ) -> type[BaseTestClass]: + """Get a new test class. + + Args: + include_tap_tests: Include tap tests in the test class. + include_stream_tests: Include stream tests in the test class. + include_stream_attribute_tests: + Include stream attribute tests in the test class. + custom_suites: List of custom test suites to include in the test class. + suite_config: SuiteConfig instance to be used when instantiating tests. + kwargs: Default arguments to be passed to tap on create. + + Returns: + A new test class. + """ + # compile test suites + suites = custom_suites or [] + if include_tap_tests: + suites.append(tap_tests) + if include_stream_tests: + suites.append(tap_stream_tests) + if include_stream_attribute_tests: + suites.append(tap_stream_attribute_tests) + + # set default values + if "parse_env_config" not in kwargs: + kwargs["parse_env_config"] = True + + # create singleton test runner + test_runner = TapTestRunner( + tap_class=self.tap_class, + config=self.config, + suite_config=suite_config, + **kwargs, ) - assert isinstance(test_runner, expected_runner_class), ( - f"Test suite of kind {suite.kind} passed, " - f"but test runner if of type {type(test_runner)}." + + empty_test_class = self._get_empty_test_class( + test_runner=test_runner, + suite_config=suite_config, ) - test_runner = t.cast( - expected_runner_class, # type: ignore[valid-type] - test_runner, + return self._annotate_test_class( + empty_test_class=empty_test_class, + test_suites=suites, + test_runner=test_runner, ) - if suite.kind in {"tap", "target"}: - for test_class in suite.tests: - test = test_class() - test_name = f"test_{suite.kind}_{test.name}" - setattr(BaseTestClass, f"test_{suite.kind}_{test.name}", test.run) - - if suite.kind in {"tap_stream", "tap_stream_attribute"}: - streams = list(test_runner.new_tap().streams.values()) - - if suite.kind == "tap_stream": - params = [ - { - "stream": stream, - } - for stream in streams - ] - param_ids = [stream.name for stream in streams] - + def _get_empty_test_class( + self, + test_runner: TapTestRunner, + suite_config: SuiteConfig | None, + ) -> type[BaseTestClass]: + """Get an empty test class. + + Args: + test_runner: Test runner to be used in the test class. + suite_config: SuiteConfig instance to be used when instantiating tests. + + Returns: + An empty test class. + """ + + class TapTestClass(BaseTestClass): + """Tap Test Class.""" + + @pytest.fixture + def config(self) -> SuiteConfig: + return suite_config or SuiteConfig() + + @pytest.fixture + def resource(self) -> t.Any: # noqa: ANN401, PT004 + yield # noqa: PT022 + + @pytest.fixture(scope="class") + def runner(self) -> TapTestRunner | TargetTestRunner: + # Populate runner class with cached records for use in tests + test_runner.sync_all() + return test_runner + + return TapTestClass + + def _annotate_test_class( # noqa: C901 + self, + empty_test_class: type[BaseTestClass], + test_suites: list, + test_runner: TapTestRunner, + ) -> type[BaseTestClass]: + """Annotate test class with test methods. + + Args: + empty_test_class: Empty test class to be annotated. + test_suites: List of test suites to include in the test class. + test_runner: Test runner to be used in the test class. + + Returns: + An annotated test class. + """ + for suite in test_suites: + if suite.kind == "tap": for test_class in suite.tests: test = test_class() test_name = f"test_{suite.kind}_{test.name}" - setattr( - BaseTestClass, - test_name, - test.run, - ) - BaseTestClass.params[test_name] = params - BaseTestClass.param_ids[test_name] = param_ids - - if suite.kind == "tap_stream_attribute": - for test_class in suite.tests: - test = test_class() - test_name = f"test_{suite.kind}_{test.name}" - test_params = [] - test_ids = [] - for stream in streams: - test_params.extend( - [ - { - "stream": stream, - "attribute_name": property_name, - } - for property_name, property_schema in stream.schema[ - "properties" - ].items() - if test_class.evaluate( - stream=stream, - property_name=property_name, - property_schema=property_schema, - ) - ], - ) - test_ids.extend( - [ - f"{stream.name}.{property_name}" - for property_name, property_schema in stream.schema[ - "properties" - ].items() - if test_class.evaluate( - stream=stream, - property_name=property_name, - property_schema=property_schema, - ) - ], - ) - - if test_params: + setattr(empty_test_class, test_name, test.run) + + if suite.kind in {"tap_stream", "tap_stream_attribute"}: + streams = list(test_runner.new_tap().streams.values()) + + if suite.kind == "tap_stream": + params = [ + { + "stream": stream, + } + for stream in streams + ] + param_ids = [stream.name for stream in streams] + + for test_class in suite.tests: + test = test_class() + test_name = f"test_{suite.kind}_{test.name}" setattr( - BaseTestClass, + empty_test_class, test_name, test.run, ) - BaseTestClass.params[test_name] = test_params - BaseTestClass.param_ids[test_name] = test_ids + empty_test_class.params[test_name] = params + empty_test_class.param_ids[test_name] = param_ids + + if suite.kind == "tap_stream_attribute": + for test_class in suite.tests: + test = test_class() + test_name = f"test_{suite.kind}_{test.name}" + test_params = [] + test_ids = [] + for stream in streams: + test_params.extend( + [ + { + "stream": stream, + "attribute_name": property_name, + } + for property_name, property_schema in stream.schema[ + "properties" + ].items() + if test_class.evaluate( + stream=stream, + property_name=property_name, + property_schema=property_schema, + ) + ], + ) + test_ids.extend( + [ + f"{stream.name}.{property_name}" + for property_name, property_schema in stream.schema[ + "properties" + ].items() + if test_class.evaluate( + stream=stream, + property_name=property_name, + property_schema=property_schema, + ) + ], + ) + + if test_params: + setattr( + empty_test_class, + test_name, + test.run, + ) + empty_test_class.params[test_name] = test_params + empty_test_class.param_ids[test_name] = test_ids + + return empty_test_class + + +class TargetTestClassFactory: + """Factory for Target Test Classes.""" + + def __init__(self, target_class: type[Target], *, config: dict | None = None): + """Initialize TargetTestClassFactory. + + Args: + target_class: Target class to be tested. + config: Config to be used when instantiating tests. + """ + self.target_class = target_class + self.config = config + + def new_test_class( + self, + *, + custom_suites: list | None = None, + suite_config: SuiteConfig | None = None, + include_target_tests: bool = True, + **kwargs: t.Any, + ) -> type[BaseTestClass]: + """Get a new Target test class. + + Args: + custom_suites: List of custom test suites to include in the test class. + suite_config: SuiteConfig instance to be used when instantiating tests. + include_target_tests: Whether to include target tests in the test class. + kwargs: Keyword arguments to be passed to the Target on run. + + Returns: + A new Target test class. + """ + # compile test suites + suites = custom_suites or [] + if include_target_tests: + suites.append(target_tests) + + # set default values + if "parse_env_config" not in kwargs: + kwargs["parse_env_config"] = True + + empty_test_class = self._get_empty_test_class( + target_class=self.target_class, + config=self.config, + suite_config=suite_config, + **kwargs, + ) + return self._annotate_test_class( + empty_test_class=empty_test_class, + test_suites=suites, + ) + + def _get_empty_test_class( + self, + target_class: type[Target], + suite_config: SuiteConfig | None, + config: dict | None = None, + **kwargs: t.Any, + ) -> type[BaseTestClass]: + """Get an empty test class. + + Args: + target_class: Target class to be tested. + suite_config: SuiteConfig instance to be used when instantiating tests. + config: Config to be used when instantiating tests. + kwargs: Keyword arguments to be passed to the Target on run. + + Returns: + An empty test class. + """ + + class TargetTestClass(BaseTestClass): + """Target Test Class.""" + + @pytest.fixture + def config(self) -> SuiteConfig: + return suite_config or SuiteConfig() + + @pytest.fixture + def resource(self) -> t.Any: # noqa: ANN401, PT004 + yield # noqa: PT022 + + @pytest.fixture + def runner(self) -> TargetTestRunner: + # Instantiate new runner class and populate records for use in tests + return TargetTestRunner( + target_class=target_class, + config=config, + suite_config=suite_config, + **kwargs, + ) + + return TargetTestClass + + def _annotate_test_class( + self, + empty_test_class: type[BaseTestClass], + test_suites: list, + ) -> type[BaseTestClass]: + """Annotate test class with test methods. + + Args: + empty_test_class: Empty test class to be annotated. + test_suites: List of test suites to be included in the test class. + + Returns: + Annotated test class. + """ + for suite in test_suites: + if suite.kind == "target": + for test_class in suite.tests: + test = test_class() + test_name = f"test_{suite.kind}_{test.name}" + setattr(empty_test_class, test_name, test.run) - return BaseTestClass + return empty_test_class def get_tap_test_class( @@ -159,7 +356,7 @@ def get_tap_test_class( custom_suites: list | None = None, suite_config: SuiteConfig | None = None, **kwargs: t.Any, -) -> object: +) -> type[BaseTestClass]: """Get Tap Test Class. Args: @@ -175,27 +372,17 @@ def get_tap_test_class( Returns: A test class usable by pytest. """ - suites = custom_suites or [] - if include_tap_tests: - suites.append(tap_tests) - if include_stream_tests: - suites.append(tap_stream_tests) - if include_stream_attribute_tests: - suites.append(tap_stream_attribute_tests) - - # set default values - if "parse_env_config" not in kwargs: - kwargs["parse_env_config"] = True - - return get_test_class( - test_runner=TapTestRunner( - tap_class=tap_class, - config=config, - suite_config=suite_config, - **kwargs, - ), - test_suites=suites, + factory = TapTestClassFactory( + tap_class=tap_class, + config=config, + ) + return factory.new_test_class( + custom_suites=custom_suites, suite_config=suite_config, + include_tap_tests=include_tap_tests, + include_stream_tests=include_stream_tests, + include_stream_attribute_tests=include_stream_attribute_tests, + **kwargs, ) @@ -205,8 +392,9 @@ def get_target_test_class( config: dict | None = None, custom_suites: list | None = None, suite_config: SuiteConfig | None = None, + include_target_tests: bool = True, **kwargs: t.Any, -) -> object: +) -> type[BaseTestClass]: """Get Target Test Class. Args: @@ -214,24 +402,19 @@ def get_target_test_class( config: Config dict to use for testing. custom_suites: Custom test suites to add to standard tests. suite_config: SuiteConfig instance to pass to tests. + include_target_tests: Include standard target tests. kwargs: Keyword arguments to pass to the TapRunner. Returns: A test class usable by pytest. """ - suites = custom_suites or [] - suites.append(target_tests) - - # set default values - if "parse_env_config" not in kwargs: - kwargs["parse_env_config"] = True - - return get_test_class( - test_runner=TargetTestRunner( - target_class=target_class, - config=config, - **kwargs, - ), - test_suites=suites, + factory = TargetTestClassFactory( + target_class=target_class, + config=config, + ) + return factory.new_test_class( + custom_suites=custom_suites, suite_config=suite_config, + include_target_tests=include_target_tests, + **kwargs, ) diff --git a/singer_sdk/testing/runners.py b/singer_sdk/testing/runners.py index c32461d6b..16f2ba905 100644 --- a/singer_sdk/testing/runners.py +++ b/singer_sdk/testing/runners.py @@ -242,7 +242,7 @@ def target_input(self) -> t.IO[str]: if self.input_io: self._input = self.input_io elif self.input_filepath: - self._input = Path(self.input_filepath).open() + self._input = Path(self.input_filepath).open(encoding="utf8") return t.cast(t.IO[str], self._input) @target_input.setter diff --git a/singer_sdk/testing/target_tests.py b/singer_sdk/testing/target_tests.py index 50e48622a..8b582ed12 100644 --- a/singer_sdk/testing/target_tests.py +++ b/singer_sdk/testing/target_tests.py @@ -2,6 +2,8 @@ from __future__ import annotations +import pytest + from .templates import TargetFileTestTemplate, TargetTestTemplate @@ -52,6 +54,13 @@ class TargetInvalidSchemaTest(TargetFileTestTemplate): name = "invalid_schema" + def test(self) -> None: + """Run test.""" + # TODO: the SDK should raise a better error than Exception in this case + # https://github.com/meltano/sdk/issues/1755 + with pytest.raises(Exception): # noqa: PT011, B017 + super().test() + class TargetMultipleStateMessages(TargetFileTestTemplate): """Test Target correctly relays multiple received State messages (checkpoints).""" @@ -86,6 +95,13 @@ class TargetRecordBeforeSchemaTest(TargetFileTestTemplate): name = "record_before_schema" + def test(self) -> None: + """Run test.""" + # TODO: the SDK should raise a better error than KeyError in this case + # https://github.com/meltano/sdk/issues/1755 + with pytest.raises(KeyError): + super().test() + class TargetRecordMissingKeyProperty(TargetFileTestTemplate): """Test Target handles record missing key property.""" diff --git a/tests/samples/test_target_parquet.py b/tests/samples/test_target_parquet.py index 5c984679b..7f53036a4 100644 --- a/tests/samples/test_target_parquet.py +++ b/tests/samples/test_target_parquet.py @@ -12,7 +12,9 @@ SAMPLE_FILEPATH = Path(f".output/test_{uuid.uuid4()}/") SAMPLE_FILENAME = SAMPLE_FILEPATH / "testfile.parquet" -SAMPLE_CONFIG = {"filepath": str(SAMPLE_FILENAME)} +SAMPLE_CONFIG = { + "filepath": str(SAMPLE_FILENAME), +} StandardTests = get_target_test_class( target_class=SampleTargetParquet,