Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Faster JSON dumps in format_message with msgspec #1784

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
142 commits
Select commit Hold shift + click to select a range
4d7da5e
add msgspec via poetry
BuzzCutNorman Jun 19, 2023
df7db19
change json to mspec.json
BuzzCutNorman Jun 19, 2023
85d94aa
add .decode() to convert returned byte to string
BuzzCutNorman Jun 19, 2023
75024dd
added msgspec json formatting
BuzzCutNorman Jun 21, 2023
dd8930e
add post_process to sql stream and convert keys to str
BuzzCutNorman Jun 21, 2023
58f0df3
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Jun 22, 2023
2bfc072
updated lock file
BuzzCutNorman Jun 22, 2023
e74956d
removed json.format step in format message
BuzzCutNorman Jun 26, 2023
b3dbf45
removed post_process from sql stream that converted keys to str
BuzzCutNorman Jun 26, 2023
b970b2f
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Jun 27, 2023
d4c0eef
send msgspec bytes output to stdout.buffer
BuzzCutNorman Jun 28, 2023
848a944
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Jul 7, 2023
6d8b3a5
Merge branch 'main' of https://github.com/meltano/sdk into 1046-feat-…
BuzzCutNorman Jul 11, 2023
ec6a337
ran poetry update
BuzzCutNorman Jul 11, 2023
e839d64
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Jul 13, 2023
597d49a
use msgspec 0.17.0
BuzzCutNorman Jul 13, 2023
a475213
ran poetry update
BuzzCutNorman Jul 13, 2023
7816ec0
revert to decoding binary string and write to stdout
BuzzCutNorman Jul 13, 2023
538fc74
updated and validated test_format_messages
BuzzCutNorman Jul 13, 2023
a65cbab
verified and updated test_write_message
BuzzCutNorman Jul 13, 2023
9ea4568
removed all spaces in no_map.jsonl
BuzzCutNorman Jul 13, 2023
df933dd
removed all spaces in keep_all_fields.jsonl
BuzzCutNorman Jul 13, 2023
edc091f
removed all spaces in only_mapped_fields_null_string.jsonl
BuzzCutNorman Jul 13, 2023
3660eb8
removed all spaces in only_mapped_fields.jsonl
BuzzCutNorman Jul 13, 2023
684abdb
removed all spaces in changed_key_properties.jsonl
BuzzCutNorman Jul 13, 2023
4cef269
removed all spaces in sourced_stream_1.jsonl
BuzzCutNorman Jul 13, 2023
e79f099
removed all spaces in sourced_stream_1_null_string.jsonl
BuzzCutNorman Jul 13, 2023
0e6c92c
removed all spaces in sourced_stream_2.jsonl
BuzzCutNorman Jul 13, 2023
cc1c1fe
removed all spaces in aliased_stream.jsonl
BuzzCutNorman Jul 13, 2023
a1e69d9
removed all spaces in flatten_depth_1.jsonl
BuzzCutNorman Jul 13, 2023
23a4ca4
updated flattening to use msgspec
BuzzCutNorman Jul 13, 2023
de9c93c
removed all spaces in flatten_all.jsonl
BuzzCutNorman Jul 13, 2023
2b0b429
removed all spaces in map_and_flatten.jsonl
BuzzCutNorman Jul 13, 2023
363e2d3
removed all spaces in drop_property.jsonl
BuzzCutNorman Jul 13, 2023
35ef7d7
removed all spaces in drop_property_null_string.jsonl
BuzzCutNorman Jul 13, 2023
f936a16
removed all spaces in non_pk_passthrough.jsonl
BuzzCutNorman Jul 13, 2023
3618542
removed all spaces in countries_write_schemas
BuzzCutNorman Jul 13, 2023
d4775b7
revert format_message to say it returns str
BuzzCutNorman Jul 14, 2023
5dff2fb
msgspec setup to use decimal_format="number"
BuzzCutNorman Jul 14, 2023
19f4c8b
update test_format_message to assert \n is present at the end
BuzzCutNorman Jul 14, 2023
dc849a6
refactor format_message to utilize msgspec perfomace tips
BuzzCutNorman Jul 14, 2023
2f6d5bf
Merge branch 'main' of https://github.com/meltano/sdk into 1046-feat-…
BuzzCutNorman Jul 15, 2023
1a0fcfb
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Jul 17, 2023
60e2e36
Merge branch 'main' of https://github.com/meltano/sdk into 1046-feat-…
BuzzCutNorman Jul 24, 2023
24ee6cd
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Jul 26, 2023
5dae548
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Jul 27, 2023
16cd1e1
switch to json returned in native bytes string
BuzzCutNorman Jul 28, 2023
9eb887a
updated test_message tests
BuzzCutNorman Jul 28, 2023
049c164
updated test_mapper tests
BuzzCutNorman Jul 28, 2023
ced88b6
update test_parent_child tests
BuzzCutNorman Jul 28, 2023
bd2d530
update test_tap_countries tests
BuzzCutNorman Jul 28, 2023
07ab43d
updated testing runners _execute_sync
BuzzCutNorman Jul 28, 2023
056b79c
update testing legacy sync functions
BuzzCutNorman Jul 28, 2023
66ba043
conform to stdout_buf
BuzzCutNorman Jul 28, 2023
d5aa718
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Jul 28, 2023
f0600e6
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Jul 31, 2023
9339099
update deserialize_json to utilize msgspec
BuzzCutNorman Jul 31, 2023
b2c2d5a
updated test_io tests to utlize msgspec.DecodeError
BuzzCutNorman Jul 31, 2023
3429427
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Aug 4, 2023
9f900a0
Create Struct from SCHEMA msg for json decoding
BuzzCutNorman Aug 4, 2023
fa952d7
updated test_io test to handle Struct methods
BuzzCutNorman Aug 4, 2023
663c6a3
added default of None for non present optional fields
BuzzCutNorman Aug 4, 2023
b6d8d44
Merge branch 'main' of https://github.com/meltano/sdk into 1046-feat-…
BuzzCutNorman Aug 4, 2023
9820c33
poerty lock --no-update
BuzzCutNorman Aug 4, 2023
489143d
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Aug 8, 2023
b8828d1
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Aug 15, 2023
f9713a3
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Oct 6, 2023
2677a6f
update msgspec to 0.18.4 and poetry update
BuzzCutNorman Oct 6, 2023
cb1d32e
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Oct 17, 2023
e8e34b9
updated pytest snapshot mapped_stream jsonl files
BuzzCutNorman Oct 17, 2023
b3535be
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Oct 18, 2023
27dce2c
removed a Python 3.7 from classifers, update ruff target-version to py38
BuzzCutNorman Oct 18, 2023
dde28c2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 18, 2023
77236d9
added the changes in drop support for python 3.7 from v1
BuzzCutNorman Oct 19, 2023
f437c48
updated register_stream_stuct and deserialize_record
BuzzCutNorman Oct 19, 2023
c5ef558
added float_hook=decimal.Decimal to decoder
BuzzCutNorman Oct 19, 2023
50fe1ff
added conform_name and used it in register_stream_struct and deserial…
BuzzCutNorman Oct 19, 2023
f15cb35
utilizing float_hook=decimal.Decimal in Decoder and code cleanup to m…
BuzzCutNorman Oct 23, 2023
8e813e8
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Oct 23, 2023
5cf4e76
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Oct 30, 2023
b723dad
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Nov 16, 2023
a035da0
fix pyarrow dependency walk issue
BuzzCutNorman Nov 16, 2023
3368448
fix ruff issues
BuzzCutNorman Nov 16, 2023
9c4bd90
Add SingerWriter to io_base
BuzzCutNorman Nov 16, 2023
ffd0d17
cleanup singer_lib.messages
BuzzCutNorman Nov 16, 2023
83fcafa
Add SingerWriter as a Tap parent class
BuzzCutNorman Nov 16, 2023
b6bd2e5
Update Sink class to use self_tap.write_message
BuzzCutNorman Nov 16, 2023
fc7be94
Add SingerWriter to InlineMapper as a parent class
BuzzCutNorman Nov 16, 2023
6f2c39a
Removed wirte_message from singerlib init
BuzzCutNorman Nov 16, 2023
af90f6c
update test to utilize SingerWriter class
BuzzCutNorman Nov 16, 2023
20a53f2
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Nov 28, 2023
04c3338
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Nov 28, 2023
1be25c1
attempt to resolve dependency walk issue
BuzzCutNorman Nov 28, 2023
b41c6a3
attempt to resolve e2e test failures
BuzzCutNorman Nov 29, 2023
0429fbb
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Nov 30, 2023
5c5d173
reduce mypy issue
BuzzCutNorman Dec 1, 2023
6b68391
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Dec 4, 2023
14f8b78
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Dec 5, 2023
55cbc5f
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Dec 5, 2023
4dc2fe1
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Dec 6, 2023
792277b
updated tests to utlize RunnerStandardOutErr
BuzzCutNorman Dec 6, 2023
21fbc60
added dec_hook to appease mypy and enc_hook to match PR2090
BuzzCutNorman Dec 6, 2023
257f312
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Dec 6, 2023
93b4488
added xfail to test_sync_sqlite_to_sqlite
BuzzCutNorman Dec 7, 2023
25ca6e8
added xfail to test_hostile_to_sqlite
BuzzCutNorman Dec 7, 2023
e616df4
added xfail to test_sqlite_state
BuzzCutNorman Dec 7, 2023
4876246
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Dec 7, 2023
e484602
added enc_hook and dec_hook tests to test_io
BuzzCutNorman Dec 7, 2023
b0ac8aa
cleanup of unused code
BuzzCutNorman Dec 7, 2023
719652d
paramaterized test_dec_hook and test_enc_hook
BuzzCutNorman Dec 7, 2023
5eefa0e
add test_listen_file_input to test_io
BuzzCutNorman Dec 8, 2023
418d42f
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Dec 8, 2023
643ee6d
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Dec 11, 2023
35d8063
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Jan 10, 2024
517f6f1
bump msgspec to 0.18.5
BuzzCutNorman Jan 10, 2024
3a8eb0f
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Jan 11, 2024
a9023f8
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Jan 22, 2024
98f17e5
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Jan 22, 2024
3caf6c1
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Jan 23, 2024
efea905
Apply suggestions from code review
BuzzCutNorman Jan 23, 2024
6ce9202
ran poetry lock --no-update
BuzzCutNorman Jan 23, 2024
ed523e7
removed importlib metadata and typing final
BuzzCutNorman Jan 23, 2024
e12dc58
switch to using t.Final to decorate listen in SingerReader
BuzzCutNorman Jan 23, 2024
187a347
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
edgarrmondragon Jan 26, 2024
e35d2b0
fix: Reduce amount of unnecessary whitespace in Singer output (#2184)
edgarrmondragon Jan 26, 2024
ad73b78
docs: Call out minimum recommended `cookiecutter` version (#2186)
edgarrmondragon Jan 26, 2024
3913955
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
edgarrmondragon Jan 26, 2024
60e591e
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
edgarrmondragon Jan 27, 2024
2f7d05b
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Jan 29, 2024
3967985
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Feb 15, 2024
039fd8c
updated flattening record test - removed white space
BuzzCutNorman Feb 16, 2024
32be3a9
updated snapshot files to work with msgpec output
BuzzCutNorman Feb 16, 2024
e08d7ca
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Feb 19, 2024
9526e31
update flattening to use encoder similar to io_base
BuzzCutNorman Feb 19, 2024
718a640
updated flattening snapshots
BuzzCutNorman Feb 19, 2024
9a172d8
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Feb 23, 2024
c331bb3
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Mar 1, 2024
559d352
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
BuzzCutNorman Mar 4, 2024
6c9760c
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Mar 5, 2024
dc4b35d
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Mar 28, 2024
2fb541c
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1046…
BuzzCutNorman Jun 24, 2024
1c7de71
Merge branch 'main' into 1046-feat-faster-json-dumps-with-msgspec
edgarrmondragon Jul 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 53 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ inflection = ">=0.5.1"
joblib = ">=1.3.0"
jsonpath-ng = ">=1.5.3"
jsonschema = ">=4.16.0"
msgspec = ">=0.18.0"
packaging = ">=23.1"
pendulum = ">=2.1.0,<4"
python-dateutil = ">=2.8.2"
Expand Down
20 changes: 18 additions & 2 deletions singer_sdk/helpers/_flattening.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,29 @@
import re
import typing as t
from copy import deepcopy
from datetime import datetime

import inflection
import simplejson as json
import msgspec

DEFAULT_FLATTENING_SEPARATOR = "__"


def enc_hook(obj: t.Any) -> t.Any: # noqa: ANN401
"""Enocding type helper for non native types.

Args:
obj: the item to be encoded

Returns:
The object converted to the appropriate type, default is str
"""
return obj.isoformat(sep="T") if isinstance(obj, datetime) else str(obj)


encoder = msgspec.json.Encoder(enc_hook=enc_hook, decimal_format="number")


class FlatteningOptions(t.NamedTuple):
"""A stream map which performs the flattening role."""

Expand Down Expand Up @@ -435,7 +451,7 @@ def _flatten_record(
items.append(
(
new_key,
json.dumps(v, use_decimal=True, default=str)
encoder.encode(v).decode()
if _should_jsondump_value(k, v, flattened_schema)
else v,
),
Expand Down
64 changes: 49 additions & 15 deletions singer_sdk/io_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,57 @@

import abc
import decimal
import json
import logging
import sys
import typing as t
from collections import Counter, defaultdict
from datetime import datetime

import msgspec

from singer_sdk._singerlib.messages import Message, SingerMessageType
from singer_sdk._singerlib.messages import format_message as singer_format_message
from singer_sdk._singerlib.messages import write_message as singer_write_message
from singer_sdk.exceptions import InvalidInputLine

logger = logging.getLogger(__name__)
msg_buffer = bytearray(64)


def enc_hook(obj: t.Any) -> t.Any: # noqa: ANN401
"""Enocding type helper for non native types.

Args:
obj: the item to be encoded

Returns:
The object converted to the appropriate type, default is str
"""
return obj.isoformat(sep="T") if isinstance(obj, datetime) else str(obj)


encoder = msgspec.json.Encoder(enc_hook=enc_hook, decimal_format="number")


def dec_hook(type: type, obj: t.Any) -> t.Any: # noqa: ARG001, A002, ANN401
"""Decoding type helper for non native types.

Args:
type: the type given
obj: the item to be decoded

Returns:
The object converted to the appropriate type, default is str.
"""
return str(obj)


decoder = msgspec.json.Decoder(dec_hook=dec_hook, float_hook=decimal.Decimal)


class SingerReader(metaclass=abc.ABCMeta):
"""Interface for all plugins reading Singer messages from stdin."""

@t.final
def listen(self, file_input: t.IO[str] | None = None) -> None:
def listen(self, file_input: t.IO | None = None) -> None:
"""Read from input until all messages are processed.

Args:
Expand All @@ -31,7 +63,7 @@ def listen(self, file_input: t.IO[str] | None = None) -> None:
This method is internal to the SDK and should not need to be overridden.
"""
if not file_input:
file_input = sys.stdin
file_input = sys.stdin.buffer

self._process_lines(file_input)
self._process_endofpipe()
Expand All @@ -52,7 +84,7 @@ def _assert_line_requires(line_dict: dict, requires: set[str]) -> None:
msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}"
raise InvalidInputLine(msg)

def deserialize_json(self, line: str) -> dict: # noqa: PLR6301
def deserialize_json(self, line: bytes) -> dict: # noqa: PLR6301
"""Deserialize a line of json.

Args:
Expand All @@ -62,18 +94,17 @@ def deserialize_json(self, line: str) -> dict: # noqa: PLR6301
A dictionary of the deserialized json.

Raises:
json.decoder.JSONDecodeError: raised if any lines are not valid json
msgspec.DecodeError: raised if any lines are not valid json
"""
try:
return json.loads( # type: ignore[no-any-return]
return decoder.decode( # type: ignore[no-any-return]
line,
parse_float=decimal.Decimal,
)
except json.decoder.JSONDecodeError as exc:
except msgspec.DecodeError as exc:
logger.exception("Unable to parse:\n%s", line, exc_info=exc)
raise

def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]:
def _process_lines(self, file_input: t.IO) -> t.Counter[str]:
"""Internal method to process jsonl lines from a Singer tap.

Args:
Expand Down Expand Up @@ -145,7 +176,7 @@ def _process_endofpipe(self) -> None: # noqa: PLR6301
class SingerWriter:
"""Interface for all plugins writting Singer messages to stdout."""

def format_message(self, message: Message) -> str: # noqa: PLR6301
def format_message(self, message: Message) -> bytes: # noqa: PLR6301
"""Format a message as a JSON string.

Args:
Expand All @@ -154,12 +185,15 @@ def format_message(self, message: Message) -> str: # noqa: PLR6301
Returns:
The formatted message.
"""
return singer_format_message(message)
encoder.encode_into(message.to_dict(), msg_buffer)
msg_buffer.extend(b"\n")
return msg_buffer

def write_message(self, message: Message) -> None: # noqa: PLR6301
def write_message(self, message: Message) -> None:
"""Write a message to stdout.

Args:
message: The message to write.
"""
singer_write_message(message)
sys.stdout.buffer.write(self.format_message(message))
sys.stdout.flush()
2 changes: 1 addition & 1 deletion singer_sdk/mapper_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def invoke( # type: ignore[override]
about: bool = False,
about_format: str | None = None,
config: tuple[str, ...] = (),
file_input: t.IO[str] | None = None,
file_input: t.IO[bytes] | None = None,
) -> None:
"""Invoke the mapper.

Expand Down
4 changes: 2 additions & 2 deletions singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def _handle_max_record_age(self) -> None:
)
self.drain_all()

def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]:
def _process_lines(self, file_input: t.IO) -> t.Counter[str]:
"""Internal method to process jsonl lines from a Singer tap.

Args:
Expand Down Expand Up @@ -544,7 +544,7 @@ def invoke( # type: ignore[override]
about: bool = False,
about_format: str | None = None,
config: tuple[str, ...] = (),
file_input: t.IO[str] | None = None,
file_input: t.IO[bytes] | None = None,
) -> None:
"""Invoke the target.

Expand Down
33 changes: 21 additions & 12 deletions singer_sdk/testing/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from contextlib import redirect_stderr, redirect_stdout

import singer_sdk._singerlib as singer
from singer_sdk.testing.runners import RunnerStandardOutErr

if t.TYPE_CHECKING:
from singer_sdk.mapper_base import InlineMapper
Expand Down Expand Up @@ -111,7 +112,7 @@ def get_standard_target_tests(
return []


def tap_sync_test(tap: Tap) -> tuple[io.StringIO, io.StringIO]:
def tap_sync_test(tap: Tap) -> tuple[RunnerStandardOutErr, io.StringIO]:
"""Invokes a Tap object and return STDOUT and STDERR results in StringIO buffers.

Args:
Expand All @@ -120,11 +121,15 @@ def tap_sync_test(tap: Tap) -> tuple[io.StringIO, io.StringIO]:
Returns:
A 2-item tuple with StringIO buffers from the Tap's output: (stdout, stderr)
"""
stdout_buf = io.StringIO()
stdout_buf = RunnerStandardOutErr()
stderr_buf = io.StringIO()

with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
tap.sync_all()
stdout_buf.seek(0)

# Add decoded buffer items into stdout_buf
stdout_buf.load_from_buffer()

stderr_buf.seek(0)
return stdout_buf, stderr_buf

Expand Down Expand Up @@ -171,7 +176,7 @@ def _select_all(catalog_dict: dict) -> dict:

def target_sync_test(
target: Target,
input: io.StringIO | None, # noqa: A002
input: RunnerStandardOutErr | None, # noqa: A002
*,
finalize: bool = True,
) -> tuple[io.StringIO, io.StringIO]:
Expand Down Expand Up @@ -203,7 +208,7 @@ def target_sync_test(
def tap_to_target_sync_test(
tap: Tap,
target: Target,
) -> tuple[io.StringIO, io.StringIO, io.StringIO, io.StringIO]:
) -> tuple[RunnerStandardOutErr, io.StringIO, io.StringIO, io.StringIO]:
"""Test and end-to-end sink from the tap to the target.

Note: This method buffers all output from the tap in memory and should not be
Expand All @@ -222,7 +227,7 @@ def tap_to_target_sync_test(
target_stdout, target_stderr = target_sync_test(target, tap_stdout, finalize=True)

# Reset the tap's stdout buffer before returning
tap_stdout.seek(0)
tap_stdout.rewind()

return tap_stdout, tap_stderr, target_stdout, target_stderr

Expand All @@ -236,19 +241,23 @@ def sync_end_to_end(tap: Tap, target: Target, *mappers: InlineMapper) -> None:
mappers: Zero or more inline mapper to apply in between the tap and target, in
order.
"""
buf = io.StringIO()
with redirect_stdout(buf):
stdout_buf = RunnerStandardOutErr()
with redirect_stdout(stdout_buf):
tap.sync_all()

buf.seek(0)
mapper_output = buf
# Add decoded buffer items into stdout_buf
stdout_buf.rewind()

mapper_output = stdout_buf

for mapper in mappers:
buf = io.StringIO()
buf = RunnerStandardOutErr()
with redirect_stdout(buf):
mapper.listen(mapper_output)

buf.seek(0)
# Add decoded buffer items into stdout_buf
buf.rewind()

mapper_output = buf

target.listen(mapper_output)
Loading
Loading