Skip to content

Commit

Permalink
refactor: Use context manager to read gzip batch files (#2628)
Browse files Browse the repository at this point in the history
* refactor: Use context manager to read gzip batch files

* Remove redundant variable
  • Loading branch information
edgarrmondragon authored Aug 26, 2024
1 parent ddb922a commit 2a3d3fa
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ def _after_process_record(self, context: dict) -> None:

# SDK developer overrides:

-def preprocess_record(self, record: dict, context: dict) -> dict: # noqa: ARG002, PLR6301
+def preprocess_record(self, record: dict, context: dict) -> dict: # noqa: PLR6301, ARG002
"""Process incoming record and return a modified result.
Args:
Expand Down Expand Up @@ -743,12 +743,15 @@ def process_batch_files(
tail,
mode="rb",
) as file:
-context_file = (
-    gzip_open(file) if encoding.compression == "gzip" else file
-)
-context = {
-    "records": [deserialize_json(line) for line in context_file] # type: ignore[attr-defined]
-}
+if encoding.compression == "gzip":
+    with gzip_open(file) as context_file:
+        context = {
+            "records": [
+                deserialize_json(line) for line in context_file
+            ]
+        }
+else:
+    context = {"records": [deserialize_json(line) for line in file]}
self.process_batch(context)
elif (
importlib.util.find_spec("pyarrow")
Expand All @@ -760,8 +763,7 @@ def process_batch_files(
tail,
mode="rb",
) as file:
-context_file = file
-table = pq.read_table(context_file)
+table = pq.read_table(file)
context = {"records": table.to_pylist()}
self.process_batch(context)
else:
Expand Down

0 comments on commit 2a3d3fa

Please sign in to comment.