From 91b89ab47d4c01f6b25e70ad8e7d0fc0d6aa9543 Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Fri, 20 Dec 2024 14:47:42 -0500 Subject: [PATCH] source-twilio: check for updated messages based on the lookback window Twilio's API only allows us to query messages based on their creation date, not their updated date. Previously, the connector was only getting creates, with the option to use the lookback window to push back the cursor value to re-emit all messages created in the past X minutes. The `messages` stream now uses the lookback window to query Twilio for messages created X minutes in the past, but it only emits messages that have been created or updated after the current cursor value. --- source-twilio/source_twilio/streams.py | 58 +++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/source-twilio/source_twilio/streams.py b/source-twilio/source_twilio/streams.py index 7fb215c17..c86569615 100644 --- a/source-twilio/source_twilio/streams.py +++ b/source-twilio/source_twilio/streams.py @@ -246,6 +246,34 @@ def read_records( raise err +class IncrementalTwilioStreamWithLookbackWindow(IncrementalTwilioStream): + # This overwrites IncrementalTwilioStream's generate_date_ranges method. The only difference is that + # this method pushes back the start datetime based on the configured lookback window. + def generate_date_ranges(self) -> Iterable[Optional[MutableMapping[str, Any]]]: + def align_to_dt_format(dt: DateTime) -> DateTime: + return pendulum.parse(dt.format(self.time_filter_template)) + + end_datetime = pendulum.now("utc") + pushed_back_start_datetime = pendulum.parse(self.state.get(self.cursor_field, self._start_date)) - pendulum.duration(minutes=self._lookback_window) + start_datetime = min(end_datetime, pushed_back_start_datetime) + current_start = start_datetime + current_end = start_datetime + # Aligning to a datetime format is done to avoid the following scenario: + # start_dt = 2021-11-14T00:00:00, end_dt (now) = 2022-11-14T12:03:01, time_filter_template = "YYYY-MM-DD" + # First slice: (2021-11-14, 2022-11-14) + # (!) Second slice: (2022-11-15, 2022-11-14) - because 2022-11-14T00:00:00 (prev end) < 2022-11-14T12:03:01, + # so we have to compare dates, not date-times to avoid yielding that last slice + while align_to_dt_format(current_end) < align_to_dt_format(end_datetime): + current_end = min(end_datetime, current_start + self.slice_step) + slice_ = { + self.lower_boundary_filter_field: current_start.format(self.time_filter_template), + self.upper_boundary_filter_field: current_end.format(self.time_filter_template), + } + yield slice_ + current_start = current_end + self.slice_granularity + + + class TwilioNestedStream(TwilioStream): """ Basic class for the streams that are dependant on the results of another stream output (parent-child relations). @@ -538,7 +566,7 @@ class Queues(TwilioNestedStream): parent_stream = Accounts -class Messages(IncrementalTwilioStream, TwilioNestedStream): +class Messages(IncrementalTwilioStreamWithLookbackWindow, TwilioNestedStream): """https://www.twilio.com/docs/sms/api/message-resource#read-multiple-message-resources""" parent_stream = Accounts @@ -547,6 +575,32 @@ class Messages(IncrementalTwilioStream, TwilioNestedStream): upper_boundary_filter_field = "DateSent<" cursor_field = "date_sent" + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + unsorted_records = [] + initial_cursor = self.state.get(self.cursor_field, self._start_date) + + # Skip the IncrementalTwilioStream's read_records method since it filters only based on date_sent instead of + # both date_sent and date_updated. + for record in super(IncrementalTwilioStream, self).read_records(sync_mode, cursor_field, stream_slice, stream_state): + record[self.cursor_field] = pendulum.parse(record[self.cursor_field], strict=False).to_iso8601_string() + record["date_updated"] = pendulum.parse(record["date_updated"], strict=False).to_iso8601_string() + unsorted_records.append(record) + sorted_records = sorted(unsorted_records, key=lambda x: x[self.cursor_field]) + for record in sorted_records: + # If this is a new record, yield it and update the cursor. + if record[self.cursor_field] >= initial_cursor: + self._cursor_value = record[self.cursor_field] + yield record + # Otherwise if it's an update to a record we've seen before, yield it. + elif record["date_updated"] >= initial_cursor: + yield record + class MessageMedia(IncrementalTwilioStream, TwilioNestedStream): """https://www.twilio.com/docs/sms/api/media-resource#read-multiple-media-resources""" @@ -567,7 +621,7 @@ class MessageMedia(IncrementalTwilioStream, TwilioNestedStream): @cached_property def parent_stream_instance(self): most_recent_cursor = self.state.get(self.cursor_field, self._start_date) - return self.parent_stream(authenticator=self.authenticator, start_date=most_recent_cursor, lookback_window=self._lookback_window) + return self.parent_stream(authenticator=self.authenticator, start_date=most_recent_cursor) class UsageNestedStream(TwilioNestedStream):