[Concurrent Low-Code] ConcurrentDeclarativeSource class that low-code connectors can inherit from to uptake Concurrent CDK #46662

Open · brianjlai wants to merge 33 commits into master from brian/concurrent_declarative_source
Commits:
- 1130a74: initial work to create the ConcurrentDeclarativeSource that can run i… (brianjlai, Sep 23, 2024)
- 54a1f85: Merge branch 'master' into brian/concurrent_declarative_source (brianjlai, Sep 26, 2024)
- 0bd2deb: adding more tests and fixing bugs for only syncing streams in catalog… (brianjlai, Oct 1, 2024)
- 2ad65d9: Merge branch 'master' into brian/concurrent_declarative_source (brianjlai, Oct 2, 2024)
- 8861054: fix a few more merge conflict errors (brianjlai, Oct 2, 2024)
- aa55ab7: Fix tests and add cursor granularity to the cursor partition generator (brianjlai, Oct 3, 2024)
- 4288b8d: integrate YamlDeclarativeSource with ConcurrentDeclarativeSource and … (brianjlai, Oct 9, 2024)
- 1650e30: Merge branch 'master' into brian/concurrent_declarative_source (brianjlai, Oct 9, 2024)
- 98c42a7: rebase, formatting, fix tests, add new test cases for concurrency level (brianjlai, Oct 9, 2024)
- 0f79069: forgot to remove change to test (brianjlai, Oct 9, 2024)
- 5992e19: fix mypy errors and a few others bugs and testing (brianjlai, Oct 12, 2024)
- 6a160c0: Merge branch 'master' into brian/concurrent_declarative_source (brianjlai, Oct 12, 2024)
- b09311f: add logic to skip streams using non-thread safe stream_state, pr feed… (brianjlai, Oct 16, 2024)
- 0180e11: fix formatting and mypy checks (brianjlai, Oct 16, 2024)
- 0c0c019: Merge branch 'master' into brian/concurrent_declarative_source (brianjlai, Oct 16, 2024)
- 61dfb9b: fix more mypy (brianjlai, Oct 16, 2024)
- 5dd1121: mypy (brianjlai, Oct 16, 2024)
- 7bd3056: pr feedback and updates to source-sentry for testing (brianjlai, Oct 17, 2024)
- bef1c03: sentry lockfile (brianjlai, Oct 17, 2024)
- c2e3bdb: update base image (brianjlai, Oct 17, 2024)
- 9116da6: bump amplitude dependencies and versions for testing (brianjlai, Oct 17, 2024)
- 80186ca: Merge branch 'master' into brian/concurrent_declarative_source (brianjlai, Oct 17, 2024)
- 1242296: add logging for incremental streams that are not thread safe (brianjlai, Oct 17, 2024)
- b74b942: Merge branch 'master' into brian/concurrent_declarative_source (brianjlai, Oct 17, 2024)
- 99bee1e: remove amplitude version bump (brianjlai, Oct 17, 2024)
- 15cb6dd: get rid of stream_state interpolation in sentry (brianjlai, Oct 18, 2024)
- 9b5eb62: whatever (brianjlai, Oct 19, 2024)
- b82c21c: parse DatetimeBasedCursorModel to ConcurrentCursor, bugfixes, pr feed… (brianjlai, Oct 19, 2024)
- 6a91848: formatting + mypy (brianjlai, Oct 19, 2024)
- 15127a7: fix mypy by replacing empty tuple() with None to make it truly optional (brianjlai, Oct 19, 2024)
- 977c525: remove local cdk from sentry (brianjlai, Oct 19, 2024)
- 4e38c4e: update lockfile (brianjlai, Oct 19, 2024)
- f7f3e9d: swapped updating lockfiles (brianjlai, Oct 19, 2024)
@@ -28,7 +28,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
         self._parameters = parameters
 
     def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
-        streams = source.streams(config)  # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
+        streams = source.all_streams(config)  # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
Contributor: Teach me your ways: what is the difference between streams and all_streams? (Ignore if it's answered in this PR; still reading through it.)

Author (brianjlai): Good question, I mention it in a comment: https://github.com/airbytehq/airbyte/pull/46662/files#r1792816384

The reason we can't just rewrite the streams() method is that within the existing Python CDK core.py, when processing synchronous streams, we invoke the streams() method, and in that context we don't want to return the concurrent streams that aren't compatible in that area of code.

Collaborator: As we discussed earlier, it's preferable to use the streams() method and condition the behavior accordingly. This approach adds some complexity, but the tradeoff is that it allows simpler modifications later: once the core is able to handle concurrent streams, we'll get a stream generation interface for free.

Author (brianjlai): Yep, this is addressed in my latest commit using the optional param include_concurrent_streams.
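A rough sketch of that optional-parameter approach (the parameter name comes from the comment above; the class skeleton and helper methods are hypothetical, not the PR's exact code):

```python
from typing import Any, List, Mapping

from airbyte_cdk.sources.streams import Stream


class ConcurrentDeclarativeSource:  # simplified skeleton for illustration
    def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Stream]:
        # The synchronous read path in core.py keeps calling streams() with the
        # default, so it never sees streams that only run on the concurrent framework.
        synchronous = self._build_synchronous_streams(config)  # hypothetical helper
        if not include_concurrent_streams:
            return synchronous
        # Callers that understand concurrency opt in explicitly.
        return synchronous + self._build_concurrent_streams(config)  # hypothetical helper
```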

         stream_name_to_stream = {s.name: s for s in streams}
         if len(streams) == 0:
             return False, f"No streams to connect to from source {source}"

[Large diff not rendered by default.]

@@ -4,10 +4,11 @@
 
 import logging
 from abc import abstractmethod
-from typing import Any, Mapping, Tuple
+from typing import Any, List, Mapping, Tuple
 
 from airbyte_cdk.sources.abstract_source import AbstractSource
 from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
+from airbyte_cdk.sources.streams import Stream
 
 
 class DeclarativeSource(AbstractSource):
@@ -32,3 +33,6 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
         The error object will be cast to string to display the problem to the user.
         """
         return self.connection_checker.check_connection(self, logger, config)
+
+    def all_streams(self, config: Mapping[str, Any]) -> List[Stream]:
+        return self.streams(config=config)
Author (brianjlai): For non-concurrent low-code sources, these are equivalent, but we override this implementation in ConcurrentDeclarativeSource to create a single list of both concurrent and synchronous streams so that we properly generate catalogs and other things.
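A minimal sketch of what that override could look like (the attribute names are assumptions for illustration; the PR's actual partitioning of streams may differ):

```python
def all_streams(self, config: Mapping[str, Any]) -> List[Stream]:
    # Catalog generation (and anything else that needs the full set) gets
    # every stream, regardless of which framework will execute it.
    return self._concurrent_streams + self._synchronous_streams  # hypothetical attributes
```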

@@ -378,3 +378,18 @@ def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
         # Check if the new runtime lookback window is greater than the current config lookback
         if parse_duration(runtime_lookback_window) > config_lookback:
             self._lookback_window = InterpolatedString.create(runtime_lookback_window, parameters={})
+
+    def get_start_datetime(self) -> MinMaxDatetime:
+        return self._start_datetime
+
+    def get_end_datetime(self) -> Optional[MinMaxDatetime]:
+        return self._end_datetime
+
+    def get_step(self) -> Union[timedelta, Duration]:
+        return self._step
+
+    def get_partition_field_start(self) -> InterpolatedString:
+        return self._partition_field_start
+
+    def get_partition_field_end(self) -> InterpolatedString:
+        return self._partition_field_end
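These getters expose the declarative cursor's configuration so the concurrent machinery can translate a DatetimeBasedCursor into its concurrent equivalent. A hedged sketch of that kind of consumer (everything except the getters themselves is illustrative):

```python
def slice_boundary_fields(cursor, config):
    # Evaluate the interpolated partition-field names so a concurrent
    # partition generator can label slice boundaries consistently.
    return (
        cursor.get_partition_field_start().eval(config=config),
        cursor.get_partition_field_end().eval(config=config),
    )
```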
@@ -3,23 +3,37 @@
 #
 
 import pkgutil
-from typing import Any
+from typing import Any, List, Mapping, Optional
 
 import yaml
-from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
+from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog
+from airbyte_cdk.sources.declarative.concurrent_declarative_source import ConcurrentDeclarativeSource
 from airbyte_cdk.sources.types import ConnectionDefinition
 
 
-class YamlDeclarativeSource(ManifestDeclarativeSource):
+class YamlDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]):
     """Declarative source defined by a yaml file"""
 
-    def __init__(self, path_to_yaml: str, debug: bool = False) -> None:
+    def __init__(
+        self,
+        path_to_yaml: str,
+        debug: bool = False,
+        catalog: Optional[ConfiguredAirbyteCatalog] = None,
+        config: Optional[Mapping[str, Any]] = None,
+        state: Optional[List[AirbyteStateMessage]] = None,
+    ) -> None:
         """
         :param path_to_yaml: Path to the yaml file describing the source
         """
         self._path_to_yaml = path_to_yaml
         source_config = self._read_and_parse_yaml_file(path_to_yaml)
-        super().__init__(source_config, debug)
+
+        super().__init__(
+            catalog=catalog or ConfiguredAirbyteCatalog(streams=[]),
+            config=config or {},
+            state=state or [],
+            source_config=source_config,
+        )
 
     def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:
         package = self.__class__.__module__.split(".")[0]
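With this change, an entrypoint can hand the parsed catalog, config, and state straight to the source, while older call sites keep working via the defaults. A usage sketch (the variable names and file path are illustrative):

```python
source = YamlDeclarativeSource(
    path_to_yaml="manifest.yaml",  # illustrative path
    catalog=configured_catalog,    # parsed from the --catalog CLI argument
    config=config,                 # parsed from --config
    state=state,                   # parsed from --state
)
```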
@@ -5,6 +5,7 @@
 import copy
 import json
 import logging
+from datetime import datetime
 from functools import lru_cache
 from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
 
@@ -24,6 +25,7 @@
 from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
 from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
+from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import DateTimeStreamStateConverter
 from airbyte_cdk.sources.streams.core import StreamData
 from airbyte_cdk.sources.types import StreamSlice
 from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
@@ -203,6 +205,11 @@ class SliceEncoder(json.JSONEncoder):
     def default(self, obj: Any) -> Any:
         if hasattr(obj, "__json_serializable__"):
             return obj.__json_serializable__()
+
+        # This needs to be revisited as we can't lose precision
+        if isinstance(obj, datetime):
+            return list(obj.timetuple())[0:6]
Collaborator: Should we set 6 as a variable to avoid using magic numbers?

Author (brianjlai): Yes, thank you for calling this out. I had put this in as a placeholder as I was working through getting this tested the first time around, and it needs to be reinvestigated/fixed.

Contributor: I don't follow this -- is there somewhere we would be serializing a datetime object?

Author (brianjlai): I'll double check, but I think we needed this serializer deeper in our code, potentially in how we emit state back out. I'll reconfirm this as I work through @lazebnyi's comment above.

Contributor: I'm also curious about this. From my understanding, this would mean that CursorPartitionGenerator would create slices with datetimes in them, but that is not what I see.

Author (brianjlai, Oct 16, 2024): @lazebnyi @pnilan @maxi297, to close the loop on this one: I think what originally happened was that I wrote this in when I was first testing, because we were getting the datetime object from the ConcurrentCursor and it would fail trying to serialize it.

However, after cleaning up the code and fixing edge cases, I addressed the serialization by converting the datetime into the correct output string format, with the correct precision, in the generate() function here: https://github.com/airbytehq/airbyte/pull/46662/files#diff-93127bface0b323fe43b21cdb8fb14493dd465995b085a4f81647f3697930bddR396-R399. And since this is now already a string, we don't need to convert it again.

I'll get rid of this code as it's not actually used anymore, and we're applying the correct precision based on the cursor definition.
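For reference, the precision issue flagged in the placeholder comment is easy to reproduce: timetuple() only carries whole seconds, so any sub-second component is silently dropped.

```python
from datetime import datetime, timezone

dt = datetime(2024, 10, 16, 12, 30, 45, 123456, tzinfo=timezone.utc)
print(list(dt.timetuple())[0:6])  # [2024, 10, 16, 12, 30, 45] -- the 123456 microseconds are lost
```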


         # Let the base class default method raise the TypeError
         return super().default(obj)

@@ -341,12 +348,17 @@ class CursorPartitionGenerator(PartitionGenerator):
     across partitions. Each partition represents a subset of the stream's data and is determined by the cursor's state.
     """
 
+    _START_BOUNDARY = 0
+    _END_BOUNDARY = 1
+
     def __init__(
         self,
         stream: Stream,
         message_repository: MessageRepository,
         cursor: Cursor,
+        connector_state_converter: DateTimeStreamStateConverter,
         cursor_field: Optional[List[str]],
+        slice_boundary_fields: Optional[Tuple[str, str]],
     ):
         """
         Initialize the CursorPartitionGenerator with a stream, sync mode, and cursor.
@@ -362,6 +374,8 @@ def __init__(
         self._cursor = cursor
         self._cursor_field = cursor_field
         self._state = self._cursor.state
+        self._slice_boundary_fields = slice_boundary_fields
+        self._connector_state_converter = connector_state_converter
 
     def generate(self) -> Iterable[Partition]:
         """
@@ -372,8 +386,19 @@
 
         :return: An iterable of StreamPartition objects.
         """
-        for slice_start, slice_end in self._cursor.generate_slices():
-            stream_slice = StreamSlice(partition={}, cursor_slice={"start": slice_start, "end": slice_end})
+
+        start_boundary = self._slice_boundary_fields[self._START_BOUNDARY] if self._slice_boundary_fields else "start"
Collaborator: For the current implementation of the datetime cursor, self._slice_boundary_fields never has a None value.
+        end_boundary = self._slice_boundary_fields[self._END_BOUNDARY] if self._slice_boundary_fields else "end"
+
+        wam = list(self._cursor.generate_slices())
+        for slice_start, slice_end in wam:
Contributor: nit: It seems like it would be more memory-efficient to iterate directly over the generated slices; is there a specific reason for saving them to a list?

Author (brianjlai): Correct. I had originally added this for debugging, to see the entire set of slices more easily, but you are right that this should stay an iterable.

+            stream_slice = StreamSlice(
+                partition={},
+                cursor_slice={
+                    start_boundary: self._connector_state_converter.output_format(slice_start),
+                    end_boundary: self._connector_state_converter.output_format(slice_end),
+                },
+            )
 
             yield StreamPartition(
                 self._stream,
@@ -386,7 +411,7 @@
             )
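The fix the author agrees to above is a one-line change; generate() can consume the cursor's slices lazily instead of materializing them first (sketch):

```python
# Iterate lazily; no intermediate list is built.
for slice_start, slice_end in self._cursor.generate_slices():
    ...  # build the StreamSlice and yield the StreamPartition exactly as above
```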


-@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
+@deprecated("Availability strategy has been soft deprecated. Do not use. Class is subject to removal", category=ExperimentalClassWarning)
 class AvailabilityStrategyFacade(AvailabilityStrategy):
     def __init__(self, abstract_availability_strategy: AbstractAvailabilityStrategy):
         self._abstract_availability_strategy = abstract_availability_strategy
@@ -67,6 +67,7 @@ def check_availability(self, logger: logging.Logger) -> StreamAvailability:
     """
 
 
+@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
Contributor: @brianjlai probably better to call out that it should not be used at all, if we're ripping out availability strategies over the mid-to-long term?

Author (brianjlai): Ah yeah, I think I carried that over from a merge of serhii's work, which deprecated the availability strategy in concurrent. I'll update this to say "do not use".

 class AlwaysAvailableAvailabilityStrategy(AbstractAvailabilityStrategy):
     """
     An availability strategy that always indicates a stream is available.
@@ -321,7 +321,10 @@ def _split_per_slice_range(self, lower: CursorValueType, upper: CursorValueType)
 
         lower = max(lower, self._start) if self._start else lower
         if not self._slice_range or lower + self._slice_range >= upper:
-            yield lower, upper
+            if self._cursor_granularity:
+                yield lower, upper - self._cursor_granularity
+            else:
+                yield lower, upper
         else:
             stop_processing = False
             current_lower_boundary = lower
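To make the granularity adjustment concrete: subtracting the cursor granularity from the upper bound keeps adjacent inclusive slices from overlapping on the boundary value. A standalone worked example (plain datetimes, not the CDK classes):

```python
from datetime import datetime, timedelta

lower = datetime(2024, 1, 1)
upper = datetime(2024, 1, 10)
granularity = timedelta(days=1)

# Without the adjustment, this slice would end at 2024-01-10 and the next
# slice would start there too, so a record stamped exactly 2024-01-10
# could be read twice.
print(lower, upper - granularity)  # 2024-01-01 00:00:00 2024-01-09 00:00:00
```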
@@ -157,3 +157,17 @@ def parse_timestamp(self, timestamp: str) -> datetime:
         if not isinstance(dt_object, DateTime):
             raise ValueError(f"DateTime object was expected but got {type(dt_object)} from pendulum.parse({timestamp})")
         return dt_object  # type: ignore # we are manually type checking because pendulum.parse may return different types
+
+
+class CustomOutputFormatConcurrentStreamStateConverter(IsoMillisConcurrentStreamStateConverter):
+    """
+    Datetime State converter that emits state according to the supplied datetime format. The converter supports reading
+    incoming state in any valid datetime format via Pendulum.
+    """
+
+    def __init__(self, datetime_format: str, is_sequential_state: bool = True, cursor_granularity: Optional[timedelta] = None):
+        super().__init__(is_sequential_state=is_sequential_state, cursor_granularity=cursor_granularity)
+        self._datetime_format = datetime_format
+
+    def output_format(self, timestamp: datetime) -> str:
+        return timestamp.strftime(self._datetime_format)
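A quick usage sketch of the new converter (the format string is illustrative):

```python
from datetime import datetime, timedelta

converter = CustomOutputFormatConcurrentStreamStateConverter(
    datetime_format="%Y-%m-%dT%H:%M:%SZ",  # illustrative format
    cursor_granularity=timedelta(seconds=1),
)
print(converter.output_format(datetime(2024, 10, 16, 12, 30, 45)))  # 2024-10-16T12:30:45Z
```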
@@ -40,7 +40,7 @@ def test_check_stream_with_slices_as_list(test_name, record, streams_to_check, s
     stream.read_records.side_effect = mock_read_records({frozenset(stream_slice): iter([record])})
 
     source = MagicMock()
-    source.streams.return_value = [stream]
+    source.all_streams.return_value = [stream]
 
     check_stream = CheckStream(streams_to_check, parameters={})
 
@@ -63,7 +63,7 @@ def test_check_empty_stream():
     stream.stream_slices.return_value = iter([None])
 
     source = MagicMock()
-    source.streams.return_value = [stream]
+    source.all_streams.return_value = [stream]
 
     check_stream = CheckStream(["s1"], parameters={})
     stream_is_available, reason = check_stream.check_connection(source, logger, config)
@@ -76,7 +76,7 @@ def test_check_stream_with_no_stream_slices_aborts():
     stream.stream_slices.return_value = iter([])
 
     source = MagicMock()
-    source.streams.return_value = [stream]
+    source.all_streams.return_value = [stream]
 
     check_stream = CheckStream(["s1"], parameters={})
     stream_is_available, reason = check_stream.check_connection(source, logger, config)
@@ -123,7 +123,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
     assert isinstance(http_stream, HttpStream)
 
     source = MagicMock()
-    source.streams.return_value = [http_stream]
+    source.all_streams.return_value = [http_stream]
 
     check_stream = CheckStream(stream_names=["mock_http_stream"], parameters={})