Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Adds config_change_callback to Destinations and Sources #440

Merged
merged 11 commits into from
Nov 8, 2024
16 changes: 14 additions & 2 deletions airbyte/_connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from airbyte_protocol.models import (
AirbyteMessage,
ConnectorSpecification,
OrchestratorType,
Status,
TraceType,
Type,
Expand All @@ -35,7 +36,7 @@

if TYPE_CHECKING:
import logging
from collections.abc import Generator
from collections.abc import Callable, Generator
from typing import IO

from airbyte._executors.base import Executor
Expand All @@ -56,13 +57,15 @@ def __init__(
executor: Executor,
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any]], None] | None = None,
*,
validate: bool = False,
) -> None:
"""Initialize the source.

If config is provided, it will be validated against the spec if validate is True.
"""
self.config_change_callback = config_change_callback
self.executor = executor
self._name = name
self._config_dict: dict[str, Any] | None = None
Expand Down Expand Up @@ -361,7 +364,8 @@ def _peek_airbyte_message(

This method handles reading Airbyte messages and taking action, if needed, based on the
message type. For instance, log messages are logged, records are tallied, and errors are
raised as exceptions if `raise_on_error` is True.
raised as exceptions if `raise_on_error` is True. If a config change message is received,
the config change callback is called.

Raises:
AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
Expand All @@ -380,6 +384,14 @@ def _peek_airbyte_message(
)
return

if (
message.type == Type.CONTROL
and message.control.type == OrchestratorType.CONNECTOR_CONFIG
and self.config_change_callback is not None
):
self.config_change_callback(message.control.connectorConfig.config)
return

def _execute(
self,
args: list[str],
Expand Down
4 changes: 4 additions & 0 deletions airbyte/destinations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@


if TYPE_CHECKING:
from collections.abc import Callable

from airbyte._executors.base import Executor
from airbyte.caches.base import CacheBase
from airbyte.shared.state_writers import StateWriterBase
Expand All @@ -48,6 +50,7 @@ def __init__(
executor: Executor,
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any]], None] | None = None,
tcboles marked this conversation as resolved.
Show resolved Hide resolved
*,
validate: bool = False,
) -> None:
Expand All @@ -59,6 +62,7 @@ def __init__(
executor=executor,
name=name,
config=config,
config_change_callback=config_change_callback,
validate=validate,
)

Expand Down
6 changes: 5 additions & 1 deletion airbyte/destinations/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@


if TYPE_CHECKING:
from collections.abc import Callable
from pathlib import Path


def get_destination(
def get_destination( # noqa: PLR0913 # Too many arguments
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any]], None] | None = None,
*,
version: str | None = None,
pip_url: str | None = None,
Expand All @@ -33,6 +35,7 @@ def get_destination(
name: connector name
config: connector config - if not provided, you need to set it later via the set_config
method.
config_change_callback: callback function to be called when the connector config changes.
streams: list of stream names to select for reading. If set to "*", all streams will be
selected. If not provided, you can set it later via the `select_streams()` or
`select_all_streams()` method.
Expand All @@ -58,6 +61,7 @@ def get_destination(
return Destination(
name=name,
config=config,
config_change_callback=config_change_callback,
executor=get_connector_executor(
name=name,
version=version,
Expand Down
4 changes: 3 additions & 1 deletion airbyte/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@


if TYPE_CHECKING:
from collections.abc import Generator, Iterable, Iterator
from collections.abc import Callable, Generator, Iterable, Iterator

from airbyte_cdk import ConnectorSpecification
from airbyte_protocol.models import AirbyteStream
Expand All @@ -58,6 +58,7 @@ def __init__(
executor: Executor,
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any]], None] | None = None,
streams: str | list[str] | None = None,
*,
validate: bool = False,
Expand All @@ -73,6 +74,7 @@ def __init__(
executor=executor,
name=name,
config=config,
config_change_callback=config_change_callback,
validate=validate,
)
self._config_dict: dict[str, Any] | None = None
Expand Down
4 changes: 4 additions & 0 deletions airbyte/sources/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@


if TYPE_CHECKING:
from collections.abc import Callable
from pathlib import Path


Expand Down Expand Up @@ -45,6 +46,7 @@ def get_connector(
def get_source( # noqa: PLR0913 # Too many arguments
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any]], None] | None = None,
*,
streams: str | list[str] | None = None,
version: str | None = None,
Expand Down Expand Up @@ -72,6 +74,7 @@ def get_source( # noqa: PLR0913 # Too many arguments
name: connector name
config: connector config - if not provided, you need to set it later via the set_config
method.
config_change_callback: callback function to be called when the connector config changes.
streams: list of stream names to select for reading. If set to "*", all streams will be
selected. If not provided, you can set it later via the `select_streams()` or
`select_all_streams()` method.
Expand Down Expand Up @@ -103,6 +106,7 @@ def get_source( # noqa: PLR0913 # Too many arguments
return Source(
name=name,
config=config,
config_change_callback=config_change_callback,
streams=streams,
executor=get_connector_executor(
name=name,
Expand Down
103 changes: 103 additions & 0 deletions tests/integration_tests/test_config_change_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

"""Integration tests which test destination capabilities using the JSONL destination (docker-based)."""

from __future__ import annotations

import pytest
from typing import Any
from unittest.mock import patch
from airbyte import get_source, get_destination, Source, Destination
from airbyte_protocol.models import (
AirbyteMessage,
Type,
AirbyteControlMessage,
OrchestratorType,
AirbyteControlConnectorConfigMessage,
)


def config_change_callback(config: dict[str, Any]) -> None:
print(f"Updated config: {config}")


@pytest.fixture
def new_duckdb_destination() -> Destination:
"""Return a new JSONL destination."""
return get_destination(
name="destination-duckdb",
config={
# This path is relative to the container:
"destination_path": "/local/temp/db.duckdb",
},
config_change_callback=config_change_callback,
)


@pytest.fixture
def new_source_faker() -> Source:
return get_source(
"source-faker",
config={
"count": 100,
"seed": 1234,
"parallelism": 16,
},
install_if_missing=True,
streams=["products"],
config_change_callback=config_change_callback,
)


def test_source_config_callback(
new_duckdb_destination: Destination,
new_source_faker: Source,
) -> None:
with patch.object(
new_source_faker, "config_change_callback"
) as mock_config_change_callback:
updated_config = {
"count": 1000,
"seed": 1234,
"parallelism": 16,
}
airbyte_source_control_message = AirbyteMessage(
type=Type.CONTROL,
control=AirbyteControlMessage(
type=OrchestratorType.CONNECTOR_CONFIG,
emitted_at=0,
connectorConfig=AirbyteControlConnectorConfigMessage(
config=updated_config
),
),
)

new_source_faker._peek_airbyte_message(airbyte_source_control_message)
mock_config_change_callback.assert_called_once_with(updated_config)


def test_destination_config_callback(
new_duckdb_destination: Destination,
new_source_faker: Source,
) -> None:
with patch.object(
new_duckdb_destination, "config_change_callback"
) as mock_config_change_callback:
updated_config = {
"destination_path": "/local/temp/db.duckdb",
}
airbyte_destination_control_message = AirbyteMessage(
type=Type.CONTROL,
control=AirbyteControlMessage(
type=OrchestratorType.CONNECTOR_CONFIG,
emitted_at=0,
connectorConfig=AirbyteControlConnectorConfigMessage(
config=updated_config
),
),
)

new_duckdb_destination._peek_airbyte_message(
airbyte_destination_control_message
)
mock_config_change_callback.assert_called_once_with(updated_config)
Loading