🎉 Source S3 - memory & performance optimisations + advanced CSV options #6615

Merged 14 commits on Oct 19, 2021
@@ -19,7 +19,6 @@

from .formats.csv_parser import CsvParser
from .formats.parquet_parser import ParquetParser
from .storagefile import StorageFile

JSON_TYPES = ["string", "number", "integer", "object", "array", "boolean", "null"]

@@ -60,7 +59,7 @@ def __init__(self, dataset: str, provider: dict, format: dict, path_pattern: str
if schema:
self._schema = self._parse_user_input_schema(schema)
self.master_schema = None
self.storagefile_cache: Optional[List[Tuple[datetime, StorageFile]]] = None
self.time_ordered_filepath_cache: Optional[List[Tuple[datetime, str]]] = None
self.logger = AirbyteLogger()
self.logger.info(f"initialised stream with format: {format}")

@@ -143,7 +142,7 @@ def pattern_matched_filepath_iterator(self, filepaths: Iterable[str]) -> Iterato
if globmatch(filepath, self._path_pattern, flags=GLOBSTAR | SPLIT):
yield filepath

def time_ordered_storagefile_iterator(self) -> Iterable[Tuple[datetime, StorageFile]]:
def time_ordered_filepath_iterator(self) -> Iterable[Tuple[datetime, str]]:
"""
Iterates through pattern_matched_filepath_iterator(), acquiring the last_modified property of each file, and returns the results in time-ascending order.
Uses concurrent.futures to run this asynchronously across threads, which improves performance when there are many files (network I/O)
@@ -152,11 +151,11 @@ def time_ordered_storagefile_iterator(self) -> Iterable[Tuple[datetime, StorageF
:return: list in time-ascending order
"""

def get_storagefile_with_lastmod(filepath: str) -> Tuple[datetime, StorageFile]:
def get_storagefile_with_lastmod(filepath: str) -> Tuple[datetime, str]:
fc = self.storagefile_class(filepath, self._provider)
return (fc.last_modified, fc)
return (fc.last_modified, filepath)

if self.storagefile_cache is None:
if self.time_ordered_filepath_cache is None:
storagefiles = []
# use concurrent future threads to parallelise grabbing last_modified from all the files
# TODO: don't hardcode max_workers like this
@@ -170,10 +169,10 @@ def get_storagefile_with_lastmod(filepath: str) -> Tuple[datetime, StorageFile]:
# this will failfast on any errors
storagefiles.append(future.result())

# The array storagefiles contain tuples of (last_modified, StorageFile), so sort by last_modified
self.storagefile_cache = sorted(storagefiles, key=itemgetter(0))
# The storagefiles array contains tuples of (last_modified, filepath), so sort by last_modified
self.time_ordered_filepath_cache = sorted(storagefiles, key=itemgetter(0))

return self.storagefile_cache
return self.time_ordered_filepath_cache
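
A minimal sketch (not part of the diff) of the pattern introduced above: fetch last_modified for many files on a thread pool, then sort ascending. get_last_modified() is a hypothetical stand-in for StorageFile.last_modified.

import concurrent.futures
from datetime import datetime
from operator import itemgetter
from typing import Iterable, List, Tuple

def time_ordered(filepaths: Iterable[str]) -> List[Tuple[datetime, str]]:
    def with_last_mod(filepath: str) -> Tuple[datetime, str]:
        return (get_last_modified(filepath), filepath)  # hypothetical network call

    with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor:
        futures = [executor.submit(with_last_mod, fp) for fp in filepaths]
        # future.result() re-raises any exception, so this fails fast on errors
        results = [future.result() for future in concurrent.futures.as_completed(futures)]

    return sorted(results, key=itemgetter(0))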

def _get_schema_map(self) -> Mapping[str, Any]:
if self._schema != {}:
@@ -198,26 +197,33 @@ def get_json_schema(self) -> Mapping[str, Any]:
properties[self.ab_last_mod_col]["format"] = "date-time"
return {"type": "object", "properties": properties}

def _get_master_schema(self) -> Mapping[str, Any]:
def _get_master_schema(self, min_datetime: datetime = None) -> Mapping[str, Any]:
"""
In order to auto-infer a schema across many files and/or allow for additional properties (columns),
we need to determine the superset of schemas across all relevant files.
This method iterates through time_ordered_storagefile_iterator() obtaining the inferred schema (process implemented per file format),
This method iterates through time_ordered_filepath_iterator() obtaining the inferred schema (process implemented per file format),
to build up this superset schema (master_schema).
This runs datatype checks to Warn or Error if we find incompatible schemas (e.g. same column is 'date' in one file but 'float' in another).
This caches the master_schema after first run in order to avoid repeated compute and network calls to infer schema on all files.

:param min_datetime: if passed, will only use files with last_modified >= this to determine master schema

:raises RuntimeError: if we find datatype mismatches between files or between a file and schema state (provided or from previous inc. batch)
:return: A dict of the JSON schema representing this stream.
"""
# TODO: could implement a (user-beware) 'lazy' mode that skips schema checking to improve performance
# TODO: could utilise min_datetime to add a start_date parameter in spec for user
if self.master_schema is None:
master_schema = deepcopy(self._schema)

file_reader = self.fileformatparser_class(self._format)

# time order isn't necessary here but we might as well use this method so we cache the list for later use
for _, storagefile in self.time_ordered_storagefile_iterator():
for last_mod, filepath in self.time_ordered_filepath_iterator():
# skip this file if it's earlier than min_datetime
if (min_datetime is not None) and (last_mod < min_datetime):
continue

storagefile = self.storagefile_class(filepath, self._provider)
with storagefile.open(file_reader.is_binary) as f:
this_schema = file_reader.get_inferred_schema(f)
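
The docstring above describes building a superset ("master") schema across files and raising on datatype conflicts. A rough sketch of that merge step, assuming inferred schemas are simple column-to-type mappings (not the connector's exact code):

from typing import Dict, Mapping

def merge_into_master_schema(master_schema: Dict[str, str], inferred: Mapping[str, str]) -> Dict[str, str]:
    """Widen master_schema with any new columns; raise on conflicting datatypes."""
    for column, dtype in inferred.items():
        if column not in master_schema:
            master_schema[column] = dtype
        elif master_schema[column] != dtype:
            raise RuntimeError(f"column '{column}' is '{master_schema[column]}' in one file but '{dtype}' in another")
    return master_schema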

@@ -269,7 +275,8 @@ def stream_slices(
# TODO: this could be optimised via concurrent reads, however we'd lose chronology and need to deal with knock-ons of that
# we could do this concurrently both full and incremental by running batches in parallel
# and then incrementing the cursor per each complete batch
for last_mod, storagefile in self.time_ordered_storagefile_iterator():
for last_mod, filepath in self.time_ordered_filepath_iterator():
storagefile = self.storagefile_class(filepath, self._provider)
yield [{"unique_url": storagefile.url, "last_modified": last_mod, "storagefile": storagefile}]
# in case we have no files
yield from [None]
Contributor: is this handled differently when the loop is empty now?

Contributor Author: Originally added this as SAT (Source Acceptance Tests) were failing without it. Now when stream_slices() yields nothing it's handled correctly.
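
For context on the exchange above, a toy sketch (not the connector's code) of the [None] sentinel pattern and why the reader tolerates it:

def stream_slices(files):
    for f in files:
        yield [f]
    # yielded unconditionally, so callers still receive one slice even when `files` is empty
    yield from [None]

def read_records(stream_slice):
    stream_slice = stream_slice if stream_slice is not None else []
    for f in stream_slice:
        yield {"file": f}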

@@ -313,21 +320,17 @@ def _add_extra_fields_from_map(self, record: Mapping[str, Any], extra_map: Mappi
record[key] = value
return record

def read_records(
def _read_from_slice(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
file_reader,
stream_slice: Mapping[str, Any],
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
"""
Uses provider-relevant StorageFile to open file and then iterates through stream_records() using format-relevant FileFormatParser.
Uses provider-relevant StorageFile to open file and then iterates through stream_records() using format-relevant AbstractFileParser.
Records are mutated on the fly using _match_target_schema() and _add_extra_fields_from_map() to achieve desired final schema.
Since this is called per stream_slice, this method works for both full_refresh and incremental so sync_mode is ignored.
Since this is called per stream_slice, this method works for both full_refresh and incremental.
"""
stream_slice = stream_slice if stream_slice is not None else []
file_reader = self.fileformatparser_class(self._format, self._get_master_schema())

# TODO: read all files in a stream_slice concurrently
for file_info in stream_slice:
with file_info["storagefile"].open(file_reader.is_binary) as f:
@@ -343,26 +346,44 @@ def read_records(
)
yield complete_record
self.logger.info("finished reading a stream slice")

# Always return an empty generator just in case no records were ever yielded
yield from []
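
As the docstring notes, each parsed record is mutated to carry extra metadata before being emitted. A simplified sketch of that decoration step; the field names here are illustrative, not the stream's actual ab_* column names:

from typing import Any, Iterable, Mapping, MutableMapping

def add_extra_fields(record: MutableMapping[str, Any], extra_map: Mapping[str, Any]) -> MutableMapping[str, Any]:
    for key, value in extra_map.items():
        record[key] = value
    return record

def decorate(records: Iterable[MutableMapping[str, Any]], last_modified: str, url: str):
    for record in records:
        yield add_extra_fields(record, {"file_last_modified": last_modified, "file_url": url})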

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
"""
The heavy lifting sits in _read_from_slice() which is full refresh / incremental agnostic
"""
stream_slice = stream_slice if stream_slice is not None else []
file_reader = self.fileformatparser_class(self._format, self._get_master_schema())

yield from self._read_from_slice(file_reader, stream_slice)


class IncrementalFileStream(FileStream, ABC):

# TODO: ideally want to checkpoint after every file or stream slice rather than N records
state_checkpoint_interval = None

# TODO: would be great if we could override time_ordered_storagefile_iterator() here with state awareness
# this would allow filtering down to only files we need early and avoid unnecessary work

@property
def cursor_field(self) -> str:
"""
:return: The name of the cursor field.
"""
return self.ab_last_mod_col

def _get_datetime_from_stream_state(self, stream_state: Mapping[str, Any] = None) -> datetime:
""" if no state, we default to 1970-01-01 in order to pick up all files present. """
if stream_state is not None and self.cursor_field in stream_state.keys():
return datetime.strptime(stream_state[self.cursor_field], self.datetime_format_string)
else:
return datetime.strptime("1970-01-01T00:00:00+0000", self.datetime_format_string)

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Inspects the latest record extracted from the data source and the current state object and return an updated state object.
@@ -374,16 +395,11 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
:return: An updated state object
"""
state_dict = {}
if current_stream_state is not None and self.cursor_field in current_stream_state.keys():
current_parsed_datetime = datetime.strptime(current_stream_state[self.cursor_field], self.datetime_format_string)
latest_record_datetime = datetime.strptime(
latest_record.get(self.cursor_field, "1970-01-01T00:00:00+0000"), self.datetime_format_string
)
state_dict[self.cursor_field] = datetime.strftime(
max(current_parsed_datetime, latest_record_datetime), self.datetime_format_string
)
else:
state_dict[self.cursor_field] = "1970-01-01T00:00:00+0000"
current_parsed_datetime = self._get_datetime_from_stream_state(current_stream_state)
latest_record_datetime = datetime.strptime(
latest_record.get(self.cursor_field, "1970-01-01T00:00:00+0000"), self.datetime_format_string
)
state_dict[self.cursor_field] = datetime.strftime(max(current_parsed_datetime, latest_record_datetime), self.datetime_format_string)

state_dict["schema"] = self._get_schema_map()
return state_dict
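
A compact sketch of the cursor update above: parse both datetimes, fall back to the 1970 epoch default, and keep the later value. The format string is an assumed stand-in for self.datetime_format_string:

from datetime import datetime
from typing import Optional

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z"  # assumption; mirrors values like "1970-01-01T00:00:00+0000"
EPOCH = "1970-01-01T00:00:00+0000"

def updated_cursor(current_state_value: Optional[str] = None, latest_record_value: Optional[str] = None) -> str:
    current_dt = datetime.strptime(current_state_value or EPOCH, DATETIME_FORMAT)
    latest_dt = datetime.strptime(latest_record_value or EPOCH, DATETIME_FORMAT)
    return datetime.strftime(max(current_dt, latest_dt), DATETIME_FORMAT)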
@@ -396,7 +412,7 @@ def stream_slices(
An incremental stream_slice is a group of all files with the exact same last_modified timestamp.
This ensures we only update the cursor state to a given timestamp after ALL files with that timestamp have been successfully read.

Slight nuance: as we iterate through time_ordered_storagefile_iterator(),
Slight nuance: as we iterate through time_ordered_filepath_iterator(),
we yield the stream_slice containing file(s) up to and EXcluding the file on the current iteration.
The stream_slice is then cleared (if we yielded it) and this iteration's file appended to the (next) stream_slice
"""
@@ -413,15 +429,16 @@ def stream_slices(
prev_file_last_mod = None # init variable to hold previous iterations last modified
stream_slice = []

for last_mod, storagefile in self.time_ordered_storagefile_iterator():
for last_mod, filepath in self.time_ordered_filepath_iterator():
# skip this file if last_mod is earlier than our cursor value from state
if (
stream_state is not None
and self.cursor_field in stream_state.keys()
and last_mod <= datetime.strptime(stream_state[self.cursor_field], self.datetime_format_string)
and last_mod <= self._get_datetime_from_stream_state(stream_state)
):
continue

storagefile = self.storagefile_class(filepath, self._provider)
# check if this storagefile belongs in the next slice, if so yield the current slice before this file
if (prev_file_last_mod is not None) and (last_mod != prev_file_last_mod):
yield stream_slice
@@ -437,3 +454,25 @@

# in case we have no files
yield from [None]
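
The slicing loop above tracks prev_file_last_mod by hand; conceptually it groups a time-sorted file list by identical last_modified values. The same grouping, sketched with itertools.groupby (illustration only, not the PR's implementation):

from datetime import datetime
from itertools import groupby
from operator import itemgetter
from typing import Iterable, Iterator, List, Tuple

def slices_by_timestamp(files: Iterable[Tuple[datetime, str]]) -> Iterator[List[Tuple[datetime, str]]]:
    # `files` must already be sorted in time-ascending order
    for _, group in groupby(files, key=itemgetter(0)):
        yield list(group)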

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
"""
The heavy lifting sits in _read_from_slice() which is full refresh / incremental agnostic.
We override this for incremental so we can pass our minimum datetime from state into _get_master_schema().
This means we only parse the schema of new files on incremental runs rather than all files in the bucket.
"""
if sync_mode == SyncMode.full_refresh:
yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state)

else:
stream_slice = stream_slice if stream_slice is not None else []
file_reader = self.fileformatparser_class(
self._format, self._get_master_schema(self._get_datetime_from_stream_state(stream_state))
)
yield from self._read_from_slice(file_reader, stream_slice)
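
A small sketch of the effect of passing the state datetime into _get_master_schema(): on incremental runs only files at or after min_datetime are opened for schema inference (names here are illustrative):

from datetime import datetime
from typing import Iterable, List, Optional, Tuple

def files_for_schema_inference(
    files: Iterable[Tuple[datetime, str]], min_datetime: Optional[datetime] = None
) -> List[str]:
    # mirrors the `last_mod < min_datetime: continue` guard in _get_master_schema()
    return [path for last_mod, path in files if min_datetime is None or last_mod >= min_datetime]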