[Concurrent Low-Code] ConcurrentDeclarativeSource class that low-code connectors can inherit from to uptake Concurrent CDK #46662
base: master
Changes from 10 commits
@@ -0,0 +1,278 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import datetime
import logging
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union

from airbyte_cdk.models import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor, DeclarativeCursor
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ConcurrencyLevel as ConcurrencyLevelModel
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
from airbyte_cdk.sources.source import TState
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.adapters import CursorPartitionGenerator
from airbyte_cdk.sources.streams.concurrent.availability_strategy import AlwaysAvailableAvailabilityStrategy
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField, CursorValueType, GapType
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
    CustomOutputFormatConcurrentStreamStateConverter,
)
from isodate import parse_duration


@dataclass
class DeclarativeCursorAttributes:

Review comment: nit: I have a concern that this only works for DatetimeBasedCursor. How do we envision the next steps for that class? Could we avoid this class by having …

    cursor_field: CursorField
    datetime_format: str
    slice_boundary_fields: Optional[Tuple[str, str]]
    start: Optional[CursorValueType]
    end_provider: Optional[Callable[[], CursorValueType]]
    lookback_window: Optional[GapType]
    slice_range: Optional[GapType]
    cursor_granularity: Optional[GapType]


class ConcurrentDeclarativeSource(ManifestDeclarativeSource):
    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog],
        config: Optional[Mapping[str, Any]],
        state: TState,
        source_config: ConnectionDefinition,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        component_factory: Optional[ModelToComponentFactory] = None,
        **kwargs,
    ):
        super().__init__(
            source_config=source_config,
            debug=debug,
            emit_connector_builder_messages=emit_connector_builder_messages,
            component_factory=component_factory,
        )

        self._state = state

        # Alternatively, if we don't want to modify run.py, we can separate and instantiate the ConcurrentSource
        self._concurrent_streams, self._synchronous_streams = self._separate_streams(config=config)

        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
        if concurrency_level_from_manifest:
            concurrency_level_component = self._constructor.create_component(
                model_type=ConcurrencyLevelModel, component_definition=concurrency_level_from_manifest, config=config
            )
            if not isinstance(concurrency_level_component, ConcurrencyLevel):
                raise ValueError(f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}")

            concurrency_level = concurrency_level_component.get_concurrency_level()
            initial_number_of_partitions_to_generate = concurrency_level // 2
        else:
            concurrency_level = 1
            initial_number_of_partitions_to_generate = 1

Review comment: I think it would be better to move the value from the code into a class variable, maybe to improve readability.

        self._concurrent_source = ConcurrentSource.create(
            num_workers=concurrency_level,
            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
            logger=self.logger,
            slice_logger=self._slice_logger,
            message_repository=self.message_repository,
        )
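The worker/partition bootstrapping in the constructor can be condensed into a small sketch. This is a simplified illustration of the defaulting logic only, not the CDK API; `resolve_concurrency` is a hypothetical helper name.

```python
from typing import Optional, Tuple

def resolve_concurrency(manifest_concurrency_level: Optional[int] = None) -> Tuple[int, int]:
    """Illustrative mirror of the constructor's defaulting logic.

    When the manifest declares a concurrency_level, half the workers seed the
    initial partition generation; otherwise both values fall back to 1.
    """
    if manifest_concurrency_level:
        return manifest_concurrency_level, manifest_concurrency_level // 2
    return 1, 1
```

Note that a manifest value of 1 yields zero initial partitions under integer division, which may be worth guarding against.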
    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[Union[List[AirbyteStateMessage], MutableMapping[str, Any]]] = None,
    ) -> Iterator[AirbyteMessage]:

        # ConcurrentReadProcessor pops streams that are finished being read, so before syncing, the names of the concurrent
        # streams must be saved so that they can be removed from the catalog before starting synchronous streams
        concurrent_stream_names = set([concurrent_stream.name for concurrent_stream in self._concurrent_streams])

        selected_concurrent_streams = self._select_streams(streams=self._concurrent_streams, configured_catalog=catalog)
        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
        # This is also evident in concurrent_source_adapter.py, so I'll leave this out of scope to fix for now
        if selected_concurrent_streams:
            yield from self._concurrent_source.read(selected_concurrent_streams)

        # Sync all streams that are not concurrent compatible. We filter out concurrent streams because the
        # existing AbstractSource.read() implementation iterates over the catalog when syncing streams, many
        # of which were already synced using the Concurrent CDK
        filtered_catalog = self._remove_concurrent_streams_from_catalog(catalog=catalog, concurrent_stream_names=concurrent_stream_names)
        yield from super().read(logger, config, filtered_catalog, state)

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        # I don't think this should need to be overwritten because I added all_streams() and the underlying check ->
        # CheckStream.check_connection() should invoke all_streams(). It should also sort of be a no-op because
        # we've effectively deprecated availability_strategy as a concept, which is all the ConnectionChecker does
        return super().check(logger=logger, config=config)

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        return AirbyteCatalog(streams=[stream.as_airbyte_stream() for stream in self.all_streams(config=config)])

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        Returns the list of streams that can be run synchronously in the Python CDK.

        NOTE: For ConcurrentDeclarativeSource, this method only returns synchronous streams because it is invoked within the
        existing Python CDK. Streams that support concurrency are started from read().
        """
        return self._synchronous_streams

    def all_streams(self, config: Mapping[str, Any]) -> List[Union[AbstractStream, Stream]]:
        return self._synchronous_streams + self._concurrent_streams

    def _separate_streams(self, config: Mapping[str, Any]) -> Tuple[List[AbstractStream], List[Stream]]:
        concurrent_streams: List[AbstractStream] = []
        synchronous_streams: List[Stream] = []

        state_manager = ConnectorStateManager(state=self._state)

        for declarative_stream in super().streams(config=config):
            # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
            # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
            # so we need to treat them as synchronous
            if isinstance(declarative_stream, DeclarativeStream):
                declarative_cursor_attributes = self._get_cursor_attributes(declarative_stream=declarative_stream, config=config)
                if declarative_cursor_attributes:
                    stream_state = state_manager.get_stream_state(
                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                    )

                    connector_state_converter = CustomOutputFormatConcurrentStreamStateConverter(
                        datetime_format=declarative_cursor_attributes.datetime_format,
                        is_sequential_state=False,
                        cursor_granularity=declarative_cursor_attributes.cursor_granularity,
                    )

Review comment (on is_sequential_state=False): By disabling … it feels like, to de-risk this a little bit on the early release, we should set this to true so we accept sequential state (i.e. …

Reply: do you think we could add a mechanism to handle this situation by using the lowest value to set the sequential state if we revert the changes?

                    cursor = ConcurrentCursor(
                        stream_name=declarative_stream.name,
                        stream_namespace=declarative_stream.namespace,
                        stream_state=stream_state,
                        message_repository=self.message_repository,
                        connector_state_manager=state_manager,
                        connector_state_converter=connector_state_converter,
                        cursor_field=declarative_cursor_attributes.cursor_field,
                        slice_boundary_fields=declarative_cursor_attributes.slice_boundary_fields,
                        start=declarative_cursor_attributes.start,
                        end_provider=declarative_cursor_attributes.end_provider or connector_state_converter.get_end_provider(),
                        lookback_window=declarative_cursor_attributes.lookback_window,
                        slice_range=declarative_cursor_attributes.slice_range,
                        cursor_granularity=declarative_cursor_attributes.cursor_granularity,
                    )

                    partition_generator = CursorPartitionGenerator(
                        stream=declarative_stream,
                        message_repository=self.message_repository,
                        cursor=cursor,
                        connector_state_converter=connector_state_converter,
                        cursor_field=[declarative_cursor_attributes.cursor_field]
                        if declarative_cursor_attributes.cursor_field is not None
                        else None,
                        slice_boundary_fields=declarative_cursor_attributes.slice_boundary_fields,
                    )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=declarative_stream.primary_key,
                            cursor_field=declarative_cursor_attributes.cursor_field.cursor_field_key,
                            logger=self.logger,
                            cursor=cursor,
                        )
                    )
                else:
                    synchronous_streams.append(declarative_stream)
            else:
                synchronous_streams.append(declarative_stream)

        return concurrent_streams, synchronous_streams

    def _get_cursor_attributes(
        self, declarative_stream: DeclarativeStream, config: Mapping[str, Any]
    ) -> Optional[DeclarativeCursorAttributes]:
        declarative_cursor = self._get_cursor(stream=declarative_stream)

        if isinstance(declarative_cursor, DatetimeBasedCursor) and type(declarative_cursor) is DatetimeBasedCursor:
            # Only incremental non-substreams are supported. Custom DatetimeBasedCursors are also not supported yet
            # because their behavior can deviate from concurrent behavior

            slice_boundary_fields = (
                declarative_cursor.get_partition_field_start().eval(config=config),
                declarative_cursor.get_partition_field_end().eval(config=config),
            )

            interpolated_state_date = declarative_cursor.get_start_datetime()

Review comment: nit: …
Reply: good eyes!

            start_date = interpolated_state_date.get_datetime(config=config)

            interpolated_end_date = declarative_cursor.get_end_datetime()
            end_date_provider = partial(interpolated_end_date.get_datetime, config) if interpolated_end_date else None

            # DatetimeBasedCursor returns an isodate.Duration if step uses month or year precision. This still works in our
            # code, but mypy may complain when we actually implement this in the concurrent low-code source. To fix this, we
            # may need to convert a Duration to timedelta by multiplying month by 30 (but could lose precision).
            step_length = declarative_cursor.get_step()

            # The low-code DatetimeBasedCursor component uses the default max timedelta value, which can lead to an
            # OverflowError when building datetime intervals. We should proactively cap this to the current moment instead
            if isinstance(step_length, datetime.timedelta) and step_length >= datetime.timedelta.max:
                step_length = datetime.datetime.now(tz=datetime.timezone.utc) - start_date

            return DeclarativeCursorAttributes(
                cursor_field=CursorField(declarative_cursor.cursor_field.eval(config=config)),
                datetime_format=declarative_cursor.datetime_format,
                slice_boundary_fields=slice_boundary_fields,
                start=start_date,
                end_provider=end_date_provider,
                slice_range=step_length,
                lookback_window=parse_duration(declarative_cursor.lookback_window) if declarative_cursor.lookback_window else None,
                cursor_granularity=parse_duration(declarative_cursor.cursor_granularity) if declarative_cursor.cursor_granularity else None,
            )
        return None
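The cap on `step_length` above guards against `OverflowError`: adding Python's `datetime.timedelta.max` to any concrete start date exceeds `datetime.datetime.max`. A minimal standalone illustration using only the standard library, independent of the CDK:

```python
import datetime

start = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)

# The unbounded default step blows past datetime.datetime.max (year 9999):
overflowed = False
try:
    _ = start + datetime.timedelta.max
except OverflowError:
    overflowed = True

# Capping the step to "now minus start", as the code above does, keeps the
# first interval addition within range:
capped_step = datetime.datetime.now(tz=datetime.timezone.utc) - start
end_of_first_slice = start + capped_step  # safe to compute
```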
    @staticmethod
    def _get_cursor(stream: DeclarativeStream) -> Optional[DeclarativeCursor]:

Review comment: The concern I have with this method is that we are coupling the creation of the concurrent cursor with code that should eventually be removed, i.e. we do (declarative -> low-code component -> concurrent) instead of (declarative -> concurrent). Once we remove the low-code components, we will have to re-implement this logic. Should we instead rely on …

Reply: That is a fair question. I think this can get a little bit awkward because we need to sort of extract the DatetimeBasedCursor model from the …

        """
        Returns the low-code cursor component of a stream if it is concurrent compatible. Otherwise, returns None.
        """
        if not stream.supports_incremental:
            return None

        if isinstance(stream.retriever, SimpleRetriever):
            return stream.retriever.cursor

Review comment: Why not …
Reply: yeah, this is just an oversight on my part. I've just gotten rid of this whole method and just call self.get_cursor() from above instead.

        return None
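The `type(declarative_cursor) is DatetimeBasedCursor` condition in `_get_cursor_attributes` above is stricter than `isinstance` alone: it rejects custom subclasses, whose behavior may deviate from what the concurrent cursor can reproduce. A small illustration with stand-in classes (the class names here are hypothetical, not CDK types):

```python
class DatetimeCursorStandIn:
    """Stand-in for the one cursor type with known, supported behavior."""

class CustomCursorStandIn(DatetimeCursorStandIn):
    """Stand-in for a user-defined subclass with potentially divergent behavior."""

def is_concurrent_compatible(cursor: object) -> bool:
    # isinstance() would return True for the subclass too; the exact-type
    # check deliberately excludes it.
    return type(cursor) is DatetimeCursorStandIn
```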
    @staticmethod
    def _select_streams(streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog) -> List[AbstractStream]:
        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
        abstract_streams: List[AbstractStream] = []
        for configured_stream in configured_catalog.streams:
            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
            if stream_instance:
                abstract_streams.append(stream_instance)

        return abstract_streams

    @staticmethod
    def _remove_concurrent_streams_from_catalog(
        catalog: ConfiguredAirbyteCatalog,
        concurrent_stream_names: set[str],
    ) -> ConfiguredAirbyteCatalog:
        return ConfiguredAirbyteCatalog(streams=[stream for stream in catalog.streams if stream.stream.name not in concurrent_stream_names])
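Taken together, `_select_streams` and `_remove_concurrent_streams_from_catalog` partition the configured catalog by stream name. A simplified sketch using plain strings instead of the Airbyte protocol models (the function name is hypothetical):

```python
from typing import List, Set, Tuple

def split_configured_streams(configured: List[str], concurrent_names: Set[str]) -> Tuple[List[str], List[str]]:
    """Return (streams for the concurrent read, streams left for the synchronous read)."""
    selected = [name for name in configured if name in concurrent_names]
    remaining = [name for name in configured if name not in concurrent_names]
    return selected, remaining
```

Order is preserved from the configured catalog in both lists, mirroring how the real methods iterate `catalog.streams`.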
@@ -4,10 +4,11 @@

import logging
from abc import abstractmethod
from typing import Any, Mapping, Tuple
from typing import Any, List, Mapping, Tuple

from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.streams import Stream


class DeclarativeSource(AbstractSource):

@@ -32,3 +33,6 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
        The error object will be cast to string to display the problem to the user.
        """
        return self.connection_checker.check_connection(self, logger, config)

    def all_streams(self, config: Mapping[str, Any]) -> List[Stream]:
        return self.streams(config=config)

Review comment: For non-concurrent low-code sources, these are equivalent, but we override this implementation in …
Review comment: Teach me your ways, what is the difference between streams and all_streams? (ignore if it's in this PR, reading through it)

Reply: good question, I mention it in a comment: https://github.com/airbytehq/airbyte/pull/46662/files#r1792816384
But the reason why we can't just rewrite the streams() method is that within the existing Python CDK core.py, when processing synchronous streams, we invoke the streams() method, and in that context we don't want to return the concurrent streams that aren't compatible in that area of code.

Reply: As we discussed earlier, it's preferable to use the streams method and condition the behavior accordingly. This approach adds some complexity, but it provides a tradeoff by allowing simpler modifications later. With this setup, when the core is able to handle concurrent streams, we'll get a stream generation interface for free.

Reply: yep, this is addressed in my latest commit using the optional param include_concurrent_streams
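Based on the thread above, the `include_concurrent_streams` parameter could look roughly like the sketch below. This is a guess at the shape from the discussion, not the committed API; the class and attribute names are assumptions.

```python
from typing import Any, List, Mapping

class StreamsSketch:
    """Illustrative only: a single streams() entry point that hides
    concurrent streams from the legacy synchronous code path by default."""

    def __init__(self, synchronous_streams: List[Any], concurrent_streams: List[Any]) -> None:
        self._synchronous_streams = synchronous_streams
        self._concurrent_streams = concurrent_streams

    def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Any]:
        # core.py invokes streams() while syncing synchronous streams, so
        # concurrent streams must be opt-in here.
        if include_concurrent_streams:
            return self._synchronous_streams + self._concurrent_streams
        return self._synchronous_streams
```

This keeps a single stream-generation interface, so when the core can handle concurrent streams directly, only the default of the flag needs to change.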