Skip to content

Commit

Permalink
[Core] ocean core next resync (port-labs#835)
Browse files Browse the repository at this point in the history
# Description

**What:**
- Integrated functionality to send the state of the integration on each
sync.
- Implemented prediction of the next sync date.

**Why:**
- To ensure that the integration state is consistently updated and
monitored.
- To enhance the accuracy and reliability of the next sync date
prediction by utilizing server data for SaaS applications, rather than
relying solely on environment variables.

**How:**
- Updated the sync logic to include the current integration state in the
payload sent to our monitoring system.
- Modified the sync prediction mechanism for SaaS applications to use
data from our servers, providing more accurate and context-aware
predictions.

## Type of change

Please leave one option from the following and delete the rest:

- [ ] Bug fix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)

---------

Co-authored-by: Shalev Avhar <[email protected]>
Co-authored-by: Tom Tankilevitch <[email protected]>
  • Loading branch information
3 people authored Aug 21, 2024
1 parent f96ea28 commit d54b97b
Show file tree
Hide file tree
Showing 16 changed files with 360 additions and 29 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.10.0 (2024-08-19)

### Improvements

- Add support for reporting the integration resync state to expose more information about the integration state in the portal


## 0.9.14 (2024-08-19)


Expand Down
17 changes: 17 additions & 0 deletions port_ocean/clients/port/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
get_internal_http_client,
)
from port_ocean.exceptions.clients import KafkaCredentialsNotFound
from typing import Any


class PortClient(
Expand Down Expand Up @@ -75,3 +76,19 @@ async def get_org_id(self) -> str:
handle_status_code(response)

return response.json()["organization"]["id"]

async def update_integration_state(
self, state: dict[str, Any], should_raise: bool = True, should_log: bool = True
) -> dict[str, Any]:
if should_log:
logger.debug(f"Updating integration resync state with: {state}")
response = await self.client.patch(
f"{self.api_url}/integration/{self.integration_identifier}/resync-state",
headers=await self.auth.headers(),
json=state,
)
handle_status_code(response, should_raise, should_log)
if response.is_success and should_log:
logger.info("Integration resync state updated successfully")

return response.json().get("integration", {})
6 changes: 3 additions & 3 deletions port_ocean/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow):
integration: IntegrationSettings = Field(
default_factory=lambda: IntegrationSettings(type="", identifier="")
)
runtime: Runtime = "OnPrem"
runtime: Runtime = Runtime.OnPrem

@root_validator()
def validate_integration_config(cls, values: dict[str, Any]) -> dict[str, Any]:
Expand All @@ -100,8 +100,8 @@ def parse_config(model: Type[BaseModel], config: Any) -> BaseModel:
return values

