-
Notifications
You must be signed in to change notification settings - Fork 71
/
runners.py
304 lines (253 loc) · 9.46 KB
/
runners.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
"""Utility object for running taps/targets, capturing sync output during testing."""
from __future__ import annotations
import abc
import io
import json
import typing as t
from collections import defaultdict
from contextlib import redirect_stderr, redirect_stdout
from pathlib import Path
from singer_sdk import Tap, Target
from singer_sdk.testing.config import SuiteConfig
class SingerTestRunner(metaclass=abc.ABCMeta):
    """Base Singer Test Runner.

    Stores the plugin class under test, its configuration, and the containers
    that hold messages captured during a sync. Subclasses implement
    ``sync_all`` to actually run the plugin.
    """

    # Annotations only: the containers are created per instance in
    # ``__init__``. Assigning mutable defaults here would make every runner
    # share the same list/dict objects and leak messages between runners.
    raw_messages: list[dict]
    schema_messages: list[dict]
    record_messages: list[dict]
    state_messages: list[dict]
    records: defaultdict

    def __init__(
        self,
        singer_class: type[Tap] | type[Target],
        config: dict | None = None,
        suite_config: SuiteConfig | None = None,
        **kwargs: t.Any,
    ) -> None:
        """Initialize the test runner object.

        Args:
            singer_class (type[PluginBase]): Singer class to be tested.
            config (dict): Tap/Target configuration for testing.
            suite_config (SuiteConfig): SuiteConfig instance to be used when
                instantiating tests.
            kwargs (dict): Default arguments to be passed to tap/target on create.
        """
        self.singer_class = singer_class
        self.config = config or {}
        self.default_kwargs = kwargs
        self.suite_config = suite_config or SuiteConfig()

        # Fresh containers for every runner so captured output is isolated.
        self.raw_messages = []
        self.schema_messages = []
        self.record_messages = []
        self.state_messages = []
        self.records = defaultdict(list)

    @staticmethod
    def _clean_sync_output(raw_records: str) -> list[dict]:
        """Clean sync output.

        Args:
            raw_records: String containing raw messages, one JSON document
                per line.

        Returns:
            A list of raw messages in dict form.
        """
        lines = raw_records.strip().split("\n")
        # Skip blank and whitespace-only lines (e.g. a bare "\r" left over
        # from CRLF line endings) instead of handing them to ``json.loads``.
        return [json.loads(line) for line in lines if line.strip()]

    def create(self, kwargs: dict | None = None) -> Tap | Target:
        """Create a new tap/target from the runner defaults.

        Args:
            kwargs (dict, optional): Keyword arguments for the plugin
                constructor. When omitted or empty, the default kwargs given
                to the runner are used instead. Defaults to None.

        Returns:
            An instantiated Tap or Target.
        """
        if not kwargs:
            kwargs = self.default_kwargs
        return self.singer_class(config=self.config, **kwargs)

    @abc.abstractmethod
    def sync_all(self, **kwargs: t.Any) -> None:
        """Sync all records.

        Args:
            kwargs: Keyword arguments.
        """
class TapTestRunner(SingerTestRunner):
    """Runner that drives a tap and keeps the messages it emits."""

    def __init__(
        self,
        tap_class: type[Tap],
        config: dict | None = None,
        suite_config: SuiteConfig | None = None,
        **kwargs: t.Any,
    ) -> None:
        """Set up the runner for a given tap class.

        Args:
            tap_class: Tap class to run.
            config: Config dict to pass to Tap class.
            suite_config (SuiteConfig): SuiteConfig instance to be used when
                instantiating tests.
            kwargs: Default arguments to be passed to tap on create.
        """
        tap_config = config or {}
        super().__init__(
            singer_class=tap_class,
            config=tap_config,
            suite_config=suite_config,
            **kwargs,
        )

    def new_tap(self) -> Tap:
        """Build a tap from the runner's stored class, config and kwargs.

        Returns:
            A configured Tap instance.
        """
        instance = self.create()
        return t.cast(Tap, instance)

    def run_discovery(self) -> str:
        """Run discovery on a freshly created tap.

        Returns:
            The catalog as a string.
        """
        tap = self.new_tap()
        return tap.run_discovery()

    def run_connection_test(self) -> bool:
        """Run the connection test on a freshly created tap.

        Returns:
            True if connection test passes, else False.
        """
        return self.new_tap().run_connection_test()

    def run_sync_dry_run(self) -> bool:
        """Run a sync dry run on a freshly created tap.

        The suite config's ``max_records_limit`` (possibly ``None``) is passed
        through as the dry-run record limit.

        Returns:
            True if dry run test passes, else False.
        """
        tap = self.new_tap()
        return tap.run_sync_dry_run(
            dry_run_record_limit=self.suite_config.max_records_limit,
        )

    def sync_all(self, **kwargs: t.Any) -> None:  # noqa: ARG002
        """Run the tap and store its parsed output on the runner.

        Args:
            kwargs: Unused keyword arguments.
        """
        captured_stdout, _captured_stderr = self._execute_sync()
        self._parse_records(self._clean_sync_output(captured_stdout))

    def _parse_records(self, messages: list[dict]) -> None:
        """Store the raw messages and sort them into per-type collections.

        Args:
            messages: A list of messages in dict form.
        """
        self.raw_messages = messages
        for msg in messages:
            # Empty messages carry nothing to classify.
            if not msg:
                continue
            kind = msg["type"]
            if kind == "STATE":
                self.state_messages.append(msg)
            elif kind == "SCHEMA":
                self.schema_messages.append(msg)
            elif kind == "RECORD":
                # Resolve the stream name first so a message without one is
                # rejected before anything is stored.
                stream = msg["stream"]
                self.record_messages.append(msg)
                self.records[stream].append(msg["record"])

    def _execute_sync(self) -> tuple[str, str]:
        """Run the tap's sync dry run while capturing STDOUT and STDERR.

        Returns:
            A 2-item tuple with the captured text: (stdout, stderr)
        """
        out = io.StringIO()
        err = io.StringIO()
        with redirect_stdout(out), redirect_stderr(err):
            self.run_sync_dry_run()
        return out.getvalue(), err.getvalue()
