Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore: Add docs for callback, add type alias #447

Merged
merged 2 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airbyte/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@

from airbyte import (
caches,
callbacks,
# cli, # Causes circular import if included
cloud,
constants,
Expand Down Expand Up @@ -157,6 +158,7 @@
__all__ = [
# Modules
"caches",
"callbacks",
# "cli", # Causes circular import if included
"cloud",
"constants",
Expand Down
5 changes: 3 additions & 2 deletions airbyte/_connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@

if TYPE_CHECKING:
import logging
from collections.abc import Callable, Generator
from collections.abc import Generator
from typing import IO

from airbyte._executors.base import Executor
from airbyte._message_iterators import AirbyteMessageIterator
from airbyte.callbacks import ConfigChangeCallback
from airbyte.progress import ProgressTracker


Expand All @@ -57,7 +58,7 @@ def __init__(
executor: Executor,
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any]], None] | None = None,
config_change_callback: ConfigChangeCallback | None = None,
*,
validate: bool = False,
) -> None:
Expand Down
50 changes: 50 additions & 0 deletions airbyte/callbacks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Callbacks for working with PyAirbyte."""

from __future__ import annotations

from collections.abc import Callable
from typing import Any


ConfigChangeCallback = Callable[[dict[str, Any]], None]
"""Callback for when the configuration changes while the connector is running.

This callback can be passed to supporting functions like `airbyte.get_source()` and
`airbyte.get_destination()` to take action whenever configuration changes.
The callback will be called with the new configuration as the only argument.

The most common use case for this callback is for connectors with OAuth APIs to pass updated
refresh tokens when the previous token is about to expire.

Note that the dictionary passed will contain the entire configuration, not just the changed fields.

Example Usage:

```python
import airbyte as ab
import yaml
from pathlib import Path

config_file = Path("path/to/my/config.yaml")
config_dict = yaml.safe_load(config_file.read_text())

# Define the callback function:
def config_callback(new_config: dict[str, Any]) -> None:
# Write new config back to config file
config_file.write_text(yaml.safe_dump(new_config))

# Pass in the callback function when creating the source:
source = get_source(
"source-faker",
config=config_dict,
config_change_callback=config_callback,
)
# Now read as usual. If config changes during sync, the callback will be called.
source.read()
```

For more information on the underlying Airbyte protocol, please see documentation on
the [`CONNECTOR_CONFIG` control messages]
(https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#airbytecontrolconnectorconfigmessage).
"""
5 changes: 2 additions & 3 deletions airbyte/destinations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@


if TYPE_CHECKING:
from collections.abc import Callable

from airbyte._executors.base import Executor
from airbyte.caches.base import CacheBase
from airbyte.callbacks import ConfigChangeCallback
from airbyte.shared.state_writers import StateWriterBase


Expand All @@ -50,8 +49,8 @@ def __init__(
executor: Executor,
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any]], None] | None = None,
*,
config_change_callback: ConfigChangeCallback | None = None,
validate: bool = False,
) -> None:
"""Initialize the source.
Expand Down
5 changes: 3 additions & 2 deletions airbyte/destinations/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@


if TYPE_CHECKING:
from collections.abc import Callable
from pathlib import Path

from airbyte.callbacks import ConfigChangeCallback


def get_destination( # noqa: PLR0913 # Too many arguments
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any]], None] | None = None,
*,
config_change_callback: ConfigChangeCallback | None = None,
version: str | None = None,
pip_url: str | None = None,
local_executable: Path | str | None = None,
Expand Down
7 changes: 4 additions & 3 deletions airbyte/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@


if TYPE_CHECKING:
from collections.abc import Callable, Generator, Iterable, Iterator
from collections.abc import Generator, Iterable, Iterator

from airbyte_cdk import ConnectorSpecification
from airbyte_protocol.models import AirbyteStream

from airbyte._executors.base import Executor
from airbyte.caches import CacheBase
from airbyte.callbacks import ConfigChangeCallback
from airbyte.documents import Document
from airbyte.shared.state_providers import StateProviderBase
from airbyte.shared.state_writers import StateWriterBase
Expand All @@ -58,9 +59,9 @@ def __init__(
executor: Executor,
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any]], None] | None = None,
streams: str | list[str] | None = None,
*,
config_change_callback: ConfigChangeCallback | None = None,
streams: str | list[str] | None = None,
validate: bool = False,
) -> None:
"""Initialize the source.
Expand Down
5 changes: 3 additions & 2 deletions airbyte/sources/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@


if TYPE_CHECKING:
from collections.abc import Callable
from pathlib import Path

from airbyte.callbacks import ConfigChangeCallback


def get_connector(
name: str,
Expand Down Expand Up @@ -46,8 +47,8 @@ def get_connector(
def get_source( # noqa: PLR0913 # Too many arguments
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any]], None] | None = None,
*,
config_change_callback: ConfigChangeCallback | None = None,
streams: str | list[str] | None = None,
version: str | None = None,
pip_url: str | None = None,
Expand Down
Loading