@validator("runtime")
def validate_runtime(cls, runtime: Literal["OnPrem", "Saas"]) -> Runtime:
if runtime == "Saas":
def validate_runtime(cls, runtime: Runtime) -> Runtime:
if runtime == Runtime.Saas:
spec = get_spec_file()
if spec is None:
raise ValueError(
Expand Down
37 changes: 37 additions & 0 deletions port_ocean/core/event_listener/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from port_ocean.config.base import BaseOceanModel
from port_ocean.utils.signal import signal_handler
from port_ocean.context.ocean import ocean
from port_ocean.utils.misc import IntegrationStateStatus


class EventListenerEvents(TypedDict):
Expand Down Expand Up @@ -36,6 +38,41 @@ def _stop(self) -> None:
"""
pass

async def _before_resync(self) -> None:
"""
Can be used for event listeners that need to perform some action before resync.
"""
await ocean.app.resync_state_updater.update_before_resync()

async def _after_resync(self) -> None:
"""
Can be used for event listeners that need to perform some action after resync.
"""
await ocean.app.resync_state_updater.update_after_resync()

async def _on_resync_failure(self, e: Exception) -> None:
"""
Can be used for event listeners that need to handle resync failures.
"""
await ocean.app.resync_state_updater.update_after_resync(
IntegrationStateStatus.Failed
)

async def _resync(
self,
resync_args: dict[Any, Any],
) -> None:
"""
Triggers the "on_resync" event.
"""
await self._before_resync()
try:
await self.events["on_resync"](resync_args)
await self._after_resync()
except Exception as e:
await self._on_resync_failure(e)
raise e


class EventListenerSettings(BaseOceanModel, extra=Extra.allow):
type: str
Expand Down
2 changes: 1 addition & 1 deletion port_ocean/core/event_listener/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ async def _start(self) -> None:

@target_channel_router.post("/resync")
async def resync() -> None:
await self.events["on_resync"]({})
await self._resync({})

ocean.app.fast_api_app.include_router(target_channel_router)
2 changes: 1 addition & 1 deletion port_ocean/core/event_listener/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async def _handle_message(self, raw_msg: Message) -> None:

if "change.log" in topic and message is not None:
try:
await self.events["on_resync"](message)
await self._resync(message)
except Exception as e:
_type, _, tb = sys.exc_info()
logger.opt(exception=(_type, None, tb)).error(
Expand Down
97 changes: 96 additions & 1 deletion port_ocean/core/event_listener/once.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import signal
from typing import Literal, Any

Expand All @@ -9,6 +10,9 @@
EventListenerSettings,
)
from port_ocean.utils.repeat import repeat_every
from port_ocean.context.ocean import ocean
from port_ocean.utils.time import convert_str_to_utc_datetime, convert_to_minutes
from port_ocean.utils.misc import IntegrationStateStatus


class OnceEventListenerSettings(EventListenerSettings):
Expand Down Expand Up @@ -41,6 +45,97 @@ def __init__(
):
super().__init__(events)
self.event_listener_config = event_listener_config
self.cached_integration: dict[str, Any] | None = None

async def get_current_integration_cached(self) -> dict[str, Any]:
if self.cached_integration:
return self.cached_integration

self.cached_integration = await ocean.port_client.get_current_integration()
return self.cached_integration

async def get_saas_resync_initialization_and_interval(
self,
) -> tuple[int | None, datetime.datetime | None]:
"""
Get the scheduled resync interval and the last updated time of the integration config for the saas application.
interval is the saas configured resync interval time.
start_time is the last updated time of the integration config.
return: (interval, start_time)
"""
if not ocean.app.is_saas():
return (None, None)

try:
integration = await self.get_current_integration_cached()
except Exception as e:
logger.exception(f"Error occurred while getting current integration {e}")
return (None, None)

interval_str = (
integration.get("spec", {})
.get("appSpec", {})
.get("scheduledResyncInterval")
)

if not interval_str:
logger.error(
"Unexpected: scheduledResyncInterval not found for Saas integration, Cannot predict the next resync"
)
return (None, None)

last_updated_saas_integration_config_str = integration.get(
"statusInfo", {}
).get("updatedAt")

# we use the last updated time of the integration config as the start time since in saas application the interval is configured by the user from the portal
if not last_updated_saas_integration_config_str:
logger.error(
"Unexpected: updatedAt not found for Saas integration, Cannot predict the next resync"
)
return (None, None)

return (
convert_to_minutes(interval_str),
convert_str_to_utc_datetime(last_updated_saas_integration_config_str),
)

async def _before_resync(self) -> None:
if not ocean.app.is_saas():
# in case of non-saas, we still want to update the state before and after the resync
await super()._before_resync()
return

(interval, start_time) = (
await self.get_saas_resync_initialization_and_interval()
)
await ocean.app.resync_state_updater.update_before_resync(interval, start_time)

async def _after_resync(self) -> None:
if not ocean.app.is_saas():
# in case of non-saas, we still want to update the state before and after the resync
await super()._after_resync()
return

(interval, start_time) = (
await self.get_saas_resync_initialization_and_interval()
)
await ocean.app.resync_state_updater.update_after_resync(
IntegrationStateStatus.Completed, interval, start_time
)

async def _on_resync_failure(self, e: Exception) -> None:
if not ocean.app.is_saas():
# in case of non-saas, we still want to update the state before and after the resync
await super()._after_resync()
return

(interval, start_time) = (
await self.get_saas_resync_initialization_and_interval()
)
await ocean.app.resync_state_updater.update_after_resync(
IntegrationStateStatus.Failed, interval, start_time
)

async def _start(self) -> None:
"""
Expand All @@ -53,7 +148,7 @@ async def _start(self) -> None:
async def resync_and_exit() -> None:
logger.info("Once event listener started")
try:
await self.events["on_resync"]({})
await self._resync({})
except Exception:
# we catch all exceptions here to make sure the application will exit gracefully
logger.exception("Error occurred while resyncing")
Expand Down
24 changes: 14 additions & 10 deletions port_ocean/core/event_listener/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,16 @@ def __init__(
):
super().__init__(events)
self.event_listener_config = event_listener_config
self._last_updated_at = None

def should_resync(self, last_updated_at: str) -> bool:
_last_updated_at = (
ocean.app.resync_state_updater.last_integration_state_updated_at
)

if _last_updated_at is None:
return self.event_listener_config.resync_on_start

return _last_updated_at != last_updated_at

async def _start(self) -> None:
"""
Expand All @@ -69,17 +78,12 @@ async def resync() -> None:
integration = await ocean.app.port_client.get_current_integration()
last_updated_at = integration["updatedAt"]

should_resync = (
self._last_updated_at is not None
or self.event_listener_config.resync_on_start
) and self._last_updated_at != last_updated_at

if should_resync:
if self.should_resync(last_updated_at):
logger.info("Detected change in integration, resyncing")
self._last_updated_at = last_updated_at
running_task: Task[Any] = get_event_loop().create_task(
self.events["on_resync"]({}) # type: ignore
ocean.app.resync_state_updater.last_integration_state_updated_at = (
last_updated_at
)
running_task: Task[Any] = get_event_loop().create_task(self._resync({}))
signal_handler.register(running_task.cancel)

await running_task
Expand Down
5 changes: 5 additions & 0 deletions port_ocean/core/handlers/resync_state_updater/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .updater import ResyncStateUpdater

__all__ = [
"ResyncStateUpdater",
]
84 changes: 84 additions & 0 deletions port_ocean/core/handlers/resync_state_updater/updater.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import datetime
from typing import Any, Literal
from port_ocean.clients.port.client import PortClient
from port_ocean.utils.misc import IntegrationStateStatus
from port_ocean.utils.time import get_next_occurrence


class ResyncStateUpdater:
def __init__(self, port_client: PortClient, scheduled_resync_interval: int | None):
self.port_client = port_client
self.initiated_at = datetime.datetime.now(tz=datetime.timezone.utc)
self.scheduled_resync_interval = scheduled_resync_interval

# This is used to differ between integration changes that require a full resync and state changes
# So that the polling event-listener can decide whether to perform a full resync or not
# TODO: remove this once we separate the state from the integration
self.last_integration_state_updated_at: str = ""

def _calculate_next_scheduled_resync(
self,
interval: int | None = None,
custom_start_time: datetime.datetime | None = None,
) -> str | None:
if interval is None:
return None
return get_next_occurrence(
interval * 60, custom_start_time or self.initiated_at
).isoformat()

async def update_before_resync(
self,
interval: int | None = None,
custom_start_time: datetime.datetime | None = None,
) -> None:
_interval = interval or self.scheduled_resync_interval
nest_resync = self._calculate_next_scheduled_resync(
_interval, custom_start_time
)
state: dict[str, Any] = {
"status": IntegrationStateStatus.Running.value,
"lastResyncEnd": None,
"lastResyncStart": datetime.datetime.now(
tz=datetime.timezone.utc
).isoformat(),
"nextResync": nest_resync,
"intervalInMinuets": _interval,
}

integration = await self.port_client.update_integration_state(
state, should_raise=False
)
if integration:
self.last_integration_state_updated_at = integration["resyncState"][
"updatedAt"
]

async def update_after_resync(
self,
status: Literal[
IntegrationStateStatus.Completed, IntegrationStateStatus.Failed
] = IntegrationStateStatus.Completed,
interval: int | None = None,
custom_start_time: datetime.datetime | None = None,
) -> None:
_interval = interval or self.scheduled_resync_interval
nest_resync = self._calculate_next_scheduled_resync(
_interval, custom_start_time
)
state: dict[str, Any] = {
"status": status.value,
"lastResyncEnd": datetime.datetime.now(
tz=datetime.timezone.utc
).isoformat(),
"nextResync": nest_resync,
"intervalInMinuets": _interval,
}

integration = await self.port_client.update_integration_state(
state, should_raise=False
)
if integration:
self.last_integration_state_updated_at = integration["resyncState"][
"updatedAt"
]
7 changes: 5 additions & 2 deletions port_ocean/core/models.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from dataclasses import dataclass, field
from typing import Any, Literal
from enum import Enum
from typing import Any

from pydantic import BaseModel
from pydantic.fields import Field


Runtime = Literal["OnPrem", "Saas"]
class Runtime(Enum):
Saas = "Saas"
OnPrem = "OnPrem"


class Entity(BaseModel):
Expand Down
Loading

0 comments on commit d54b97b

Please sign in to comment.