Skip to content

Commit

Permalink
ref(source-facebook-marketing): raise exception on missing stream (#4…
Browse files Browse the repository at this point in the history
…6546)

Signed-off-by: Artem Inzhyyants <[email protected]>
  • Loading branch information
artem1205 authored Oct 18, 2024
1 parent a4847df commit f07571f
Show file tree
Hide file tree
Showing 18 changed files with 1,323 additions and 785 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerImageTag: 3.3.15
dockerImageTag: 3.3.16
dockerRepository: airbyte/source-facebook-marketing
documentationUrl: https://docs.airbyte.com/integrations/sources/facebook-marketing
githubIssueLabel: source-facebook-marketing
Expand Down
1,883 changes: 1,209 additions & 674 deletions airbyte-integrations/connectors/source-facebook-marketing/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "3.3.15"
version = "3.3.16"
name = "source-facebook-marketing"
description = "Source implementation for Facebook Marketing."
authors = [ "Airbyte <[email protected]>",]
Expand All @@ -16,8 +16,8 @@ repository = "https://github.com/airbytehq/airbyte"
include = "source_facebook_marketing"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "^3.5.0"
python = "^3.10,<3.12"
airbyte-cdk = "^5"
facebook-business = "19.0.0"
cached-property = "==1.5.2"

