# Copyright (c) 2023 Alex Butler
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this
# software and associated documentation files (the "Software"), to deal in the Software
# without restriction, including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons
# to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or
# substantial portions of the Software.
"""BigQuery target class."""
from __future__ import annotations
import copy
import time
import uuid
from typing import (
    TYPE_CHECKING,
    Callable,
    Dict,
    List,
    Optional,
    Tuple,
    Type,
    Union,
    cast,
)

from singer_sdk import Sink
from singer_sdk import typing as th
from singer_sdk.target_base import Target

from target_bigquery.batch_job import (
    BigQueryBatchJobDenormalizedSink,
    BigQueryBatchJobSink,
)
from target_bigquery.core import (
    BaseBigQuerySink,
    BaseWorker,
    BigQueryCredentials,
    ParType,
)
from target_bigquery.gcs_stage import (
    BigQueryGcsStagingDenormalizedSink,
    BigQueryGcsStagingSink,
)
from target_bigquery.storage_write import (
    BigQueryStorageWriteDenormalizedSink,
    BigQueryStorageWriteSink,
)
from target_bigquery.streaming_insert import (
    BigQueryStreamingInsertDenormalizedSink,
    BigQueryStreamingInsertSink,
)

if TYPE_CHECKING:
    from multiprocessing import Process, Queue
    from multiprocessing.connection import Connection

# Defaults for target worker pool parameters
MAX_WORKERS = 15
"""Maximum number of workers to spawn."""
MAX_JOBS_QUEUED = 30
"""Maximum number of jobs placed in the global queue to avoid memory overload."""
WORKER_CAPACITY_FACTOR = 5
"""Jobs enqueued must exceed the number of active workers times this number."""
WORKER_CREATION_MIN_INTERVAL = 5
"""Minimum time between worker creation attempts."""
class TargetBigQuery(Target):
    """Target for BigQuery."""

    _MAX_RECORD_AGE_IN_MINUTES = 5.0
    name = "target-bigquery"
    config_jsonschema = th.PropertiesList(
        th.Property(
            "credentials_path",
            th.StringType,
            description="The path to a gcp credentials json file.",
        ),
        th.Property(
            "credentials_json",
            th.StringType,
            description="A JSON string of your service account JSON file.",
        ),
        th.Property(
            "project",
            th.StringType,
            description="The target GCP project to materialize data into.",
            required=True,
        ),
        th.Property(
            "dataset",
            th.StringType,
            description="The target dataset to materialize data into.",
            required=True,
        ),
        th.Property(
            "location",
            th.StringType,
            description="The target dataset/bucket location to materialize data into.",
            default="US",
        ),
        th.Property(
            "batch_size",
            th.IntegerType,
            description="The maximum number of rows to send in a single batch or commit.",
            default=500,
        ),
        th.Property(
            "fail_fast",
            th.BooleanType,
            description="Fail the entire load job if any row fails to insert.",
            default=True,
        ),
        th.Property(
            "timeout",
            th.IntegerType,
            description="Default timeout for batch_job and gcs_stage derived LoadJobs.",
            default=600,
        ),
        th.Property(
            "denormalized",
            th.BooleanType,
            description=(
                "Determines whether to denormalize the data before writing to BigQuery. A false"
                " value will write data using a fixed JSON column based schema, while a true value"
                " will write data using a dynamic schema derived from the tap."
            ),
            default=False,
        ),
        th.Property(
            "method",
            th.CustomType(
                {
                    "type": "string",
                    "enum": [
                        "storage_write_api",
                        "batch_job",
                        "gcs_stage",
                        "streaming_insert",
                    ],
                }
            ),
            description="The method to use for writing to BigQuery.",
            default="storage_write_api",
            required=True,
        ),
        th.Property(
            "generate_view",
            th.BooleanType,
            description=(
                "Determines whether to generate a view based on the SCHEMA message parsed from the"
                " tap. Only valid if denormalized=false meaning you are using the fixed JSON column"
                " based schema."
            ),
            default=False,
        ),
        th.Property(
            "bucket",
            th.StringType,
            description="The GCS bucket to use for staging data. Only used if method is gcs_stage.",
        ),
        th.Property(
            "partition_granularity",
            th.CustomType(
                {
                    "type": "string",
                    "enum": [
                        "year",
                        "month",
                        "day",
                        "hour",
                    ],
                }
            ),
            default="month",
            description="The granularity of the partitioning strategy. Defaults to month.",
        ),
        th.Property(
            "cluster_on_key_properties",
            th.BooleanType,
            default=False,
            description=(
                "Determines whether to cluster on the key properties from the tap. Defaults to"
                " false. When false, clustering will be based on _sdc_batched_at instead."
            ),
        ),
        th.Property(
            "column_name_transforms",
            th.ObjectType(
                th.Property(
                    "lower",
                    th.BooleanType,
                    default=False,
                    description="Lowercase column names",
                ),
                th.Property(
                    "quote",
                    th.BooleanType,
                    default=False,
                    description="Quote columns during DDL generation",
                ),
                th.Property(
                    "add_underscore_when_invalid",
                    th.BooleanType,
                    default=False,
                    description="Add an underscore when a column starts with a digit",
                ),
                th.Property(
                    "snake_case",
                    th.BooleanType,
                    default=False,
                    description="Convert columns to snake case",
                ),
                th.Property(
                    "replace_period_with_underscore",
                    th.BooleanType,
                    default=False,
                    description="Convert periods to underscores",
                ),
            ),
            description=(
                "Accepts a JSON object of options with boolean values to enable them. The available"
                " options are `quote` (quote columns in DDL), `lower` (lowercase column names),"
                " `add_underscore_when_invalid` (add underscore if column starts with digit), and"
                " `snake_case` (convert to snake case naming). For fixed schema, this transform"
                " only applies to the generated view if enabled."
            ),
            required=False,
        ),
        th.Property(
            "options",
            th.ObjectType(
                th.Property(
                    "storage_write_batch_mode",
                    th.BooleanType,
                    default=False,
                    description=(
                        "By default, we use the default stream (Committed mode) in the"
                        " storage_write_api load method which results in streaming records which"
                        " are immediately available and is generally fastest. If this is set to"
                        " true, we will use the application created streams (Committed mode) to"
                        " transactionally batch data on STATE messages and at end of pipe."
                    ),
                ),
                th.Property(
                    "process_pool",
                    th.BooleanType,
                    default=False,
                    description=(
                        "By default we use an autoscaling threadpool to write to BigQuery. If set"
                        " to true, we will use a process pool."
                    ),
                ),
                th.Property(
                    "max_workers",
                    th.IntegerType,
                    required=False,
                    description=(
                        "By default, each sink type has a preconfigured max worker pool limit."
                        " This sets an override for maximum number of workers in the pool."
                    ),
                ),
            ),
            description=(
                "Accepts a JSON object of options with boolean values to enable them. These are"
                " more advanced options that shouldn't need tweaking but are here for flexibility."
            ),
        ),
        th.Property(
            "upsert",
            th.CustomType(
                {
                    "anyOf": [
                        {"type": "boolean"},
                        {"type": "array", "items": {"type": "string"}},
                    ]
                }
            ),
            default=False,
            description=(
                "Determines if we should upsert. Defaults to false. A value of true will write to a"
                " temporary table and then merge into the target table (upsert). This requires the"
                " target table to be unique on the key properties. A value of false will write to"
                " the target table directly (append). A value of an array of strings will evaluate"
                " the strings in order using fnmatch. At the end of the array, the value of the"
                " last match will be used. If not matched, the default value is false (append)."
            ),
        ),
        th.Property(
            "overwrite",
            th.CustomType(
                {
                    "anyOf": [
                        {"type": "boolean"},
                        {"type": "array", "items": {"type": "string"}},
                    ]
                }
            ),
            default=False,
            description=(
                "Determines if the target table should be overwritten on load. Defaults to false. A"
                " value of true will write to a temporary table and then overwrite the target table"
                " inside a transaction (so it is safe). A value of false will write to the target"
                " table directly (append). A value of an array of strings will evaluate the strings"
                " in order using fnmatch. At the end of the array, the value of the last match will"
                " be used. If not matched, the default value is false. This is mutually exclusive"
                " with the `upsert` option. If both are set, `upsert` will take precedence."
            ),
        ),
        th.Property(
            "dedupe_before_upsert",
            th.CustomType(
                {
                    "anyOf": [
                        {"type": "boolean"},
                        {"type": "array", "items": {"type": "string"}},
                    ]
                }
            ),
            default=False,
            description=(
                "This option is only used if `upsert` is enabled for a stream. The selection"
                " criteria for the stream's candidacy is the same as upsert. If the stream is"
                " marked for deduping before upsert, we will create a _session scoped temporary"
                " table during the merge transaction to dedupe the ingested records. This is useful"
                " for streams that are not unique on the key properties during an ingest but are"
                " unique in the source system. Data lake ingestion is often a good example of this"
                " where the same unique record may exist in the lake at different points in time"
                " from different extracts."
            ),
        ),
        th.Property(
            "schema_resolver_version",
            th.IntegerType,
            default=1,
            description=(
                "The version of the schema resolver to use. Defaults to 1. Version 2 uses JSON as a"
                " fallback during denormalization. This only has an effect if denormalized=true"
            ),
            allowed_values=[1, 2],
        ),
    ).to_dict()
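    # Illustrative config sketch (not shipped with the target; values are hypothetical):
    # the schema above marks `project`, `dataset`, and `method` as required, with `method`
    # defaulting to storage_write_api. A minimal settings payload might look like:
    #
    #   {
    #       "project": "my-gcp-project",
    #       "dataset": "raw_singer",
    #       "method": "storage_write_api",
    #       "denormalized": true,
    #       "batch_size": 500,
    #       "options": {"max_workers": 10}
    #   }
    #
    # When neither credentials_path nor credentials_json is supplied, the client is
    # presumably expected to fall back to Application Default Credentials.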
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.max_parallelism = 1
        (
            self.proc_cls,
            self.pipe_cls,
            self.queue_cls,
            self.par_typ,
        ) = self.get_parallelization_components()
        self.queue = self.queue_cls()
        self.job_notification, self.job_notifier = self.pipe_cls(False)
        self.log_notification, self.log_notifier = self.pipe_cls(False)
        self.error_notification, self.error_notifier = self.pipe_cls(False)
        self._credentials = BigQueryCredentials(
            self.config.get("credentials_path"),
            self.config.get("credentials_json"),
            self.config["project"],
        )

        def worker_factory():
            return cast(
                Type[BaseWorker],
                self.get_sink_class().worker_cls_factory(
                    self.proc_cls,
                    dict(self.config),
                ),
            )(
                ext_id=uuid.uuid4().hex,
                queue=self.queue,
                credentials=self._credentials,
                job_notifier=self.job_notifier,
                log_notifier=self.log_notifier,
                error_notifier=self.error_notifier,
            )

        self.worker_factory = worker_factory
        self.workers: List[Union[BaseWorker, "Process"]] = []
        self.worker_pings: Dict[str, float] = {}
        self._jobs_enqueued = 0
        self._last_worker_creation = 0.0

    def increment_jobs_enqueued(self) -> None:
        """Increment the number of jobs enqueued."""
        self._jobs_enqueued += 1
    # We can expand this to support other parallelization methods in the future.
    # We would approach this by adding a new ParType enum and interpreting the
    # Process, Pipe, and Queue classes as protocols which can be duck-typed.
    def get_parallelization_components(
        self, default=ParType.THREAD
    ) -> Tuple[
        Type["Process"],
        Callable[[bool], Tuple["Connection", "Connection"]],
        Callable[[], "Queue"],
        ParType,
    ]:
        """Get the appropriate Process, Pipe, and Queue classes and the associated ParType enum."""
        use_procs: Optional[bool] = self.config.get("options", {}).get("process_pool")
        if use_procs is None:
            use_procs = default == ParType.PROCESS
        if not use_procs:
            from multiprocessing.dummy import Pipe, Process, Queue

            self.logger.info("Using thread-based parallelism")
            return Process, Pipe, Queue, ParType.THREAD  # type: ignore
        else:
            from multiprocessing import Pipe, Process, Queue

            self.logger.info("Using process-based parallelism")
            return Process, Pipe, Queue, ParType.PROCESS
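    # For example, a config of {"options": {"process_pool": true}} selects the real
    # multiprocessing primitives above, while omitting the option (or setting it to
    # false) keeps the thread-backed equivalents from multiprocessing.dummy.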
    # Worker management methods, used to manage the number of workers in the pool.
    # The resize_worker_pool method should be called periodically to keep the pool
    # at the correct size, ideally once per batch.
    @property
    def add_worker_predicate(self) -> bool:
        """Predicate determining when it is valid to add a worker to the pool."""
        return (
            self._jobs_enqueued
            > getattr(
                self.get_sink_class(), "WORKER_CAPACITY_FACTOR", WORKER_CAPACITY_FACTOR
            )
            * (len(self.workers) + 1)
            and len(self.workers)
            < self.config.get("options", {}).get(
                "max_workers",
                getattr(self.get_sink_class(), "MAX_WORKERS", MAX_WORKERS),
            )
            and time.time() - self._last_worker_creation
            > getattr(
                self.get_sink_class(),
                "WORKER_CREATION_MIN_INTERVAL",
                WORKER_CREATION_MIN_INTERVAL,
            )
        )
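    # Worked example, assuming the module-level defaults above (sink classes may
    # override them): with WORKER_CAPACITY_FACTOR = 5 and 2 live workers, more than
    # 5 * (2 + 1) = 15 jobs must be enqueued, the pool must still be below MAX_WORKERS
    # (15, unless options.max_workers overrides it), and at least
    # WORKER_CREATION_MIN_INTERVAL (5 seconds) must have passed since the last spawn
    # before a new worker is added.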
    def resize_worker_pool(self) -> None:
        """Right-size the worker pool.

        Workers self-terminate after they have been idle for a while. This method
        removes terminated workers and adds new workers if the add_worker_predicate
        evaluates to True. It always ensures that there is at least one worker in
        the pool.
        """
        workers_to_cull = []
        worker_spawned = False
        for i, worker in enumerate(self.workers):
            if not cast("Process", worker).is_alive():
                workers_to_cull.append(i)
        for i in reversed(workers_to_cull):
            worker = self.workers.pop(i)
            # Wait for the worker to terminate. This should be a no-op.
            cast("Process", worker).join()
            self.logger.info("Culling terminated worker %s", worker.ext_id)  # type: ignore
        while self.add_worker_predicate or not self.workers:
            worker = self.worker_factory()
            cast("Process", worker).start()
            self.workers.append(worker)
            worker_spawned = True
            self.logger.info("Adding worker %s", worker.ext_id)
            self._last_worker_creation = time.time()
        if worker_spawned:
            ...
    # SDK overrides to inject our worker management logic and sink selection.
    def get_sink_class(
        self, stream_name: Optional[str] = None
    ) -> Type[BaseBigQuerySink]:
        """Returns the sink class to use for a given stream based on user config."""
        _ = stream_name
        method, denormalized = (
            self.config.get("method", "storage_write_api"),
            self.config.get("denormalized", False),
        )
        if method == "batch_job":
            if denormalized:
                return BigQueryBatchJobDenormalizedSink
            return BigQueryBatchJobSink
        elif method == "streaming_insert":
            if denormalized:
                return BigQueryStreamingInsertDenormalizedSink
            return BigQueryStreamingInsertSink
        elif method == "gcs_stage":
            if denormalized:
                return BigQueryGcsStagingDenormalizedSink
            return BigQueryGcsStagingSink
        elif method == "storage_write_api":
            if denormalized:
                return BigQueryStorageWriteDenormalizedSink
            return BigQueryStorageWriteSink
        raise ValueError(f"Unknown method: {method}")
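    # For example, a config of {"method": "gcs_stage", "denormalized": false} resolves
    # to BigQueryGcsStagingSink (which also relies on the `bucket` setting), while
    # {"method": "storage_write_api", "denormalized": true} resolves to
    # BigQueryStorageWriteDenormalizedSink.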
    def get_sink(
        self,
        stream_name: str,
        *,
        record: Optional[dict] = None,
        schema: Optional[dict] = None,
        key_properties: Optional[List[str]] = None,
    ) -> Sink:
        """Get a sink for a stream. If the sink does not exist, create it.

        This override skips sink recreation on schema change. Meaningful mid-stream
        schema changes are not supported and are extremely rare to begin with; most
        taps provide a static schema at stream init. This override handles over 90%
        of cases without the undue complexity or overhead of mid-stream schema
        evolution. If you need to support mid-stream schema evolution on a regular
        basis, you should be using the fixed schema load pattern.
        """
        _ = record
        if schema is None:
            self._assert_sink_exists(stream_name)
            return self._sinks_active[stream_name]
        existing_sink = self._sinks_active.get(stream_name, None)
        if not existing_sink:
            return self.add_sink(stream_name, schema, key_properties)
        return existing_sink
    def drain_one(self, sink: Sink) -> None:  # type: ignore
        """Drain a sink. Includes a hook to manage the worker pool and notifications."""
        self.resize_worker_pool()
        while self.job_notification.poll():
            ext_id = self.job_notification.recv()
            self.worker_pings[ext_id] = time.time()
            self._jobs_enqueued -= 1
        while self.log_notification.poll():
            msg = self.log_notification.recv()
            self.logger.info(msg)
        if self.error_notification.poll():
            e, msg = self.error_notification.recv()
            if self.config.get("fail_fast", True):
                self.logger.error(msg)
                try:
                    # Try to drain if we can. This is a best effort.
                    # TODO: we should consider whether draining here is the right thing
                    # to do. It's _possible_ we increment the state message when data
                    # is not actually written. It's _unlikely_, so the upside is greater
                    # than the downside for now, but we will revisit this.
                    self.logger.error("Draining all sinks and terminating.")
                    self.drain_all(is_endofpipe=True)
                except Exception:
                    self.logger.error("Drain failed.")
                raise RuntimeError(msg) from e
            else:
                self.logger.warning(msg)
        super().drain_one(sink)
    def drain_all(self, is_endofpipe: bool = False) -> None:  # type: ignore
        """Drain all sinks and write the state message. If is_endofpipe, execute clean_up() on all sinks.

        Includes an additional hook to allow sinks to do any pre-state message processing.
        """
        state = copy.deepcopy(self._latest_state)
        sink: BaseBigQuerySink
        self._drain_all(list(self._sinks_active.values()), self.max_parallelism)
        if is_endofpipe:
            for worker in self.workers:
                if cast("Process", worker).is_alive():
                    self.queue.put(None)
            while len(self.workers):
                # Pop before joining so every worker, including the last one, is joined.
                worker = self.workers.pop()
                cast("Process", worker).join()
            for sink in self._sinks_active.values():  # type: ignore
                sink.clean_up()
        else:
            for worker in self.workers:
                cast("Process", worker).join()
            for sink in self._sinks_active.values():  # type: ignore
                sink.pre_state_hook()
        if state:
            self._write_state_message(state)
        self._reset_max_record_age()
    def _validate_config(
        self, raise_errors: bool = True, warnings_as_errors: bool = False
    ) -> Tuple[List[str], List[str]]:
        """Don't raise on config validation errors; our JSON schema does not play well with Meltano's validation."""
        return super()._validate_config(False, False)
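

# Illustrative usage sketch (an assumption, not part of the upstream file): the Singer SDK
# exposes a CLI entrypoint on every Target subclass, so the target can typically be invoked
# as shown below. The installed package may instead wire this up through a console_scripts
# entry point.
if __name__ == "__main__":
    TargetBigQuery.cli()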