🐛 Source Amazon S3: solve possible case of files being missed during incremental syncs (#12568)

* Added history to state

* Deleted unused import

* Rollback abnormal state file

* Rollback abnormal state file

* Fixed type error issue

* Fix state issue

* Updated after review

* Bumped version
lazebnyi authored May 31, 2022
1 parent 0c5cdc7 commit f9348b2
Showing 6 changed files with 132 additions and 15 deletions.
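
At a high level, this change adds a `history` map to the incremental stream state so that files sharing the same `last_modified` timestamp as the cursor can no longer be silently skipped on the next sync. A minimal sketch of the resulting state shape, with made-up values (only the field names and the date-keyed grouping come from this diff):

```python
# Illustrative stream state after this change (values are invented).
# "history" groups synced file keys by modification date; only dates within
# `buffer_days` (3 by default in this PR) of the cursor date are retained.
example_state = {
    "_ab_source_file_last_modified": "2022-05-31T12:00:00+0000",  # cursor value
    "history": {
        "2022-05-29": {"exports/part-001.csv"},
        "2022-05-31": {"exports/part-002.csv", "exports/part-003.csv"},
    },
    "schema": {"id": "integer", "name": "string"},  # schema map kept alongside the cursor
}
```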
@@ -786,7 +786,7 @@
- name: S3
sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerRepository: airbyte/source-s3
dockerImageTag: 0.1.14
dockerImageTag: 0.1.15
documentationUrl: https://docs.airbyte.io/integrations/sources/s3
icon: s3.svg
sourceType: file
@@ -7331,7 +7331,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-s3:0.1.14"
- dockerImage: "airbyte/source-s3:0.1.15"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/s3"
changelogUrl: "https://docs.airbyte.io/integrations/sources/s3"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.14
LABEL io.airbyte.version=0.1.15
LABEL io.airbyte.name=airbyte/source-s3
@@ -6,7 +6,7 @@
import json
from abc import ABC, abstractmethod
from copy import deepcopy
from datetime import datetime
from datetime import datetime, timedelta
from functools import lru_cache
from traceback import format_exc
from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union
@@ -351,6 +351,9 @@ def read_records(
class IncrementalFileStream(FileStream, ABC):
# TODO: ideally want to checkpoint after every file or stream slice rather than N records
state_checkpoint_interval = None
buffer_days = 3 # keeping track of all files synced in the last N days
sync_all_files_always = False
max_history_size = 1000000000

@property
def cursor_field(self) -> str:
@@ -359,13 +362,59 @@ def cursor_field(self) -> str:
"""
return self.ab_last_mod_col

@staticmethod
def file_in_history(file_info: FileInfo, history: dict) -> bool:
for slot in history.values():
if file_info.key in slot:
return file_info.key in slot
return False

def _get_datetime_from_stream_state(self, stream_state: Mapping[str, Any] = None) -> datetime:
"""if no state, we default to 1970-01-01 in order to pick up all files present."""
if stream_state is not None and self.cursor_field in stream_state.keys():
return datetime.strptime(stream_state[self.cursor_field], self.datetime_format_string)
else:
return datetime.strptime("1970-01-01T00:00:00+0000", self.datetime_format_string)

def get_updated_history(self, current_stream_state, latest_record_datetime, latest_record, current_parsed_datetime, state_date):
"""
History is a dict that groups the files synced so far by their modified_at date.
After reading each record we add its file to the set for that date if it isn't already there,
then drop any dates that fall more than buffer_days before the state date.
"""

history = current_stream_state.get("history", {})

file_modification_date = latest_record_datetime.strftime("%Y-%m-%d")

# add the record's file to history if its modified date is no more than buffer_days before the state date
if latest_record_datetime.date() + timedelta(days=self.buffer_days) >= state_date:
history_item = set(history.setdefault(file_modification_date, set()))
history_item.add(latest_record[self.ab_file_name_col])
history[file_modification_date] = history_item

# when the state date has moved on, drop history dates more than buffer_days older than it
if current_parsed_datetime.date() != state_date:
history = {
date: history[date]
for date in history
if datetime.strptime(date, "%Y-%m-%d").date() + timedelta(days=self.buffer_days) >= state_date
}

return history

def size_history_balancer(self, state_dict):
"""
Delete history if state size limit reached
"""
history = state_dict["history"]

if history.__sizeof__() > self.max_history_size:
self.sync_all_files_always = True
state_dict.pop("history")

return state_dict

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Inspects the latest record extracted from the data source and the current state object and return an updated state object.
@@ -384,7 +433,36 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
state_dict[self.cursor_field] = datetime.strftime(max(current_parsed_datetime, latest_record_datetime), self.datetime_format_string)

state_dict["schema"] = self._get_schema_map()
return state_dict

state_date = self._get_datetime_from_stream_state(state_dict).date()

if not self.sync_all_files_always:
state_dict["history"] = self.get_updated_history(
current_stream_state, latest_record_datetime, latest_record, current_parsed_datetime, state_date
)

return self.size_history_balancer(state_dict)

def need_to_skip_file(self, stream_state, file_info):
"""
Skip this file if last_mod is no later than our cursor value from state and the file is already in history,
or if last_mod plus buffer_days is still earlier than our cursor value and the file is not in history
"""
file_in_history_and_last_modified_is_earlier_than_cursor_value = (
stream_state is not None
and self.cursor_field in stream_state.keys()
and file_info.last_modified <= self._get_datetime_from_stream_state(stream_state)
and self.file_in_history(file_info, stream_state.get("history", {}))
)

file_is_not_in_history_and_last_modified_plus_buffer_days_is_earlier_than_cursor_value = file_info.last_modified + timedelta(
days=self.buffer_days
) < self._get_datetime_from_stream_state(stream_state) and not self.file_in_history(file_info, stream_state.get("history", {}))

return (
file_in_history_and_last_modified_is_earlier_than_cursor_value
or file_is_not_in_history_and_last_modified_plus_buffer_days_is_earlier_than_cursor_value
)

def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
@@ -395,7 +473,7 @@ def stream_slices(
This ensures we only update the cursor state to a given timestamp after ALL files with that timestamp have been successfully read.
Slight nuance: as we iterate through get_time_ordered_file_infos(),
we yield the stream_slice containing file(s) up to and EXcluding the file on the current iteration.
we yield the stream_slice containing file(s) up to and Excluding the file on the current iteration.
The stream_slice is then cleared (if we yielded it) and this iteration's file appended to the (next) stream_slice
"""
if sync_mode == SyncMode.full_refresh:
@@ -411,12 +489,7 @@
prev_file_last_mod: datetime = None # init variable to hold previous iterations last modified
grouped_files_by_time: List[Dict[str, Any]] = []
for file_info in self.get_time_ordered_file_infos():
# skip this file if last_mod is earlier than our cursor value from state
if (
stream_state is not None
and self.cursor_field in stream_state.keys()
and file_info.last_modified <= self._get_datetime_from_stream_state(stream_state)
):
if self.need_to_skip_file(stream_state, file_info):
continue

# check if this file belongs in the next slice, if so yield the current slice before this file
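
To make the new skip rules in `need_to_skip_file` concrete, here is a self-contained sketch that mirrors its two conditions using plain values instead of the connector's `FileInfo` and stream-state objects (the names and sample data below are illustrative, not taken from the connector):

```python
from datetime import datetime, timedelta

BUFFER_DAYS = 3  # mirrors IncrementalFileStream.buffer_days in this PR


def should_skip(key: str, last_modified: datetime, cursor: datetime, history: dict) -> bool:
    """Simplified mirror of need_to_skip_file: skip only files that are provably already synced."""
    in_history = any(key in files for files in history.values())
    # condition 1: not newer than the cursor AND already recorded in history
    seen_and_not_newer = last_modified <= cursor and in_history
    # condition 2: older than the cursor by more than the buffer window AND never recorded
    outside_buffer_and_unseen = last_modified + timedelta(days=BUFFER_DAYS) < cursor and not in_history
    return seen_and_not_newer or outside_buffer_and_unseen


cursor = datetime(2022, 5, 31, 12, 0)
history = {"2022-05-31": {"part-002.csv"}}

# Same timestamp as the cursor but absent from history -> NOT skipped (the case this PR fixes).
print(should_skip("part-004.csv", datetime(2022, 5, 31, 12, 0), cursor, history))  # False
# Already recorded in history and not newer than the cursor -> skipped.
print(should_skip("part-002.csv", datetime(2022, 5, 31, 12, 0), cursor, history))  # True
# Older than the cursor by more than the buffer window and never recorded -> skipped.
print(should_skip("part-000.csv", datetime(2022, 5, 1), cursor, history))          # True
```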
@@ -19,12 +19,19 @@
LOGGER = AirbyteLogger()


def mock_big_size_object():
mock = MagicMock()
mock.__sizeof__.return_value = 1000000001
return mock


class TestIncrementalFileStream:
@pytest.mark.parametrize( # set return_schema to None for an expected fail
"schema_string, return_schema",
[
(
'{"id": "integer", "name": "string", "valid": "boolean", "code": "integer", "degrees": "number", "birthday": "string", "last_seen": "string"}',
'{"id": "integer", "name": "string", "valid": "boolean", "code": "integer", "degrees": "number", "birthday": '
'"string", "last_seen": "string"}',
{
"id": "integer",
"name": "string",
@@ -329,6 +336,43 @@ def test_pattern_matched_filepath_iterator(self, patterns: str, filepaths: List[
file_infos = [create_by_local_file(filepath) for filepath in filepaths]
assert set([p.key for p in fs.pattern_matched_filepath_iterator(file_infos)]) == set(expected_filepaths)

@pytest.mark.parametrize(
"latest_record, current_stream_state, expected",
[
( # overwrite history file
{"id": 1, "_ab_source_file_last_modified": "2022-05-11T11:54:11+0000", "_ab_source_file_url": "new_test_file.csv"},
{"_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "history": {"2021-07-25": {"old_test_file.csv"}}},
{"2022-05-11": {"new_test_file.csv"}},
),
( # add file to same day
{"id": 1, "_ab_source_file_last_modified": "2022-07-25T11:54:11+0000", "_ab_source_file_url": "new_test_file.csv"},
{"_ab_source_file_last_modified": "2022-07-25T00:00:00+0000", "history": {"2022-07-25": {"old_test_file.csv"}}},
{"2022-07-25": {"new_test_file.csv", "old_test_file.csv"}},
),
( # add new day to history
{"id": 1, "_ab_source_file_last_modified": "2022-07-03T11:54:11+0000", "_ab_source_file_url": "new_test_file.csv"},
{"_ab_source_file_last_modified": "2022-07-01T00:00:00+0000", "history": {"2022-07-01": {"old_test_file.csv"}}},
{"2022-07-01": {"old_test_file.csv"}, "2022-07-03": {"new_test_file.csv"}},
),
( # history size limit reached
{"_ab_source_file_url": "test.csv"},
{"_ab_source_file_last_modified": "2022-07-01T00:00:00+0000", "history": mock_big_size_object()},
None,
),
],
ids=["overwrite_history_file", "add_file_to_same_day ", "add_new_day_to_history", "history_size_limit_reached"],
)
@patch(
"source_s3.source_files_abstract.stream.IncrementalFileStream.__abstractmethods__", set()
) # patching abstractmethods to empty set so we can instantiate ABC to test
def test_get_updated_history(self, latest_record, current_stream_state, expected, request) -> None:
fs = IncrementalFileStream(dataset="dummy", provider={}, format={"filetype": "csv"}, path_pattern="**/prefix*.csv")
fs._get_schema_map = MagicMock(return_value={})
assert fs.get_updated_state(current_stream_state, latest_record).get("history") == expected

if request.node.callspec.id == "history_size_limit_reached":
assert fs.sync_all_files_always

@pytest.mark.parametrize( # set expected_return_record to None for an expected fail
"stream_state, expected_error",
[
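
The `history_size_limit_reached` case above relies on a `MagicMock` whose `__sizeof__` is forced above `max_history_size`, which is what pushes `size_history_balancer` to drop the history and switch the stream to `sync_all_files_always`. A small standalone illustration of that mechanism (the threshold mirrors the constant in the stream; everything else is invented):

```python
from unittest.mock import MagicMock

MAX_HISTORY_SIZE = 1000000000  # mirrors IncrementalFileStream.max_history_size


def balance(state: dict) -> bool:
    """Simplified mirror of size_history_balancer: drop history once it reports a size above the limit."""
    if state["history"].__sizeof__() > MAX_HISTORY_SIZE:
        state.pop("history")
        return True  # caller would set sync_all_files_always = True
    return False


big_history = MagicMock()
big_history.__sizeof__.return_value = 1000000001  # pretend the history dict is enormous

state = {"_ab_source_file_last_modified": "2022-07-01T00:00:00+0000", "history": big_history}
print(balance(state))      # True  -> fall back to syncing every file
print("history" in state)  # False -> oversized history removed from state
```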
4 changes: 2 additions & 2 deletions docs/integrations/sources/s3.md
@@ -4,8 +4,7 @@ This page contains the setup guide and reference information for the Amazon S3 s

## Prerequisites

- Connector-specific prerequisites which are required in both Airbyte Cloud & OSS.
- If OSS has different requirements (e.g: user needs to setup a developer application).
Define the file pattern; see the [Path Patterns section](s3.md#path-patterns)

## Setup guide

@@ -196,6 +195,7 @@ The avro parser uses [fastavro](https://fastavro.readthedocs.io/en/latest/). Cur

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------|
| 0.1.15 | 2022-05-31 | [12568](https://github.com/airbytehq/airbyte/pull/12568) | Fixed possible case of files being missed during incremental syncs |
| 0.1.14 | 2022-05-23 | [11967](https://github.com/airbytehq/airbyte/pull/11967) | Increase unit test coverage up to 90% |
| 0.1.13 | 2022-05-11 | [12730](https://github.com/airbytehq/airbyte/pull/12730) | Fixed empty options issue |
| 0.1.12 | 2022-05-11 | [12602](https://github.com/airbytehq/airbyte/pull/12602) | Added support for Avro file format |
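
The prerequisites above point to the Path Patterns section for defining which objects to sync. Those patterns are glob-style; as a rough way to prototype one locally you can use the `wcmatch` library (this is an approximation of the matching behaviour, not the connector's own code, and the keys and pattern below are invented):

```python
from wcmatch.glob import GLOBSTAR, SPLIT, globmatch

pattern = "exports/**/*.csv|reports/2022-*.csv"  # '|' separates alternatives, '**' spans directories

for key in ["exports/2022/05/data.csv", "reports/2022-05.csv", "logs/app.log"]:
    print(key, globmatch(key, pattern, flags=GLOBSTAR | SPLIT))
# exports/2022/05/data.csv True
# reports/2022-05.csv True
# logs/app.log False
```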
