From abe154ee6daed97892d367ebfdf46618b3948c5c Mon Sep 17 00:00:00 2001 From: Thomas Boles Date: Mon, 4 Nov 2024 14:35:46 -0500 Subject: [PATCH 01/11] Adds config_change_callback to Destinations and Sources --- airbyte/_connector_base.py | 15 ++++++++++++++- airbyte/destinations/base.py | 2 ++ airbyte/sources/base.py | 2 ++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index 8d7dbc00..a09c4ed1 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -56,6 +56,7 @@ def __init__( executor: Executor, name: str, config: dict[str, Any] | None = None, + config_change_callback: Callable[[dict[str, Any], int], None] | None = None, *, validate: bool = False, ) -> None: @@ -63,6 +64,7 @@ def __init__( If config is provided, it will be validated against the spec if validate is True. """ + self.config_change_callback = config_change_callback self.executor = executor self._name = name self._config_dict: dict[str, Any] | None = None @@ -361,7 +363,8 @@ def _peek_airbyte_message( This method handles reading Airbyte messages and taking action, if needed, based on the message type. For instance, log messages are logged, records are tallied, and errors are - raised as exceptions if `raise_on_error` is True. + raised as exceptions if `raise_on_error` is True. If a config change message is received, + the config change callback is called. Raises: AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted. @@ -380,6 +383,16 @@ def _peek_airbyte_message( ) return + if ( + message.type == "CONTROL" + and message.control.type == "CONNECTOR_CONFIG" + and self.config_change_callback is not None + ): + self.config_change_callback( + message.control.config, message.control.emitted_at + ) + return + def _execute( self, args: list[str], diff --git a/airbyte/destinations/base.py b/airbyte/destinations/base.py index 1b34d40f..d8899fcb 100644 --- a/airbyte/destinations/base.py +++ b/airbyte/destinations/base.py @@ -48,6 +48,7 @@ def __init__( executor: Executor, name: str, config: dict[str, Any] | None = None, + config_change_callback: Callable[[dict[str, Any]], None] | None = None, *, validate: bool = False, ) -> None: @@ -59,6 +60,7 @@ def __init__( executor=executor, name=name, config=config, + config_change_callback=config_change_callback, validate=validate, ) diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index aeffc8ed..cca48b4b 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -58,6 +58,7 @@ def __init__( executor: Executor, name: str, config: dict[str, Any] | None = None, + config_change_callback: Callable[[dict[str, Any]], None] | None = None, streams: str | list[str] | None = None, *, validate: bool = False, @@ -73,6 +74,7 @@ def __init__( executor=executor, name=name, config=config, + config_change_callback=config_change_callback, validate=validate, ) self._config_dict: dict[str, Any] | None = None From 56507288e3bde4ca91c079b6d6d03bc2ffeb7b07 Mon Sep 17 00:00:00 2001 From: Thomas Boles Date: Mon, 4 Nov 2024 14:42:52 -0500 Subject: [PATCH 02/11] Adds config_change_callback to get_source and get_destination --- airbyte/destinations/base.py | 2 +- airbyte/destinations/util.py | 2 ++ airbyte/sources/base.py | 2 +- airbyte/sources/util.py | 2 ++ 4 files changed, 6 insertions(+), 2 deletions(-) diff --git a/airbyte/destinations/base.py b/airbyte/destinations/base.py index d8899fcb..e42d0414 100644 --- a/airbyte/destinations/base.py +++ b/airbyte/destinations/base.py @@ -48,7 +48,7 @@ def __init__( executor: Executor, name: str, config: dict[str, Any] | None = None, - config_change_callback: Callable[[dict[str, Any]], None] | None = None, + config_change_callback: Callable[[dict[str, Any], int], None] | None = None, *, validate: bool = False, ) -> None: diff --git a/airbyte/destinations/util.py b/airbyte/destinations/util.py index 0531af4a..605f9b13 100644 --- a/airbyte/destinations/util.py +++ b/airbyte/destinations/util.py @@ -19,6 +19,7 @@ def get_destination( name: str, config: dict[str, Any] | None = None, + config_change_callback: Callable[[dict[str, Any], int], None] | None = None, *, version: str | None = None, pip_url: str | None = None, @@ -58,6 +59,7 @@ def get_destination( return Destination( name=name, config=config, + config_change_callback=config_change_callback, executor=get_connector_executor( name=name, version=version, diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index cca48b4b..81505760 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -58,7 +58,7 @@ def __init__( executor: Executor, name: str, config: dict[str, Any] | None = None, - config_change_callback: Callable[[dict[str, Any]], None] | None = None, + config_change_callback: Callable[[dict[str, Any], int], None] | None = None, streams: str | list[str] | None = None, *, validate: bool = False, diff --git a/airbyte/sources/util.py b/airbyte/sources/util.py index b2c01939..ed6f8bff 100644 --- a/airbyte/sources/util.py +++ b/airbyte/sources/util.py @@ -45,6 +45,7 @@ def get_connector( def get_source( # noqa: PLR0913 # Too many arguments name: str, config: dict[str, Any] | None = None, + config_change_callback: Callable[[dict[str, Any], int], None] | None = None, *, streams: str | list[str] | None = None, version: str | None = None, @@ -103,6 +104,7 @@ def get_source( # noqa: PLR0913 # Too many arguments return Source( name=name, config=config, + config_change_callback=config_change_callback, streams=streams, executor=get_connector_executor( name=name, From 23aa7b6ef717bf8f650c702b6ed5d06c7aa3a4d6 Mon Sep 17 00:00:00 2001 From: Thomas Boles Date: Mon, 4 Nov 2024 14:53:39 -0500 Subject: [PATCH 03/11] Adds missing Callable import --- airbyte/_connector_base.py | 2 +- airbyte/destinations/base.py | 2 +- airbyte/destinations/util.py | 2 +- airbyte/sources/base.py | 2 +- airbyte/sources/util.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index a09c4ed1..85ac700a 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -6,7 +6,7 @@ import abc import json from pathlib import Path -from typing import TYPE_CHECKING, Any, Literal +from typing import TYPE_CHECKING, Any, Callable, Literal import jsonschema import rich diff --git a/airbyte/destinations/base.py b/airbyte/destinations/base.py index e42d0414..4ebbf040 100644 --- a/airbyte/destinations/base.py +++ b/airbyte/destinations/base.py @@ -7,7 +7,7 @@ from __future__ import annotations import warnings -from typing import IO, TYPE_CHECKING, Any, Literal, cast +from typing import IO, TYPE_CHECKING, Any, Callable, Literal, cast from airbyte_protocol.models import ( Type, diff --git a/airbyte/destinations/util.py b/airbyte/destinations/util.py index 605f9b13..ab934cb2 100644 --- a/airbyte/destinations/util.py +++ b/airbyte/destinations/util.py @@ -6,7 +6,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Callable from airbyte._executors.util import get_connector_executor from airbyte.destinations.base import Destination diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index 81505760..c120f41c 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -6,7 +6,7 @@ import json import warnings from pathlib import Path -from typing import TYPE_CHECKING, Any, Literal +from typing import TYPE_CHECKING, Any, Callable, Literal import yaml from rich import print # noqa: A004 # Allow shadowing the built-in diff --git a/airbyte/sources/util.py b/airbyte/sources/util.py index ed6f8bff..89420be9 100644 --- a/airbyte/sources/util.py +++ b/airbyte/sources/util.py @@ -5,7 +5,7 @@ import warnings from decimal import Decimal, InvalidOperation -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Callable from airbyte._executors.util import get_connector_executor from airbyte.exceptions import PyAirbyteInputError From 6693397467aca93885c6d97ed39d7933bb24ebfd Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Tue, 5 Nov 2024 00:04:38 +0000 Subject: [PATCH 04/11] Auto-fix lint and format issues --- airbyte/_connector_base.py | 7 +++---- airbyte/destinations/base.py | 3 ++- airbyte/destinations/util.py | 3 ++- airbyte/sources/base.py | 3 ++- airbyte/sources/util.py | 3 ++- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index 85ac700a..a8b489d4 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -5,8 +5,9 @@ import abc import json +from collections.abc import Callable from pathlib import Path -from typing import TYPE_CHECKING, Any, Callable, Literal +from typing import TYPE_CHECKING, Any, Literal import jsonschema import rich @@ -388,9 +389,7 @@ def _peek_airbyte_message( and message.control.type == "CONNECTOR_CONFIG" and self.config_change_callback is not None ): - self.config_change_callback( - message.control.config, message.control.emitted_at - ) + self.config_change_callback(message.control.config, message.control.emitted_at) return def _execute( diff --git a/airbyte/destinations/base.py b/airbyte/destinations/base.py index 4ebbf040..69873de4 100644 --- a/airbyte/destinations/base.py +++ b/airbyte/destinations/base.py @@ -7,7 +7,8 @@ from __future__ import annotations import warnings -from typing import IO, TYPE_CHECKING, Any, Callable, Literal, cast +from collections.abc import Callable +from typing import IO, TYPE_CHECKING, Any, Literal, cast from airbyte_protocol.models import ( Type, diff --git a/airbyte/destinations/util.py b/airbyte/destinations/util.py index ab934cb2..b6cac9fc 100644 --- a/airbyte/destinations/util.py +++ b/airbyte/destinations/util.py @@ -6,7 +6,8 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Callable +from collections.abc import Callable +from typing import TYPE_CHECKING, Any from airbyte._executors.util import get_connector_executor from airbyte.destinations.base import Destination diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index c120f41c..981afe51 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -5,8 +5,9 @@ import json import warnings +from collections.abc import Callable from pathlib import Path -from typing import TYPE_CHECKING, Any, Callable, Literal +from typing import TYPE_CHECKING, Any, Literal import yaml from rich import print # noqa: A004 # Allow shadowing the built-in diff --git a/airbyte/sources/util.py b/airbyte/sources/util.py index 89420be9..784fc6ad 100644 --- a/airbyte/sources/util.py +++ b/airbyte/sources/util.py @@ -4,8 +4,9 @@ from __future__ import annotations import warnings +from collections.abc import Callable from decimal import Decimal, InvalidOperation -from typing import TYPE_CHECKING, Any, Callable +from typing import TYPE_CHECKING, Any from airbyte._executors.util import get_connector_executor from airbyte.exceptions import PyAirbyteInputError From 41afdcb375884f5889751207846a2fbc5aea7cda Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Tue, 5 Nov 2024 00:04:38 +0000 Subject: [PATCH 05/11] Auto-fix lint issues (unsafe) --- airbyte/_connector_base.py | 3 +-- airbyte/destinations/base.py | 3 ++- airbyte/destinations/util.py | 2 +- airbyte/sources/base.py | 3 +-- airbyte/sources/util.py | 2 +- 5 files changed, 6 insertions(+), 7 deletions(-) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index a8b489d4..71d6a075 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -5,7 +5,6 @@ import abc import json -from collections.abc import Callable from pathlib import Path from typing import TYPE_CHECKING, Any, Literal @@ -36,7 +35,7 @@ if TYPE_CHECKING: import logging - from collections.abc import Generator + from collections.abc import Callable, Generator from typing import IO from airbyte._executors.base import Executor diff --git a/airbyte/destinations/base.py b/airbyte/destinations/base.py index 69873de4..e5f41de5 100644 --- a/airbyte/destinations/base.py +++ b/airbyte/destinations/base.py @@ -7,7 +7,6 @@ from __future__ import annotations import warnings -from collections.abc import Callable from typing import IO, TYPE_CHECKING, Any, Literal, cast from airbyte_protocol.models import ( @@ -34,6 +33,8 @@ if TYPE_CHECKING: + from collections.abc import Callable + from airbyte._executors.base import Executor from airbyte.caches.base import CacheBase from airbyte.shared.state_writers import StateWriterBase diff --git a/airbyte/destinations/util.py b/airbyte/destinations/util.py index b6cac9fc..9f0cd89b 100644 --- a/airbyte/destinations/util.py +++ b/airbyte/destinations/util.py @@ -6,7 +6,6 @@ from __future__ import annotations -from collections.abc import Callable from typing import TYPE_CHECKING, Any from airbyte._executors.util import get_connector_executor @@ -14,6 +13,7 @@ if TYPE_CHECKING: + from collections.abc import Callable from pathlib import Path diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index 981afe51..80bfd9d4 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -5,7 +5,6 @@ import json import warnings -from collections.abc import Callable from pathlib import Path from typing import TYPE_CHECKING, Any, Literal @@ -37,7 +36,7 @@ if TYPE_CHECKING: - from collections.abc import Generator, Iterable, Iterator + from collections.abc import Callable, Generator, Iterable, Iterator from airbyte_cdk import ConnectorSpecification from airbyte_protocol.models import AirbyteStream diff --git a/airbyte/sources/util.py b/airbyte/sources/util.py index 784fc6ad..eafc3857 100644 --- a/airbyte/sources/util.py +++ b/airbyte/sources/util.py @@ -4,7 +4,6 @@ from __future__ import annotations import warnings -from collections.abc import Callable from decimal import Decimal, InvalidOperation from typing import TYPE_CHECKING, Any @@ -14,6 +13,7 @@ if TYPE_CHECKING: + from collections.abc import Callable from pathlib import Path From 2fbce7fed0f7a4aa5ff9403d87efb542fe528673 Mon Sep 17 00:00:00 2001 From: Thomas Boles Date: Tue, 5 Nov 2024 11:43:12 -0500 Subject: [PATCH 06/11] Updates config_change_callback to only pass config --- airbyte/_connector_base.py | 4 ++-- airbyte/destinations/base.py | 2 +- airbyte/destinations/util.py | 2 +- airbyte/sources/base.py | 2 +- airbyte/sources/util.py | 3 ++- 5 files changed, 7 insertions(+), 6 deletions(-) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index 71d6a075..deea63b5 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -56,7 +56,7 @@ def __init__( executor: Executor, name: str, config: dict[str, Any] | None = None, - config_change_callback: Callable[[dict[str, Any], int], None] | None = None, + config_change_callback: Callable[[dict[str, Any]], None] | None = None, *, validate: bool = False, ) -> None: @@ -388,7 +388,7 @@ def _peek_airbyte_message( and message.control.type == "CONNECTOR_CONFIG" and self.config_change_callback is not None ): - self.config_change_callback(message.control.config, message.control.emitted_at) + self.config_change_callback(message.control.config) return def _execute( diff --git a/airbyte/destinations/base.py b/airbyte/destinations/base.py index e5f41de5..1cb72efe 100644 --- a/airbyte/destinations/base.py +++ b/airbyte/destinations/base.py @@ -50,7 +50,7 @@ def __init__( executor: Executor, name: str, config: dict[str, Any] | None = None, - config_change_callback: Callable[[dict[str, Any], int], None] | None = None, + config_change_callback: Callable[[dict[str, Any]], None] | None = None, *, validate: bool = False, ) -> None: diff --git a/airbyte/destinations/util.py b/airbyte/destinations/util.py index 9f0cd89b..173d6215 100644 --- a/airbyte/destinations/util.py +++ b/airbyte/destinations/util.py @@ -20,7 +20,7 @@ def get_destination( name: str, config: dict[str, Any] | None = None, - config_change_callback: Callable[[dict[str, Any], int], None] | None = None, + config_change_callback: Callable[[dict[str, Any]], None] | None = None, *, version: str | None = None, pip_url: str | None = None, diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index 80bfd9d4..9913aeb3 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -58,7 +58,7 @@ def __init__( executor: Executor, name: str, config: dict[str, Any] | None = None, - config_change_callback: Callable[[dict[str, Any], int], None] | None = None, + config_change_callback: Callable[[dict[str, Any]], None] | None = None, streams: str | list[str] | None = None, *, validate: bool = False, diff --git a/airbyte/sources/util.py b/airbyte/sources/util.py index eafc3857..731cccaa 100644 --- a/airbyte/sources/util.py +++ b/airbyte/sources/util.py @@ -46,7 +46,7 @@ def get_connector( def get_source( # noqa: PLR0913 # Too many arguments name: str, config: dict[str, Any] | None = None, - config_change_callback: Callable[[dict[str, Any], int], None] | None = None, + config_change_callback: Callable[[dict[str, Any]], None] | None = None, *, streams: str | list[str] | None = None, version: str | None = None, @@ -74,6 +74,7 @@ def get_source( # noqa: PLR0913 # Too many arguments name: connector name config: connector config - if not provided, you need to set it later via the set_config method. + config_change_callback: callback function to be called when the connector config changes. streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the `select_streams()` or `select_all_streams()` method. From 957f7eb41598ba8dca6c8f95dc1729a30d907e13 Mon Sep 17 00:00:00 2001 From: Thomas Boles Date: Tue, 5 Nov 2024 11:45:30 -0500 Subject: [PATCH 07/11] Adds doc comment to get_destination --- airbyte/destinations/util.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte/destinations/util.py b/airbyte/destinations/util.py index 173d6215..553c4a11 100644 --- a/airbyte/destinations/util.py +++ b/airbyte/destinations/util.py @@ -35,6 +35,7 @@ def get_destination( name: connector name config: connector config - if not provided, you need to set it later via the set_config method. + config_change_callback: callback function to be called when the connector config changes. streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the `select_streams()` or `select_all_streams()` method. From 99a4c44fc8d7f3946c8d8835e7b9f9eb80a61017 Mon Sep 17 00:00:00 2001 From: Thomas Boles Date: Tue, 5 Nov 2024 12:45:01 -0500 Subject: [PATCH 08/11] Adds integration tests and converts types to use Type and OrchestratorType classes --- airbyte/_connector_base.py | 7 +- .../integration_tests/test_config_callback.py | 87 +++++++++++++++++++ 2 files changed, 91 insertions(+), 3 deletions(-) create mode 100644 tests/integration_tests/test_config_callback.py diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index deea63b5..b2b98a65 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -16,6 +16,7 @@ from airbyte_protocol.models import ( AirbyteMessage, ConnectorSpecification, + OrchestratorType, Status, TraceType, Type, @@ -384,11 +385,11 @@ def _peek_airbyte_message( return if ( - message.type == "CONTROL" - and message.control.type == "CONNECTOR_CONFIG" + message.type == Type.CONTROL + and message.control.type == OrchestratorType.CONNECTOR_CONFIG and self.config_change_callback is not None ): - self.config_change_callback(message.control.config) + self.config_change_callback(message.control.connectorConfig.config) return def _execute( diff --git a/tests/integration_tests/test_config_callback.py b/tests/integration_tests/test_config_callback.py new file mode 100644 index 00000000..98f22a95 --- /dev/null +++ b/tests/integration_tests/test_config_callback.py @@ -0,0 +1,87 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + +"""Integration tests which test destination capabilities using the JSONL destination (docker-based).""" + + +from __future__ import annotations + +import pytest +from unittest.mock import patch +from airbyte import get_source, get_destination +from airbyte.destinations.base import Destination +from airbyte_protocol.models import AirbyteMessage, Type, AirbyteControlMessage, OrchestratorType, AirbyteControlConnectorConfigMessage + + +def config_change_callback(config: dict[str, Any]) -> None: + print(f"Updated config: {config}") + +@pytest.fixture +def new_duckdb_destination() -> Destination: + """Return a new JSONL destination.""" + return get_destination( + name="destination-duckdb", + config={ + # This path is relative to the container: + "destination_path": "/local/temp/db.duckdb", + }, + config_change_callback=config_change_callback + ) + + +@pytest.fixture +def new_source_faker() -> Source: + return get_source( + "source-faker", + config={ + "count": 100, + "seed": 1234, + "parallelism": 16, + }, + install_if_missing=True, + streams=["products"], + config_change_callback=config_change_callback, + ) + + +def test_source_config_callback( + new_duckdb_destination: Destination, + new_source_faker: Source, +) -> None: + with patch.object(new_source_faker, 'config_change_callback') as mock_config_change_callback: + updated_config = { + "count": 1000, + "seed": 1234, + "parallelism": 16, + } + airbyte_source_control_message = AirbyteMessage( + type=Type.CONTROL, + control=AirbyteControlMessage( + type=OrchestratorType.CONNECTOR_CONFIG, + emitted_at=0, + connectorConfig=AirbyteControlConnectorConfigMessage(config=updated_config), + ), + ) + + new_source_faker._peek_airbyte_message(airbyte_source_control_message) + mock_config_change_callback.assert_called_once_with(updated_config) + + +def test_destination_config_callback( + new_duckdb_destination: Destination, + new_source_faker: Source, +) -> None: + with patch.object(new_duckdb_destination, 'config_change_callback') as mock_config_change_callback: + updated_config = { + "destination_path": "/local/temp/db.duckdb", + } + airbyte_destination_control_message = AirbyteMessage( + type=Type.CONTROL, + control=AirbyteControlMessage( + type=OrchestratorType.CONNECTOR_CONFIG, + emitted_at=0, + connectorConfig=AirbyteControlConnectorConfigMessage(config=updated_config), + ), + ) + + new_duckdb_destination._peek_airbyte_message(airbyte_destination_control_message) + mock_config_change_callback.assert_called_once_with(updated_config) From 38accfb1daa2e6d0811ec377f12b3f10fdc336b2 Mon Sep 17 00:00:00 2001 From: Thomas Boles Date: Tue, 5 Nov 2024 12:49:57 -0500 Subject: [PATCH 09/11] Updates filename of test_config_callback to test_config_change_callback --- airbyte/destinations/util.py | 2 +- ...{test_config_callback.py => test_config_change_callback.py} | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) rename tests/integration_tests/{test_config_callback.py => test_config_change_callback.py} (96%) diff --git a/airbyte/destinations/util.py b/airbyte/destinations/util.py index 553c4a11..4e85eec5 100644 --- a/airbyte/destinations/util.py +++ b/airbyte/destinations/util.py @@ -28,7 +28,7 @@ def get_destination( docker_image: str | bool | None = None, use_host_network: bool = False, install_if_missing: bool = True, -) -> Destination: +) -> Destination: # pylint: disable=too-many-arguments """Get a connector by name and version. Args: diff --git a/tests/integration_tests/test_config_callback.py b/tests/integration_tests/test_config_change_callback.py similarity index 96% rename from tests/integration_tests/test_config_callback.py rename to tests/integration_tests/test_config_change_callback.py index 98f22a95..378543f5 100644 --- a/tests/integration_tests/test_config_callback.py +++ b/tests/integration_tests/test_config_change_callback.py @@ -6,8 +6,9 @@ from __future__ import annotations import pytest +from typing import Any from unittest.mock import patch -from airbyte import get_source, get_destination +from airbyte import get_source, get_destination, Source, Destination from airbyte.destinations.base import Destination from airbyte_protocol.models import AirbyteMessage, Type, AirbyteControlMessage, OrchestratorType, AirbyteControlConnectorConfigMessage From 42b7e6ebc0b428b9d17369e0c20b732edbb05f5b Mon Sep 17 00:00:00 2001 From: Thomas Boles Date: Tue, 5 Nov 2024 12:53:37 -0500 Subject: [PATCH 10/11] Misc lint fixes --- airbyte/destinations/util.py | 4 ++-- tests/integration_tests/test_config_change_callback.py | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/airbyte/destinations/util.py b/airbyte/destinations/util.py index 4e85eec5..568d0099 100644 --- a/airbyte/destinations/util.py +++ b/airbyte/destinations/util.py @@ -17,7 +17,7 @@ from pathlib import Path -def get_destination( +def get_destination( # noqa: PLR0913 # Too many arguments name: str, config: dict[str, Any] | None = None, config_change_callback: Callable[[dict[str, Any]], None] | None = None, @@ -28,7 +28,7 @@ def get_destination( docker_image: str | bool | None = None, use_host_network: bool = False, install_if_missing: bool = True, -) -> Destination: # pylint: disable=too-many-arguments +) -> Destination: """Get a connector by name and version. Args: diff --git a/tests/integration_tests/test_config_change_callback.py b/tests/integration_tests/test_config_change_callback.py index 378543f5..44f24077 100644 --- a/tests/integration_tests/test_config_change_callback.py +++ b/tests/integration_tests/test_config_change_callback.py @@ -9,7 +9,6 @@ from typing import Any from unittest.mock import patch from airbyte import get_source, get_destination, Source, Destination -from airbyte.destinations.base import Destination from airbyte_protocol.models import AirbyteMessage, Type, AirbyteControlMessage, OrchestratorType, AirbyteControlConnectorConfigMessage From 5567cd2208f59ebf73200301565ee54a82faabed Mon Sep 17 00:00:00 2001 From: Thomas Boles Date: Tue, 5 Nov 2024 12:55:46 -0500 Subject: [PATCH 11/11] Formats test with ruff --- .../test_config_change_callback.py | 102 ++++++++++-------- 1 file changed, 59 insertions(+), 43 deletions(-) diff --git a/tests/integration_tests/test_config_change_callback.py b/tests/integration_tests/test_config_change_callback.py index 44f24077..f7b3d6a3 100644 --- a/tests/integration_tests/test_config_change_callback.py +++ b/tests/integration_tests/test_config_change_callback.py @@ -2,30 +2,36 @@ """Integration tests which test destination capabilities using the JSONL destination (docker-based).""" - from __future__ import annotations import pytest from typing import Any from unittest.mock import patch from airbyte import get_source, get_destination, Source, Destination -from airbyte_protocol.models import AirbyteMessage, Type, AirbyteControlMessage, OrchestratorType, AirbyteControlConnectorConfigMessage +from airbyte_protocol.models import ( + AirbyteMessage, + Type, + AirbyteControlMessage, + OrchestratorType, + AirbyteControlConnectorConfigMessage, +) def config_change_callback(config: dict[str, Any]) -> None: - print(f"Updated config: {config}") + print(f"Updated config: {config}") + @pytest.fixture def new_duckdb_destination() -> Destination: """Return a new JSONL destination.""" return get_destination( - name="destination-duckdb", - config={ - # This path is relative to the container: - "destination_path": "/local/temp/db.duckdb", - }, - config_change_callback=config_change_callback - ) + name="destination-duckdb", + config={ + # This path is relative to the container: + "destination_path": "/local/temp/db.duckdb", + }, + config_change_callback=config_change_callback, + ) @pytest.fixture @@ -39,7 +45,7 @@ def new_source_faker() -> Source: }, install_if_missing=True, streams=["products"], - config_change_callback=config_change_callback, + config_change_callback=config_change_callback, ) @@ -47,41 +53,51 @@ def test_source_config_callback( new_duckdb_destination: Destination, new_source_faker: Source, ) -> None: - with patch.object(new_source_faker, 'config_change_callback') as mock_config_change_callback: - updated_config = { - "count": 1000, - "seed": 1234, - "parallelism": 16, - } - airbyte_source_control_message = AirbyteMessage( - type=Type.CONTROL, - control=AirbyteControlMessage( - type=OrchestratorType.CONNECTOR_CONFIG, - emitted_at=0, - connectorConfig=AirbyteControlConnectorConfigMessage(config=updated_config), - ), - ) + with patch.object( + new_source_faker, "config_change_callback" + ) as mock_config_change_callback: + updated_config = { + "count": 1000, + "seed": 1234, + "parallelism": 16, + } + airbyte_source_control_message = AirbyteMessage( + type=Type.CONTROL, + control=AirbyteControlMessage( + type=OrchestratorType.CONNECTOR_CONFIG, + emitted_at=0, + connectorConfig=AirbyteControlConnectorConfigMessage( + config=updated_config + ), + ), + ) - new_source_faker._peek_airbyte_message(airbyte_source_control_message) - mock_config_change_callback.assert_called_once_with(updated_config) + new_source_faker._peek_airbyte_message(airbyte_source_control_message) + mock_config_change_callback.assert_called_once_with(updated_config) def test_destination_config_callback( - new_duckdb_destination: Destination, - new_source_faker: Source, + new_duckdb_destination: Destination, + new_source_faker: Source, ) -> None: - with patch.object(new_duckdb_destination, 'config_change_callback') as mock_config_change_callback: - updated_config = { - "destination_path": "/local/temp/db.duckdb", - } - airbyte_destination_control_message = AirbyteMessage( - type=Type.CONTROL, - control=AirbyteControlMessage( - type=OrchestratorType.CONNECTOR_CONFIG, - emitted_at=0, - connectorConfig=AirbyteControlConnectorConfigMessage(config=updated_config), - ), - ) + with patch.object( + new_duckdb_destination, "config_change_callback" + ) as mock_config_change_callback: + updated_config = { + "destination_path": "/local/temp/db.duckdb", + } + airbyte_destination_control_message = AirbyteMessage( + type=Type.CONTROL, + control=AirbyteControlMessage( + type=OrchestratorType.CONNECTOR_CONFIG, + emitted_at=0, + connectorConfig=AirbyteControlConnectorConfigMessage( + config=updated_config + ), + ), + ) - new_duckdb_destination._peek_airbyte_message(airbyte_destination_control_message) - mock_config_change_callback.assert_called_once_with(updated_config) + new_duckdb_destination._peek_airbyte_message( + airbyte_destination_control_message + ) + mock_config_change_callback.assert_called_once_with(updated_config)