Expand All @@ -27,5 +27,5 @@ source-facebook-marketing = "source_facebook_marketing.run:run"
[tool.poetry.group.dev.dependencies]
pytest-mock = "^3.6"
freezegun = "^1.4.0"
pytest = "^6.1"
pytest = "^7"
requests-mock = "^1.9.3"
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging
from typing import Any, List, Mapping

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk import emit_configuration_as_airbyte_control_message
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
Expand Down Expand Up @@ -56,14 +56,6 @@ def modify_and_save(cls, config_path: str, source: Source, config: Mapping[str,
# return modified config
return migrated_config

@classmethod
def emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
# add the Airbyte Control Message to message repo
cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
# emit the Airbyte Control Message from message queue to stdout
for message in cls.message_repository._message_queue:
print(message.json(exclude_unset=True))

@classmethod
def migrate(cls, args: List[str], source: Source) -> None:
"""
Expand All @@ -78,9 +70,7 @@ def migrate(cls, args: List[str], source: Source) -> None:
config = source.read_config(config_path)
# migration check
if cls.should_migrate(config):
cls.emit_control_message(
cls.modify_and_save(config_path, source, config),
)
emit_configuration_as_airbyte_control_message(cls.modify_and_save(config_path, source, config))


class MigrateIncludeDeletedToStatusFilters(MigrateAccountIdToArray):
Expand Down Expand Up @@ -156,9 +146,7 @@ def migrate(cls, args: List[str], source: Source) -> None:
config = source.read_config(config_path)
# migration check
if cls._should_migrate(config):
cls._emit_control_message(
cls._modify_and_save(config_path, source, config),
)
emit_configuration_as_airbyte_control_message(cls._modify_and_save(config_path, source, config))

@classmethod
def _transform(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
Expand Down Expand Up @@ -186,8 +174,3 @@ def _modify_and_save(cls, config_path: str, source: Source, config: Mapping[str,
source.write_config(migrated_config, config_path)
# return modified config
return migrated_config

@classmethod
def _emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
# add the Airbyte Control Message to message repo
print(create_connector_config_control_message(migrated_config).json(exclude_unset=True))
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@

class SourceFacebookMarketing(AbstractSource):
# Skip exceptions on missing streams
raise_exception_on_missing_stream = False
raise_exception_on_missing_stream = True

def _validate_and_transform(self, config: Mapping[str, Any]):
config.setdefault("action_breakdowns_allow_empty", False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

import logging
from functools import cache
from functools import cache, cached_property
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union

import airbyte_cdk.sources.utils.casing as casing
Expand Down Expand Up @@ -102,7 +102,7 @@ def __init__(
self._next_cursor_values = self._get_start_date()
self._completed_slices = {account_id: set() for account_id in self._account_ids}

@property
@cached_property
def name(self) -> str:
"""We override stream name to let the user change it via configuration."""
name = self._new_class_name or self.__class__.__name__
Expand Down Expand Up @@ -189,13 +189,13 @@ def state(self) -> MutableMapping[str, Any]:
if account_id in self._cursor_values and self._cursor_values[account_id]:
new_state[account_id] = {self.cursor_field: self._cursor_values[account_id].isoformat()}

new_state[account_id]["slices"] = {d.isoformat() for d in self._completed_slices[account_id]}
new_state[account_id]["slices"] = sorted(list({d.isoformat() for d in self._completed_slices[account_id]}))
new_state["time_increment"] = self.time_increment
return new_state

if self._completed_slices:
for account_id in self._account_ids:
new_state[account_id]["slices"] = {d.isoformat() for d in self._completed_slices[account_id]}
new_state[account_id]["slices"] = sorted(list({d.isoformat() for d in self._completed_slices[account_id]}))

new_state["time_increment"] = self.time_increment
return new_state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from source_facebook_marketing.api import API

logger = logging.getLogger("airbyte")
from airbyte_cdk.sources.streams import CheckpointMixin


class FBMarketingStream(Stream, ABC):
Expand Down Expand Up @@ -240,7 +241,7 @@ def _filter_all_statuses(self) -> MutableMapping[str, Any]:
)


class FBMarketingIncrementalStream(FBMarketingStream, ABC):
class FBMarketingIncrementalStream(FBMarketingStream, CheckpointMixin, ABC):
"""Base class for incremental streams"""

cursor_field = "updated_time"
Expand All @@ -249,8 +250,17 @@ def __init__(self, start_date: Optional[datetime], end_date: Optional[datetime],
super().__init__(**kwargs)
self._start_date = pendulum.instance(start_date) if start_date else None
self._end_date = pendulum.instance(end_date) if end_date else None
self._state = {}

def get_updated_state(
@property
def state(self):
return self._state

@state.setter
def state(self, value: Mapping[str, Any]):
self._state.update(**value)

def _get_updated_state(
self,
current_stream_state: MutableMapping[str, Any],
latest_record: Mapping[str, Any],
Expand Down Expand Up @@ -313,6 +323,17 @@ def _state_filter(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
],
}

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state):
self.state = self._get_updated_state(self.state, record)
yield record


class FBMarketingReversedIncrementalStream(FBMarketingIncrementalStream, ABC):
"""The base class for streams that don't support filtering and return records sorted desc by cursor_value"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import freezegun
import pendulum
from airbyte_cdk.models import AirbyteStateMessage, AirbyteStreamStateSerializer, StreamDescriptor, SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.mock_http.response_builder import (
Expand All @@ -22,7 +23,6 @@
create_response_builder,
find_template,
)
from airbyte_protocol.models import AirbyteStateMessage, StreamDescriptor, SyncMode
from source_facebook_marketing.streams.async_job import Status

from .config import ACCESS_TOKEN, ACCOUNT_ID, DATE_FORMAT, END_DATE, NOW, START_DATE, ConfigBuilder
Expand Down Expand Up @@ -467,7 +467,7 @@ def test_when_read_then_state_message_produced_and_state_match_start_interval(se
)

output = self._read(config().with_account_ids([account_id]).with_start_date(start_date).with_end_date(end_date))
cursor_value_from_state_message = output.most_recent_state.stream_state.dict().get(account_id, {}).get(_CURSOR_FIELD)
cursor_value_from_state_message = AirbyteStreamStateSerializer.dump(output.most_recent_state).get("stream_state").get(account_id, {}).get(_CURSOR_FIELD)
assert output.most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert cursor_value_from_state_message == start_date.strftime(DATE_FORMAT)

Expand Down Expand Up @@ -511,8 +511,8 @@ def test_given_multiple_account_ids_when_read_then_state_produced_by_account_id_
)

output = self._read(config().with_account_ids([account_id_1, account_id_2]).with_start_date(start_date).with_end_date(end_date))
cursor_value_from_state_account_1 = output.most_recent_state.stream_state.dict().get(account_id_1, {}).get(_CURSOR_FIELD)
cursor_value_from_state_account_2 = output.most_recent_state.stream_state.dict().get(account_id_2, {}).get(_CURSOR_FIELD)
cursor_value_from_state_account_1 = AirbyteStreamStateSerializer.dump(output.most_recent_state).get("stream_state").get(account_id_1, {}).get(_CURSOR_FIELD)
cursor_value_from_state_account_2 = AirbyteStreamStateSerializer.dump(output.most_recent_state).get("stream_state").get(account_id_2, {}).get(_CURSOR_FIELD)
expected_cursor_value = start_date.strftime(DATE_FORMAT)
assert output.most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert cursor_value_from_state_account_1 == expected_cursor_value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from unittest import TestCase

from airbyte_cdk.models import AirbyteStreamStateSerializer, SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.mock_http.response_builder import (
Expand All @@ -12,7 +13,6 @@
create_response_builder,
find_template,
)
from airbyte_protocol.models import SyncMode

from .config import ACCOUNT_ID, ConfigBuilder
from .request_builder import get_account_request, get_ad_sets_request, get_ads_request, get_campaigns_request
Expand Down Expand Up @@ -90,7 +90,7 @@ def test_ads_stream(self, http_mocker: HttpMocker):

output = self._read(config().with_ad_statuses(self.statuses), "ads")
assert len(output.records) == 1
account_state = output.most_recent_state.dict()["stream_state"][self.account_id]
account_state = AirbyteStreamStateSerializer.dump(output.most_recent_state).get("stream_state")[self.account_id]
assert self.filter_statuses_flag in account_state, f"State should include `filter_statuses` flag to track new records in the past."
assert account_state == {"filter_statuses": self.statuses, "updated_time": "2023-03-21T22:41:46-0700"}

Expand Down Expand Up @@ -140,7 +140,7 @@ def test_campaigns_stream(self, http_mocker: HttpMocker):
output = self._read(config().with_campaign_statuses(self.statuses), "campaigns")
assert len(output.records) == 1

account_state = output.most_recent_state.dict()["stream_state"][self.account_id]
account_state = AirbyteStreamStateSerializer.dump(output.most_recent_state).get("stream_state")[self.account_id]
assert self.filter_statuses_flag in account_state, f"State should include `filter_statuses` flag to track new records in the past."
assert account_state == {"filter_statuses": self.statuses, "updated_time": "2024-03-12T15:02:47-0700"}

Expand Down Expand Up @@ -184,6 +184,6 @@ def test_ad_sets_stream(self, http_mocker: HttpMocker):
output = self._read(config().with_ad_set_statuses(self.statuses), "ad_sets")
assert len(output.records) == 1

account_state = output.most_recent_state.dict()["stream_state"][self.account_id]
account_state = AirbyteStreamStateSerializer.dump(output.most_recent_state).get("stream_state")[self.account_id]
assert self.filter_statuses_flag in account_state, f"State should include `filter_statuses` flag to track new records in the past."
assert account_state == {"filter_statuses": self.statuses, "updated_time": "2024-03-02T15:02:47-0700"}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from unittest import TestCase

import freezegun
from airbyte_cdk.models import AirbyteStateMessage, AirbyteStreamStateSerializer, SyncMode
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.mock_http.response_builder import (
Expand All @@ -18,7 +19,6 @@
find_template,
)
from airbyte_cdk.test.state_builder import StateBuilder
from airbyte_protocol.models import AirbyteStateMessage, SyncMode

from .config import ACCESS_TOKEN, ACCOUNT_ID, NOW, ConfigBuilder
from .pagination import NEXT_PAGE_TOKEN, FacebookMarketingPaginationStrategy
Expand Down Expand Up @@ -244,7 +244,7 @@ def test_when_read_then_state_message_produced_and_state_match_latest_record(sel
)

output = self._read(config().with_account_ids([account_id]))
cursor_value_from_state_message = output.most_recent_state.stream_state.dict().get(account_id, {}).get(_CURSOR_FIELD)
cursor_value_from_state_message = AirbyteStreamStateSerializer.dump(output.most_recent_state).get("stream_state").get(account_id, {}).get(_CURSOR_FIELD)
assert cursor_value_from_state_message == max_cursor_value

@HttpMocker()
Expand Down Expand Up @@ -276,8 +276,8 @@ def test_given_multiple_account_ids_when_read_then_state_produced_by_account_id_
)

