
[Concurrent Low-Code] ConcurrentDeclarativeSource class that low-code connectors can inherit from to uptake Concurrent CDK #46662

Open
wants to merge 33 commits into master

Conversation

brianjlai (Contributor) commented Oct 9, 2024

Closes https://github.com/airbytehq/airbyte-internal-issues/issues/9710

What

Adds the new ConcurrentDeclarativeSource class, which adapts the existing YamlDeclarativeSource used by all low-code connectors so that it can run within the Concurrent CDK framework. This PR combines all the previous units of work so that low-code streams are translated into concurrent DefaultStream instances.

Another big aspect of this review is how we gate which streams will run concurrently:

  • Only incremental non-substreams run concurrently, and we detect them by inspecting for the DatetimeBasedCursor.
  • We cannot migrate full refresh streams to concurrent because the Concurrent CDK does not support resumable full refresh (RFR), so we would be regressing behavior.
  • We cannot migrate substreams because the Concurrent CDK does not support substreams.
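The gating described above can be sketched roughly as follows. This is an illustrative stand-in, not the PR's actual API; the `StreamShape` type and function name are made up for the example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StreamShape:
    """Hypothetical summary of a low-code stream's relevant traits."""
    name: str
    cursor_type: Optional[str]  # e.g. "DatetimeBasedCursor", or None for full refresh
    is_substream: bool


def can_run_concurrently(stream: StreamShape) -> bool:
    # Full refresh streams stay synchronous: the Concurrent CDK lacks
    # resumable full refresh (RFR), so migrating them would regress behavior.
    if stream.cursor_type is None:
        return False
    # Substreams are not supported by the Concurrent CDK.
    if stream.is_substream:
        return False
    # Only incremental streams driven by a DatetimeBasedCursor qualify.
    return stream.cursor_type == "DatetimeBasedCursor"
```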

How

The overall design is predicated on introducing a new class, ConcurrentDeclarativeSource, which behaves as a kind of adapter between the existing entrypoint.py that all syncs are triggered from and the ConcurrentSource, which is responsible for running certain streams on the Concurrent CDK engine.

The ConcurrentDeclarativeSource inherits from ManifestDeclarativeSource so that we can reuse the logic that parses a manifest into low-code runtime components, and it inspects those components to decide whether each stream can be run concurrently or synchronously.

The last big part of the code puts into place the logic to transform a low-code stream's DatetimeBasedCursor into a ConcurrentCursor. We need to do this because the interfaces of the low-code cursor and the concurrent cursor differ in a few specific ways, and trying to make them both fit the same interface created a Frankenstein class that proved even more unwieldy. Prior PRs (see 45413) established feature parity, so we now perform the transformation and supply the result to the concurrent engine to handle date-window partitioning and state management.
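As a rough illustration of the date-window partitioning that the concurrent engine takes over (a sketch under simplified assumptions, not the PR's actual implementation):

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def generate_datetime_windows(
    start: datetime, end: datetime, step: timedelta
) -> Iterator[Tuple[datetime, datetime]]:
    """Split [start, end] into contiguous windows no wider than `step`.

    Each window can then be handed to the concurrent engine as an
    independent partition.
    """
    lower = start
    while lower < end:
        upper = min(lower + step, end)
        yield lower, upper
        lower = upper
```

For example, a two-day range with a one-day step yields two windows, which partitions can process in any order.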

Something else important to note is that there are some specific cases where an incremental stream cannot be run as a concurrent source. Since we introduced the language, we've allowed stream_state to be a valid interpolation context for various components. However, because partitions can run in any order and complete at any time, stream_state managed by the ConcurrentCursor is no longer a thread-safe value (unlike when it was managed sequentially). I inspected the schema and our repo for its usage. Streams that use stream_state in an unsafe way remain synchronous, but we should fix those connectors to use the thread-safe stream_interval and ultimately remove the extra code later.
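A minimal sketch of that inspection (a hypothetical helper, not the PR's code): scan a component's templated strings for the unsafe interpolation context.

```python
from typing import Iterable


def uses_unsafe_stream_state(interpolated_strings: Iterable[str]) -> bool:
    """Return True if any templated string references `stream_state`.

    `stream_state` is mutated as partitions finish in arbitrary order under
    the ConcurrentCursor, so interpolating it is not thread safe; the
    per-partition `stream_interval` context is the safe alternative.
    """
    return any("stream_state" in template for template in interpolated_strings)
```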

How we enable this in the short term

I've included two examples of how connectors can uptake concurrent processing. They are functionally identical and will be deleted before merging.

The two things that need to be changed are:

  • run.py - Our previous design for connectors did not accept any of the arguments passed to the connector by the platform. This is a significant limitation because the concurrent framework is built around instantiating things like cursors up front, before performing a read. I haven't found a good way to avoid changing this; it is a limitation of the Concurrent CDK.
  • source.py - Once run.py is updated to pass in the various operation arguments (state, config, and catalog), we need to forward them to the ConcurrentDeclarativeSource constructor.

Review guide

  1. concurrent_declarative_source.py
  2. datetime_based_cursor.py
  3. adapter.py
  4. datetime_stream_state_converter.py
  5. yaml_declarative_source.py
  6. See either source-sentry or source-amplitude

User Impact

This is considered a breaking CDK change because connectors will need to follow the included migration guide to update their run.py and source.py files.

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

Yes, because this isn't released yet. However, it does pose a risk once we move a connector to concurrent: once we start emitting the new state format, it is much harder to go backward, since the connector cannot process concurrent state. See my comment in the code for more.


connector_state_converter = CustomOutputFormatConcurrentStreamStateConverter(
datetime_format=declarative_cursor_attributes.datetime_format,
is_sequential_state=False,
Contributor Author:

By disabling is_sequential_state, we will automatically move connectors from their sequential state to per-partition concurrent state. This is the ideal end goal, but if we were to revert the connector to a previous version that doesn't use the Concurrent CDK, the state message would not be compatible.

It feels like, to de-risk the early release a little, we should set this to true so that we accept sequential state (i.e. {"updated_at": "2024-12-12"}) and also emit it back to the platform in that form.
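For reference, the two state shapes being discussed look roughly like this. The concurrent shape is an approximation; the CDK's state converters are the authoritative definition.

```python
# Sequential state: just the cursor field and its most recent value.
sequential_state = {"updated_at": "2024-12-12"}

# Per-partition concurrent state (approximate shape): a discriminator
# plus the date ranges that have been synced.
concurrent_state = {
    "state_type": "date-range",
    "slices": [{"start": "2024-01-01", "end": "2024-12-12"}],
}


def looks_sequential(state: dict) -> bool:
    # A sequential state has no "state_type" discriminator.
    return "state_type" not in state
```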

Collaborator:

Do you think we could add a mechanism to handle this situation by using the lowest value to set the sequential state if we revert the changes?
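That suggestion could look something like this sketch (the state shape is illustrative, not the actual schema): take the lowest slice boundary as the safe sequential cursor.

```python
from typing import List, Mapping


def sequential_cursor_from_slices(slices: List[Mapping[str, str]]) -> str:
    """Collapse per-partition slices to one sequential cursor value.

    Everything before the earliest slice end is guaranteed synced, so the
    minimum "end" is a safe (if conservative) sequential cursor.
    """
    return min(s["end"] for s in slices)
```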

@@ -32,3 +33,6 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
The error object will be cast to string to display the problem to the user.
"""
return self.connection_checker.check_connection(self, logger, config)

def all_streams(self, config: Mapping[str, Any]) -> List[Stream]:
return self.streams(config=config)
Contributor Author:

For non-concurrent low-code sources these are equivalent, but we override this implementation in ConcurrentDeclarativeSource to create a single list of both concurrent and synchronous streams so that we properly generate catalogs and other outputs.

@brianjlai brianjlai requested review from natikgadzhi, lazebnyi and a team October 9, 2024 04:17
Comment on lines 182 to 184
cursor_field=[declarative_cursor_attributes.cursor_field]
if declarative_cursor_attributes.cursor_field is not None
else None,

Suggested change
cursor_field=[declarative_cursor_attributes.cursor_field]
if declarative_cursor_attributes.cursor_field is not None
else None,
cursor_field=(
[declarative_cursor_attributes.cursor_field] if declarative_cursor_attributes.cursor_field is not None else None
),

catalog: ConfiguredAirbyteCatalog,
concurrent_stream_names: set[str],
) -> ConfiguredAirbyteCatalog:
return ConfiguredAirbyteCatalog(streams=[stream for stream in catalog.streams if stream.stream.name not in concurrent_stream_names])

Suggested change
return ConfiguredAirbyteCatalog(streams=[stream for stream in catalog.streams if stream.stream.name not in concurrent_stream_names])
catalog.streams = [stream for stream in catalog.streams if stream.stream.name not in concurrent_stream_names]
return catalog


codeflash-ai bot commented Oct 9, 2024

⚡️ Codeflash found optimizations for this PR

📄 ConcurrentDeclarativeSource._remove_concurrent_streams_from_catalog() in airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py

📈 Performance improved by 29% (0.29x faster)

⏱️ Runtime went down from 5.43 microseconds to 4.20 microseconds

Explanation and details

To optimize the provided Python code for better performance, we will focus on streamlining operations, reducing overhead, and minimizing redundant calculations. However, given the complexity and dependencies involved in the original code, changes will be limited to areas that have clear potential for performance improvements without altering the function signatures and overall behavior. Below is the optimized version.

Optimizations:

  1. Avoid Redundant Checks: Consolidated the initialization of concurrency_level and initial_number_of_partitions_to_generate into a single block.
  2. max for Safe Calculation: Ensured that initial_number_of_partitions_to_generate is at least 1 using max function.
  3. Inline Stream Update: Updated catalog.streams in place within _remove_concurrent_streams_from_catalog method to reduce memory overhead.

These changes streamline the code execution flow and reduce unnecessary computations, potentially leading to better runtime performance while maintaining the original functionality.

Correctness verification

The new optimized code was tested for correctness. The results are listed below.

🔘 (none found) − ⚙️ Existing Unit Tests

✅ 2 Passed − 🌀 Generated Regression Tests

# imports
from typing import Any, List, Mapping, Optional

import pytest  # used for our unit tests
from airbyte_cdk.models import (ConfiguredAirbyteCatalog,
                                ConfiguredAirbyteStream)
from airbyte_cdk.sources.concurrent_source.concurrent_source import \
    ConcurrentSource
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.concurrent_declarative_source import \
    ConcurrentDeclarativeSource
from airbyte_cdk.sources.declarative.manifest_declarative_source import \
    ManifestDeclarativeSource
# function to test
from airbyte_cdk.sources.declarative.models.declarative_component_schema import \
    ConcurrencyLevel as ConcurrencyLevelModel
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import \
    ModelToComponentFactory
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
from airbyte_cdk.sources.source import TState

# unit tests

def create_catalog(stream_names: List[str]) -> ConfiguredAirbyteCatalog:
    """Helper function to create a ConfiguredAirbyteCatalog from a list of stream names."""
    streams = [ConfiguredAirbyteStream(stream={"name": name}) for name in stream_names]
    return ConfiguredAirbyteCatalog(streams=streams)
    # Outputs were verified to be equal to the original implementation








def test_empty_catalog():
    catalog = create_catalog([])
    concurrent_stream_names = set()
    codeflash_output = ConcurrentDeclarativeSource._remove_concurrent_streams_from_catalog(catalog, concurrent_stream_names)
    # Outputs were verified to be equal to the original implementation

def test_empty_catalog_with_non_empty_concurrent_stream_names():
    catalog = create_catalog([])
    concurrent_stream_names = {"stream1"}
    codeflash_output = ConcurrentDeclarativeSource._remove_concurrent_streams_from_catalog(catalog, concurrent_stream_names)
    # Outputs were verified to be equal to the original implementation

🔘 (none found) − ⏪ Replay Tests

natikgadzhi (Contributor) left a comment:

Got distracted, just posting this so comments don't get lost, will read thoroughly later

@@ -28,7 +28,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters

def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
streams = source.streams(config) # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
streams = source.all_streams(config) # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
Contributor:

Teach me your ways: what is the difference between streams and all_streams? (Ignore if it's in this PR, reading through it.)

Contributor Author:

good question, I mention it in a comment: https://github.com/airbytehq/airbyte/pull/46662/files#r1792816384

But the reason why we can't just rewrite the streams() method is that within the existing Python CDK core.py, when processing synchronous streams, we invoke the streams() method, and in that context we don't want to return the concurrent streams that aren't compatible in that area of code.

Collaborator:

As we discussed earlier, it's preferable to use the streams method and condition the behavior accordingly. This approach adds some complexity, but it is a tradeoff that allows simpler modifications later. With this setup, once the core can handle concurrent streams, we'll get a stream generation interface for free.

Contributor Author:

Yep, this is addressed in my latest commit using the optional param include_concurrent_streams.
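A simplified sketch of that approach (a toy stand-in class, not the PR's actual implementation):

```python
from typing import List


class ConcurrentDeclarativeSourceSketch:
    """Toy stand-in showing only the stream-grouping behavior."""

    def __init__(self, synchronous_streams: List[str], concurrent_streams: List[str]):
        self._synchronous_streams = synchronous_streams
        self._concurrent_streams = concurrent_streams

    def streams(self, include_concurrent_streams: bool = False) -> List[str]:
        # Callers such as check/discover can opt in to the full list, while
        # the synchronous read path keeps seeing only compatible streams.
        if include_concurrent_streams:
            return self._synchronous_streams + self._concurrent_streams
        return self._synchronous_streams
```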

@@ -67,6 +67,7 @@ def check_availability(self, logger: logging.Logger) -> StreamAvailability:
"""


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
Contributor:

@brianjlai probably better to call out that it should not be used at all, if we're ripping out availability strategies over the mid-to-long term?

Contributor Author:

Ah yeah, I think I carried that over from a merge of Serhii's work, which deprecated the availability strategy in concurrent. I'll update this to say "do not use".

lazebnyi (Collaborator) left a comment:

Overall, the implementation looks great—nice work! However, we still need to make a few updates. After that I can approve.


# This needs to be revisited as we can't lose precision
if isinstance(obj, datetime):
return list(obj.timetuple())[0:6]
Collaborator:

Should we set 6 as a variable to avoid using magic numbers?
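The suggestion sketched out (the constant name here is made up for illustration):

```python
from datetime import datetime

# year, month, day, hour, minute, second -- the first six timetuple fields
DATETIME_COMPONENT_COUNT = 6


def serialize_datetime(obj: datetime) -> list:
    """Serialize a datetime as its first six calendar/clock components."""
    return list(obj.timetuple())[:DATETIME_COMPONENT_COUNT]
```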

Contributor Author:

Yes, thank you for calling this out. I put this in as a placeholder while I was first getting this tested, and it needs to be reinvestigated/fixed.


concurrency_level = concurrency_level_component.get_concurrency_level()
initial_number_of_partitions_to_generate = concurrency_level // 2
else:
concurrency_level = 1
Collaborator:

I think it would be better to move this value from the code into a class variable to improve readability.

def all_streams(self, config: Mapping[str, Any]) -> List[Stream]:
return self._synchronous_streams + self._concurrent_streams # type: ignore # Although AbstractStream doesn't inherit stream, they were designed to fit the same interface when called from streams()

def _separate_streams(self, config: Mapping[str, Any]) -> Tuple[List[AbstractStream], List[Stream]]:
Collaborator:

Do you think the name _group_streams would be more informative?

Contributor Author:

i'm fine with _group_streams

for slice_start, slice_end in self._cursor.generate_slices():
stream_slice = StreamSlice(partition={}, cursor_slice={"start": slice_start, "end": slice_end})

start_boundary = self._slice_boundary_fields[self._START_BOUNDARY] if self._slice_boundary_fields else "start"
Collaborator:

For the current implementation of the datetime cursor, self._slice_boundary_fields is never None.

from source_amplitude import SourceAmplitude


def _get_source(args: List[str]):
Collaborator:

I think we need to mention clearly in the PR description that this will be a breaking change, and add info about it to the cdk-migration file.

Contributor Author:

Yep, that is the plan. I wrote up a migration guide, included in the next commit I push, explaining what needs to change in run.py and source.py.

pnilan (Contributor) left a comment:

A couple of minor comments. I'll defer to Serhii for approval.

Also, can you run regression tests with one or two of the test connectors?


state_manager = ConnectorStateManager(state=self._state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

self.logger.info(f"what is config: {config}")
Contributor:

Is this a personal debugging log?

Contributor Author:

It is. I'm removing this, thank you.

declarative_cursor.get_partition_field_end().eval(config=config),
)

interpolated_state_date = declarative_cursor.get_start_datetime()
Contributor:

nit: interpolated_state_date typo?

Contributor Author:

good eyes!

end_boundary = self._slice_boundary_fields[self._END_BOUNDARY] if self._slice_boundary_fields else "end"

wam = list(self._cursor.generate_slices())
for slice_start, slice_end in wam:
Contributor:

nit: It seems like it would be more memory efficient to directly iterate over the generated slices, is there a specific reason for saving to a list?

Contributor Author:

Correct. I had originally added this for debugging, to see the entire set of slices more easily, but you are right that this should stay an iterable.


# This needs to be revisited as we can't lose precision
if isinstance(obj, datetime):
return list(obj.timetuple())[0:6]
Contributor:

I don't follow this -- Is there somewhere we would be serializing a datetime object?

Contributor Author:

I'll double check, but I think we needed this serializer deeper in our code, potentially in how we emit state back out. I'll reconfirm this as I work through @lazebnyi's comment above.

Contributor:

I'm also curious about this. From my understanding, this would mean that CursorPartitionGenerator would create slices with datetime within them but this is not what I see.

brianjlai (Contributor Author) commented Oct 16, 2024:

@lazebnyi @pnilan @maxi297 to close the loop on this one: I think what originally happened is that I wrote this in when I was first testing, because we were getting the datetime object from the ConcurrentCursor and it would fail trying to serialize it.

However, after cleaning up the code and fixing edge cases, I addressed the serialization by converting the datetime into the correct output string format with the correct precision in the generate() function here: https://github.com/airbytehq/airbyte/pull/46662/files#diff-93127bface0b323fe43b21cdb8fb14493dd465995b085a4f81647f3697930bddR396-R399. Since this is now already a string, we don't need to convert it again.

I'll get rid of this code, as it's not actually used anymore and we're applying the correct precision based on the cursor definition.

maxi297 (Contributor) left a comment:

Here is a first pass of review. I'm very happy with the many edge cases you've caught during this.

I have concerns, but all of them are open to discussion and I'm willing to tackle some of them myself if it can help.



@dataclass
class DeclarativeCursorAttributes:
Contributor:

nit: I have a concern that this only works for DatetimeBasedCursor. How do we envision the next steps for that class? Could we avoid this class by having _get_cursor(<...>) -> airbyte_cdk.sources.streams.concurrent.Cursor instead of _get_cursor_attributes? Or can we make this private?

@@ -28,7 +28,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters

def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
streams = source.streams(config) # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
streams = source.streams(config=config, include_concurrent_streams=True) # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
maxi297 (Contributor) commented Oct 16, 2024:

nit: Regarding the # type: ignore that was already there before your changes: should we just make it part of the interface? I'm not sure why ConnectionChecker.check doesn't take only a DeclarativeSource. It seems like even the typing for Source is too broad, as streams is defined at the AbstractStream level. It feels like a typing issue worth fixing, right?


existing Python CDK. Streams that support concurrency are started from read().
"""
if include_concurrent_streams:
return self._synchronous_streams + self._concurrent_streams # type: ignore # Although AbstractStream doesn't inherit stream, they were designed to fit the same interface when called from streams()
Contributor:

I'm not sure this is entirely true. For example, a caller of streams could expect to call read_only_records, but this would not work in the case where an AbstractStream is returned.

My understanding is that we need this because we still rely on the AbstractSource for check (all streams), discover (all streams) and read (only the non-concurrent streams).

Can we stop relying on streams at all? Can we rely on the previous Stream we already have to check availability and discovery until we do the switch to AbstractStream entirely? The read should also work with this because we filter the catalog in ConcurrentDeclarativeSource.read.

Contributor:

Proposed fix: #46995


if isinstance(declarative_stream.retriever, SimpleRetriever) and isinstance(declarative_stream.retriever.requester, HttpRequester):
http_requester = declarative_stream.retriever.requester
if "stream_state" in http_requester._path.string:
Contributor:

This is a nice catch!

Maybe this is me being paranoid, but should we log a warning so that we can see this explicitly when we have on-call issues? I would add one for each case where we return False.

Contributor Author:

This is a good idea. I'll add a log to each of these scenarios.
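Something like the following sketch (a hypothetical helper; the actual logger name and message wording are up to the PR):

```python
import logging

logger = logging.getLogger("airbyte")


def log_synchronous_fallback(stream_name: str, reason: str) -> None:
    # Surfaced at WARNING so on-call can spot streams that silently fell
    # back to the synchronous path.
    logger.warning(
        "Stream '%s' will not be processed concurrently: %s", stream_name, reason
    )
```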

@staticmethod
def _stream_supports_concurrent_partition_processing(declarative_stream: DeclarativeStream, cursor: DatetimeBasedCursor) -> bool:
"""
Many connectors make use of stream_state during interpolation on a per-partition basis under the assumption that
Contributor:

Do we have documentation somewhere on the things we need to do to fully migrate to the one CDK? If not, I can start one, and this will be part of the list.

Contributor Author:

We do not have it documented at a high level, as far as I am aware. However, in my writeup for the breaking change in cdk-migrations.md, I've listed all the connectors in the repo that use stream_state in a non-thread-safe way and what we can do to fix them.

But at a higher level, we should have a line item for the unification of our CDKs. Basically, we should update our schemas to drop stream_state as an interpolation context and remove it wherever it's used from the Retriever and below (although I don't think it's actually used above that).

Contributor:

I'll take some time to improve previous work on documenting this here. Thanks for sharing this!

if not stream.supports_incremental:
return None

if isinstance(stream.retriever, SimpleRetriever):
Contributor:

Why not stream.get_cursor() here? I fear this will not work for AsyncRetriever.

Contributor Author:

Yeah, this is just an oversight on my part; I've gotten rid of this whole method and just call self.get_cursor() from above instead.

return True

@staticmethod
def _get_cursor(stream: DeclarativeStream) -> Optional[DeclarativeCursor]:
Contributor:

The concern I have with this method is that we are coupling the creation of the concurrent cursor to code that should eventually be removed, i.e. we do (declarative -> low-code component -> concurrent) instead of (declarative -> concurrent). Once we remove the low-code components, we will have to re-implement this logic. Should we instead rely on airbyte_cdk.sources.declarative.models.declarative_component_schema.DatetimeBasedCursor?

Contributor Author:

That is a fair question. I think this can get a little awkward, because we need to extract the DatetimeBasedCursor model from the _source_config field, and we lose the automatic parsing into interpolated fields and other things that we would basically need to reimplement. But the model does feel like the more solid interface that we won't change. I will play around with this tomorrow and see if this can be done sanely.



@@ -1,5 +1,94 @@
# CDK Migration Guide

## Upgrading to 6.x.x
Contributor:

Can we validate with @bnchrch what is the impact on the manifest-only sources and what is the migration path for them?

brianjlai (Contributor Author) commented Oct 16, 2024:

Yep, I'll add a note in the guide. It sounds like source-declarative-manifest is pinned to an explicit version, so we can migrate on our own time. But we basically need to modify its run.py and source.py the way we do for individual connectors and then bump the version. That in turn is actually the easiest way to release this to the most connectors, since no manifest-only connectors define their own run.py.

As for the connector builder: it actually doesn't use YamlDeclarativeSource, it uses SourceDeclarativeManifest, which doesn't contain breaking changes. I think it's up for debate whether that should switch over, but for now it would be non-breaking there.

@maxi297 maxi297 mentioned this pull request Oct 18, 2024