class TargetTestRunner(SingerTestRunner):
    """Utility class to simplify target testing."""

    def __init__(
        self,
        target_class: type[Target],
        config: dict | None = None,
        suite_config: SuiteConfig | None = None,
        input_filepath: Path | None = None,
        input_io: io.StringIO | None = None,
        **kwargs: t.Any,
    ) -> None:
        """Initialize TargetTestRunner.

        Args:
            target_class: Target Class to instantiate.
            config: Config to pass to instantiated Target.
            suite_config: Config to pass to tests.
            input_filepath: (optional) Path to a singer file containing records, to pass
                to the Target during testing.
            input_io: (optional) StringIO containing raw records to pass to the Target
                during testing.
            kwargs: Default arguments to be passed to tap/target on create.
        """
        super().__init__(
            singer_class=target_class,
            config=config or {},
            suite_config=suite_config,
            **kwargs,
        )
        self.input_filepath = input_filepath
        self.input_io = input_io
        # Resolved lazily by the ``target_input`` property.
        self._input: t.IO[str] | None = None

    def new_target(self) -> Target:
        """Get new Target instance.

        Returns:
            A configured Target instance.
        """
        return t.cast(Target, self.create())

    @property
    def target_input(self) -> t.IO[str]:
        """Input messages to pass to Target.

        Resolved on first access and then reused: ``input_io`` takes
        precedence, otherwise ``input_filepath`` is opened for reading. If
        neither was provided (and no value was set explicitly) the result is
        ``None``.

        Returns:
            A text stream of raw input messages.
        """
        if self._input is None:
            if self.input_io:
                self._input = self.input_io
            elif self.input_filepath:
                # Open explicitly as UTF-8 rather than relying on the
                # platform's locale-dependent default encoding.
                self._input = Path(self.input_filepath).open(encoding="utf-8")
        return t.cast(t.IO[str], self._input)

    @target_input.setter
    def target_input(self, value: t.IO[str]) -> None:
        self._input = value

    def sync_all(
        self,
        *,
        finalize: bool = True,
        **kwargs: t.Any,  # noqa: ARG002
    ) -> None:
        """Run a full target sync, assigning output to the runner object.

        The captured output is stored on ``self.stdout`` and ``self.stderr``,
        and the messages parsed from STDOUT are appended to
        ``self.state_messages``.

        Args:
            finalize: True to process as the end of stream as a completion signal;
                False to keep the sink operation open for further records.
            kwargs: Unused keyword arguments.
        """
        target = self.new_target()
        stdout, stderr = self._execute_sync(
            target=target,
            target_input=self.target_input,
            finalize=finalize,
        )
        self.stdout, self.stderr = (stdout.read(), stderr.read())
        self.state_messages.extend(self._clean_sync_output(self.stdout))

    def _execute_sync(
        self,
        target: Target,
        target_input: t.IO[str],
        *,
        finalize: bool = True,
    ) -> tuple[io.StringIO, io.StringIO]:
        """Invoke the target with the provided input.

        Args:
            target: Target to sync.
            target_input: The input to process as if from STDIN.
            finalize: True to process as the end of stream as a completion signal;
                False to keep the sink operation open for further records.

        Returns:
            A 2-item tuple with StringIO buffers from the Target's output:
            (stdout, stderr)
        """
        stdout_buf = io.StringIO()
        stderr_buf = io.StringIO()

        with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
            # ``target_input`` is None when the runner was given no input.
            if target_input is not None:
                target._process_lines(target_input)
            if finalize:
                target._process_endofpipe()

        # Rewind so callers can read the buffers from the start.
        stdout_buf.seek(0)
        stderr_buf.seek(0)
        return stdout_buf, stderr_buf