output = self._read(config().with_account_ids([account_id_1, account_id_2]))
cursor_value_from_state_account_1 = output.most_recent_state.stream_state.dict().get(account_id_1, {}).get(_CURSOR_FIELD)
cursor_value_from_state_account_2 = output.most_recent_state.stream_state.dict().get(account_id_2, {}).get(_CURSOR_FIELD)
cursor_value_from_state_account_1 = AirbyteStreamStateSerializer.dump(output.most_recent_state).get("stream_state").get(account_id_1, {}).get(_CURSOR_FIELD)
cursor_value_from_state_account_2 = AirbyteStreamStateSerializer.dump(output.most_recent_state).get("stream_state").get(account_id_2, {}).get(_CURSOR_FIELD)
assert cursor_value_from_state_account_1 == max_cursor_value_account_id_1
assert cursor_value_from_state_account_2 == max_cursor_value_account_id_2

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode

from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, SyncMode
from airbyte_cdk.test.catalog_builder import ConfiguredAirbyteStreamBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_protocol.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, SyncMode
from facebook_business.api import _top_level_param_json_encode
from source_facebook_marketing import SourceFacebookMarketing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,10 @@ def test_read_records_add_account_id(self, mocker, api, some_config):
{
"unknown_account": {
AdsInsights.cursor_field: "2010-10-03",
"slices": {
"slices": [
"2010-01-01",
"2010-01-02",
},
],
},
"time_increment": 1,
},
Expand All @@ -244,10 +244,10 @@ def test_read_records_add_account_id(self, mocker, api, some_config):
},
{
"unknown_account": {
"slices": {
"slices": [
"2010-01-01",
"2010-01-02",
}
]
}
},
),
Expand All @@ -256,10 +256,10 @@ def test_read_records_add_account_id(self, mocker, api, some_config):
{
"unknown_account": {
AdsInsights.cursor_field: "2010-10-03",
"slices": {
"slices": [
"2010-01-01",
"2010-01-02",
},
],
},
"time_increment": 1,
},
Expand All @@ -276,10 +276,10 @@ def test_read_records_add_account_id(self, mocker, api, some_config):
(
{
"unknown_account": {
"slices": {
"slices": [
"2010-01-01",
"2010-01-02",
}
]
}
},
None,
Expand All @@ -298,14 +298,14 @@ def test_state(self, api, state, result_state, some_config):

assert stream.state == {
"time_increment": 1,
"unknown_account": {"slices": set()},
"unknown_account": {"slices": []},
}

stream.state = state
actual_state = stream.state

result_state = state if not result_state else result_state
result_state[some_config["account_ids"][0]]["slices"] = result_state[some_config["account_ids"][0]].get("slices", set())
result_state[some_config["account_ids"][0]]["slices"] = result_state[some_config["account_ids"][0]].get("slices", [])
result_state["time_increment"] = 1

assert actual_state == result_state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,5 +258,5 @@ def test_get_updated_state(
# Set the instance's filter_statuses
incremental_class_instance._filter_statuses = instance_filter_statuses

new_state = incremental_class_instance.get_updated_state(current_stream_state, latest_record)
new_state = incremental_class_instance._get_updated_state(current_stream_state, latest_record)
assert new_state == expected_state
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@

import pendulum
import pytest
from airbyte_cdk.models import SyncMode
from airbyte_cdk.models import FailureType, SyncMode
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_protocol.models import FailureType
from facebook_business import FacebookAdsApi, FacebookSession
from facebook_business.exceptions import FacebookRequestError
from source_facebook_marketing.streams import Activities, AdAccount, AdCreatives, Campaigns, Videos
Expand Down
Loading

0 comments on commit f07571f

Please sign in to comment.