[Concurrent Low-Code] ConcurrentDeclarativeSource class that low-code connectors can inherit from to uptake Concurrent CDK #46662

Open · brianjlai wants to merge 33 commits into master from brian/concurrent_declarative_source

Commits (33), all by brianjlai:
- 1130a74 initial work to create the ConcurrentDeclarativeSource that can run i… (Sep 23, 2024)
- 54a1f85 Merge branch 'master' into brian/concurrent_declarative_source (Sep 26, 2024)
- 0bd2deb adding more tests and fixing bugs for only syncing streams in catalog… (Oct 1, 2024)
- 2ad65d9 Merge branch 'master' into brian/concurrent_declarative_source (Oct 2, 2024)
- 8861054 fix a few more merge conflict errors (Oct 2, 2024)
- aa55ab7 Fix tests and add cursor granularity to the cursor partition generator (Oct 3, 2024)
- 4288b8d integrate YamlDeclarativeSource with ConcurrentDeclarativeSource and … (Oct 9, 2024)
- 1650e30 Merge branch 'master' into brian/concurrent_declarative_source (Oct 9, 2024)
- 98c42a7 rebase, formatting, fix tests, add new test cases for concurrency level (Oct 9, 2024)
- 0f79069 forgot to remove change to test (Oct 9, 2024)
- 5992e19 fix mypy errors and a few others bugs and testing (Oct 12, 2024)
- 6a160c0 Merge branch 'master' into brian/concurrent_declarative_source (Oct 12, 2024)
- b09311f add logic to skip streams using non-thread safe stream_state, pr feed… (Oct 16, 2024)
- 0180e11 fix formatting and mypy checks (Oct 16, 2024)
- 0c0c019 Merge branch 'master' into brian/concurrent_declarative_source (Oct 16, 2024)
- 61dfb9b fix more mypy (Oct 16, 2024)
- 5dd1121 mypy (Oct 16, 2024)
- 7bd3056 pr feedback and updates to source-sentry for testing (Oct 17, 2024)
- bef1c03 sentry lockfile (Oct 17, 2024)
- c2e3bdb update base image (Oct 17, 2024)
- 9116da6 bump amplitude dependencies and versions for testing (Oct 17, 2024)
- 80186ca Merge branch 'master' into brian/concurrent_declarative_source (Oct 17, 2024)
- 1242296 add logging for incremental streams that are not thread safe (Oct 17, 2024)
- b74b942 Merge branch 'master' into brian/concurrent_declarative_source (Oct 17, 2024)
- 99bee1e remove amplitude version bump (Oct 17, 2024)
- 15cb6dd get rid of stream_state interpolation in sentry (Oct 18, 2024)
- 9b5eb62 whatever (Oct 19, 2024)
- b82c21c parse DatetimeBasedCursorModel to ConcurrentCursor, bugfixes, pr feed… (Oct 19, 2024)
- 6a91848 formatting + mypy (Oct 19, 2024)
- 15127a7 fix mypy by replacing empty tuple() with None to make it truly optional (Oct 19, 2024)
- 977c525 remove local cdk from sentry (Oct 19, 2024)
- 4e38c4e update lockfile (Oct 19, 2024)
- f7f3e9d swapped updating lockfiles (Oct 19, 2024)
4 changes: 3 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -56,9 +56,11 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
"""

@abstractmethod
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Stream]:
"""
:param config: The user-provided configuration as specified by the source's spec.
:param include_concurrent_streams: Concurrent sources can be made up of streams that can be run concurrently and
ones that must be run synchronously. By default, for backwards compatibility this is disabled.
Any stream construction related operation should happen here.
:return: A list of the streams in this source connector.
"""
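To illustrate the widened signature, here is a minimal sketch of how a subclass might implement it. The MySource class and its two helper methods are hypothetical and not part of this PR:

from typing import Any, List, Mapping

from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.streams import Stream

class MySource(AbstractSource):  # hypothetical connector, for illustration only
    def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Stream]:
        # Synchronous-only streams are always returned, so existing callers
        # that never pass the new flag keep their previous behavior.
        streams = self._build_synchronous_streams(config)  # hypothetical helper
        if include_concurrent_streams:
            # Concurrent-capable streams are only materialized on request.
            streams += self._build_concurrent_streams(config)  # hypothetical helper
        return streams

Keeping the default at False means every existing call site of streams(config) compiles and behaves unchanged.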
airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py
@@ -28,7 +28,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
         self._parameters = parameters

     def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
-        streams = source.streams(config)  # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
+        streams = source.streams(config=config, include_concurrent_streams=True)  # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
Comment from maxi297 (Contributor), Oct 16, 2024:

nit: Regarding the # type: ignore that was already there before your changes: should we just make it part of the interface then? I'm not sure why ConnectionChecker.check doesn't take only a DeclarativeSource. It seems like even the typing for Source is too broad, since streams is defined at the AbstractSource level. It feels like the typing is worth updating, right?
         stream_name_to_stream = {s.name: s for s in streams}
         if len(streams) == 0:
             return False, f"No streams to connect to from source {source}"

airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py (large diffs are not rendered by default)

airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py
@@ -178,8 +178,13 @@ def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[S
     def _calculate_earliest_possible_value(self, end_datetime: datetime.datetime) -> datetime.datetime:
         lookback_delta = self._parse_timedelta(self._lookback_window.eval(self.config) if self._lookback_window else "P0D")
         earliest_possible_start_datetime = min(self._start_datetime.get_datetime(self.config), end_datetime)
-        cursor_datetime = self._calculate_cursor_datetime_from_state(self.get_stream_state())
-        return max(earliest_possible_start_datetime, cursor_datetime) - lookback_delta
+        try:
+            cursor_datetime = self._calculate_cursor_datetime_from_state(self.get_stream_state()) - lookback_delta
+        except OverflowError:
+            # cursor_datetime defers to the minimum date if it does not exist in the state. Trying to subtract
+            # a timedelta from the minimum datetime results in an OverflowError.
+            cursor_datetime = self._calculate_cursor_datetime_from_state(self.get_stream_state())
+        return max(earliest_possible_start_datetime, cursor_datetime)

     def select_best_end_datetime(self) -> datetime.datetime:
         """
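For context, the failure mode this try/except guards against is easy to reproduce in isolation. A standalone sketch (plain Python, not CDK code): when no cursor value exists in state, the cursor defers to the earliest representable datetime, and subtracting any positive timedelta from it overflows.

import datetime

cursor_datetime = datetime.datetime.min  # the fallback when state has no cursor value
lookback_delta = datetime.timedelta(days=7)

try:
    cursor_datetime = cursor_datetime - lookback_delta
except OverflowError:
    # datetime.min is already the floor of the representable range, so the
    # lookback is skipped rather than applied, mirroring the change above.
    pass

print(cursor_datetime)  # 0001-01-01 00:00:00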
airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_string.py
@@ -44,7 +44,7 @@ def eval(self, config: Config, **kwargs: Any) -> Any:
             return self.string
         if self._is_plain_string is None:
             # Let's check whether output from evaluation is the same as input.
-            # This indicates occurence of a plain string, not a template and we can skip Jinja in subsequent runs.
+            # This indicates occurrence of a plain string, not a template and we can skip Jinja in subsequent runs.
             evaluated = self._interpolation.eval(self.string, config, self.default, parameters=self._parameters, **kwargs)
             self._is_plain_string = self.string == evaluated
             return evaluated
airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py
@@ -8,7 +8,7 @@
 import re
 from copy import deepcopy
 from importlib import metadata
-from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union
+from typing import Any, Dict, Iterator, List, Mapping, Optional, Tuple, Union

 import yaml
 from airbyte_cdk.models import (
@@ -88,7 +88,7 @@ def connection_checker(self) -> ConnectionChecker:
         else:
             raise ValueError(f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}")

-    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
+    def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Stream]:
         self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})
         stream_configs = self._stream_configs(self._source_config)

@@ -159,7 +159,7 @@ def read(
         logger: logging.Logger,
         config: Mapping[str, Any],
         catalog: ConfiguredAirbyteCatalog,
-        state: Optional[Union[List[AirbyteStateMessage], MutableMapping[str, Any]]] = None,
+        state: Optional[List[AirbyteStateMessage]] = None,
     ) -> Iterator[AirbyteMessage]:
         self._configure_logger_level(logger)
         yield from super().read(logger, config, catalog, state)
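Since read now accepts only a typed list, callers construct protocol models rather than raw mappings. A sketch (the stream name and cursor value are made up, and model construction details can vary slightly across CDK versions):

from airbyte_cdk.models import (
    AirbyteStateBlob,
    AirbyteStateMessage,
    AirbyteStateType,
    AirbyteStreamState,
    StreamDescriptor,
)

# One state message per stream; AirbyteStateBlob accepts arbitrary extra fields.
state = [
    AirbyteStateMessage(
        type=AirbyteStateType.STREAM,
        stream=AirbyteStreamState(
            stream_descriptor=StreamDescriptor(name="projects"),
            stream_state=AirbyteStateBlob(updated_at="2024-01-01T00:00:00Z"),
        ),
    )
]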
airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -4,13 +4,15 @@

 from __future__ import annotations

+import datetime
 import importlib
 import inspect
 import re
 from functools import partial
-from typing import Any, Callable, Dict, List, Mapping, Optional, Type, Union, get_args, get_origin, get_type_hints
+from typing import Any, Callable, Dict, List, Mapping, MutableMapping, Optional, Tuple, Type, Union, get_args, get_origin, get_type_hints

 from airbyte_cdk.models import FailureType, Level
+from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
 from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
 from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
@@ -167,6 +169,11 @@
 from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
 from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import KeysToLowerTransformation
 from airbyte_cdk.sources.message import InMemoryMessageRepository, LogAppenderMessageRepositoryDecorator, MessageRepository
+from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField
+from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
+    CustomOutputFormatConcurrentStreamStateConverter,
+    DateTimeStreamStateConverter,
+)
 from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
 from airbyte_cdk.sources.types import Config
 from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
@@ -457,6 +464,137 @@ def create_concurrency_level(model: ConcurrencyLevelModel, config: Config, **kwa
             parameters={},
         )

+    def create_concurrent_cursor_from_datetime_based_cursor(
+        self,
+        state_manager: ConnectorStateManager,
+        model_type: Type[BaseModel],
+        component_definition: ComponentDefinition,
+        stream_name: str,
+        stream_namespace: Optional[str],
+        config: Config,
+        stream_state: MutableMapping[str, Any],
+        **kwargs: Any,
+    ) -> Tuple[ConcurrentCursor, DateTimeStreamStateConverter]:
+
+        component_type = component_definition.get("type")
+        if component_definition.get("type") != model_type.__name__:
+            raise ValueError(f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead")
+
+        datetime_based_cursor_model = model_type.parse_obj(component_definition)
+
+        if not isinstance(datetime_based_cursor_model, DatetimeBasedCursorModel):
+            raise ValueError(f"Expected {model_type.__name__} component, but received {datetime_based_cursor_model.__class__.__name__}")
+
+        interpolated_cursor_field = InterpolatedString.create(
+            datetime_based_cursor_model.cursor_field, parameters=datetime_based_cursor_model.parameters or {}
+        )
+        cursor_field = CursorField(interpolated_cursor_field.eval(config=config))
+
+        interpolated_partition_field_start = InterpolatedString.create(
+            datetime_based_cursor_model.partition_field_start or "start_time", parameters=datetime_based_cursor_model.parameters or {}
+        )
+        interpolated_partition_field_end = InterpolatedString.create(
+            datetime_based_cursor_model.partition_field_end or "end_time", parameters=datetime_based_cursor_model.parameters or {}
+        )
+
+        slice_boundary_fields = (
+            interpolated_partition_field_start.eval(config=config),
+            interpolated_partition_field_end.eval(config=config),
+        )
+
+        datetime_format = datetime_based_cursor_model.datetime_format
+
+        cursor_granularity = (
+            parse_duration(datetime_based_cursor_model.cursor_granularity) if datetime_based_cursor_model.cursor_granularity else None
+        )
+
+        lookback_window = None
+        interpolated_lookback_window = (
+            InterpolatedString.create(datetime_based_cursor_model.lookback_window, parameters=datetime_based_cursor_model.parameters or {})
+            if datetime_based_cursor_model.lookback_window
+            else None
+        )
+        if interpolated_lookback_window:
+            evaluated_lookback_window = interpolated_lookback_window.eval(config=config)
+            if evaluated_lookback_window:
+                lookback_window = parse_duration(evaluated_lookback_window)
+
+        connector_state_converter = CustomOutputFormatConcurrentStreamStateConverter(
+            datetime_format=datetime_format,
+            is_sequential_state=True,
+            cursor_granularity=cursor_granularity,
+            # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
+        )
+
+        start_date_runtime_value: Union[InterpolatedString, str, MinMaxDatetime]
+        if isinstance(datetime_based_cursor_model.start_datetime, MinMaxDatetimeModel):
+            start_date_runtime_value = self.create_min_max_datetime(model=datetime_based_cursor_model.start_datetime, config=config)
+        else:
+            start_date_runtime_value = datetime_based_cursor_model.start_datetime
+
+        end_date_runtime_value: Optional[Union[InterpolatedString, str, MinMaxDatetime]]
+        if isinstance(datetime_based_cursor_model.end_datetime, MinMaxDatetimeModel):
+            end_date_runtime_value = self.create_min_max_datetime(model=datetime_based_cursor_model.end_datetime, config=config)
+        else:
+            end_date_runtime_value = datetime_based_cursor_model.end_datetime
+
+        interpolated_start_date = MinMaxDatetime.create(
+            interpolated_string_or_min_max_datetime=start_date_runtime_value, parameters=datetime_based_cursor_model.parameters
+        )
+        interpolated_end_date = (
+            None if not end_date_runtime_value else MinMaxDatetime.create(end_date_runtime_value, datetime_based_cursor_model.parameters)
+        )
+
+        # If datetime format is not specified then start/end datetime should inherit it from the stream slicer
+        if not interpolated_start_date.datetime_format:
+            interpolated_start_date.datetime_format = datetime_format
+        if interpolated_end_date and not interpolated_end_date.datetime_format:
+            interpolated_end_date.datetime_format = datetime_format
+
+        start_date = interpolated_start_date.get_datetime(config=config)
+        end_date_provider = (
+            partial(interpolated_end_date.get_datetime, config) if interpolated_end_date else connector_state_converter.get_end_provider()
+        )
+
+        if (datetime_based_cursor_model.step and not datetime_based_cursor_model.cursor_granularity) or (
+            not datetime_based_cursor_model.step and datetime_based_cursor_model.cursor_granularity
+        ):
+            raise ValueError(
+                f"If step is defined, cursor_granularity should be as well and vice-versa. "
+                f"Right now, step is `{datetime_based_cursor_model.step}` and cursor_granularity is `{datetime_based_cursor_model.cursor_granularity}`"
+            )
+
+        # When step is not defined, default to a step size from the starting date to the present moment
+        step_length = datetime.datetime.now(tz=datetime.timezone.utc) - start_date
+        interpolated_step = (
+            InterpolatedString.create(datetime_based_cursor_model.step, parameters=datetime_based_cursor_model.parameters or {})
+            if datetime_based_cursor_model.step
+            else None
+        )
+        if interpolated_step:
+            evaluated_step = interpolated_step.eval(config)
+            if evaluated_step:
+                step_length = parse_duration(evaluated_step)
+
+        return (
+            ConcurrentCursor(
+                stream_name=stream_name,
+                stream_namespace=stream_namespace,
+                stream_state=stream_state,
+                message_repository=self._message_repository,  # type: ignore # message_repository is always instantiated with a value by factory
+                connector_state_manager=state_manager,
+                connector_state_converter=connector_state_converter,
+                cursor_field=cursor_field,
+                slice_boundary_fields=slice_boundary_fields,
+                start=start_date,  # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
+                end_provider=end_date_provider,  # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
+                lookback_window=lookback_window,
+                slice_range=step_length,
+                cursor_granularity=cursor_granularity,
+            ),
+            connector_state_converter,
+        )
+
     @staticmethod
     def create_constant_backoff_strategy(model: ConstantBackoffStrategyModel, config: Config, **kwargs: Any) -> ConstantBackoffStrategy:
         return ConstantBackoffStrategy(
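A sketch of how this factory entry point might be driven from a manifest component. The component values, stream name, and config are hypothetical, and the ConnectorStateManager construction is an assumption about its current signature:

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DatetimeBasedCursor as DatetimeBasedCursorModel,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory

# Mirrors a DatetimeBasedCursor block from a manifest; values are illustrative.
component_definition = {
    "type": "DatetimeBasedCursor",
    "cursor_field": "updated_at",
    "datetime_format": "%Y-%m-%dT%H:%M:%SZ",
    "start_datetime": "{{ config['start_date'] }}",
    "step": "P1D",  # step and cursor_granularity must be set together,
    "cursor_granularity": "PT1S",  # otherwise the factory raises a ValueError
}

factory = ModelToComponentFactory()
concurrent_cursor, state_converter = factory.create_concurrent_cursor_from_datetime_based_cursor(
    state_manager=ConnectorStateManager(state=[]),  # assumed constructor signature
    model_type=DatetimeBasedCursorModel,
    component_definition=component_definition,
    stream_name="projects",
    stream_namespace=None,
    config={"start_date": "2024-01-01T00:00:00Z"},
    stream_state={},
)

The method returns both the ConcurrentCursor and the state converter as a tuple, presumably because the caller also needs the converter when wiring up the concurrent stream's state handling.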
airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py
@@ -11,7 +11,9 @@
 )
 from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_input_provider import InterpolatedRequestInputProvider
 from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
+from airbyte_cdk.sources.source import ExperimentalClassWarning
 from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
+from deprecated import deprecated

 RequestInput = Union[str, Mapping[str, str]]
 ValidRequestTypes = (str, list)
@@ -109,3 +111,34 @@ def get_request_body_json(
         next_page_token: Optional[Mapping[str, Any]] = None,
     ) -> Mapping[str, Any]:
         return self._body_json_interpolator.eval_request_inputs(stream_state, stream_slice, next_page_token)

+    @deprecated("This class is temporary and used to incrementally deliver low-code to concurrent", category=ExperimentalClassWarning)
+    def request_options_contain_stream_state(self) -> bool:
+        """
+        Temporary helper method used as we move low-code streams to the concurrent framework. This method determines
+        whether the InterpolatedRequestOptionsProvider has a dependency on a non-thread-safe interpolation context
+        such as stream_state.
+        """
+
+        return (
+            self._check_if_interpolation_uses_stream_state(self.request_parameters)
+            or self._check_if_interpolation_uses_stream_state(self.request_headers)
+            or self._check_if_interpolation_uses_stream_state(self.request_body_data)
+            or self._check_if_interpolation_uses_stream_state(self.request_body_json)
+        )
+
+    @staticmethod
+    def _check_if_interpolation_uses_stream_state(request_input: Optional[Union[RequestInput, NestedMapping]]) -> bool:
+        if not request_input:
+            return False
+        elif isinstance(request_input, str):
+            return "stream_state" in request_input
+        else:
+            for key, val in request_input.items():
+                # Covers the case of RequestInput in the form of a string or Mapping[str, str]. It also covers the case
+                # of a NestedMapping where the value is a string.
+                # Note: this doesn't account for nested mappings for request_body_json, but stream_state doesn't appear
+                # to be used that way in our code.
+                if "stream_state" in key or (isinstance(val, str) and "stream_state" in val):
+                    return True
+            return False
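To make the detection concrete, a short sketch of inputs the helper would and would not flag (calling the private helper directly, purely for illustration):

from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
    InterpolatedRequestOptionsProvider,
)

check = InterpolatedRequestOptionsProvider._check_if_interpolation_uses_stream_state

# Flagged: the value interpolates the non-thread-safe stream_state context.
assert check({"updated_since": "{{ stream_state['updated_at'] }}"})

# Not flagged: stream_interval is scoped to a single slice, so it can be
# evaluated safely from concurrent partitions.
assert not check({"updated_since": "{{ stream_interval['start_time'] }}"})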
airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py
@@ -3,23 +3,37 @@
 #

 import pkgutil
-from typing import Any
+from typing import Any, List, Mapping, Optional

 import yaml
-from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
+from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog
+from airbyte_cdk.sources.declarative.concurrent_declarative_source import ConcurrentDeclarativeSource
 from airbyte_cdk.sources.types import ConnectionDefinition


-class YamlDeclarativeSource(ManifestDeclarativeSource):
+class YamlDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]):
     """Declarative source defined by a yaml file"""

-    def __init__(self, path_to_yaml: str, debug: bool = False) -> None:
+    def __init__(
+        self,
+        path_to_yaml: str,
+        debug: bool = False,
+        catalog: Optional[ConfiguredAirbyteCatalog] = None,
+        config: Optional[Mapping[str, Any]] = None,
+        state: Optional[List[AirbyteStateMessage]] = None,
+    ) -> None:
         """
         :param path_to_yaml: Path to the yaml file describing the source
         """
         self._path_to_yaml = path_to_yaml
         source_config = self._read_and_parse_yaml_file(path_to_yaml)
-        super().__init__(source_config, debug)
+
+        super().__init__(
+            catalog=catalog or ConfiguredAirbyteCatalog(streams=[]),
+            config=config or {},
+            state=state or [],
+            source_config=source_config,
+        )

     def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:
         package = self.__class__.__module__.split(".")[0]
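A sketch of both construction paths for the upgraded class (the manifest path and config values are hypothetical):

from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource

# Backwards-compatible path: existing connectors construct the source from the
# manifest alone, and catalog/config/state fall back to the empty defaults above.
source = YamlDeclarativeSource(path_to_yaml="manifest.yaml")

# Concurrency-aware path: the entrypoint threads the run inputs through so the
# source can plan concurrent execution for the configured streams.
source = YamlDeclarativeSource(
    path_to_yaml="manifest.yaml",
    catalog=ConfiguredAirbyteCatalog(streams=[]),
    config={"api_key": "..."},
    state=[],
)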