# Copyright (c) 2023 Alex Butler
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this
# software and associated documentation files (the "Software"), to deal in the Software
# without restriction, including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons
# to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or
# substantial portions of the Software.
"""BigQuery Batch Job Sink."""

import os
from io import BytesIO
from mmap import mmap
from multiprocessing import Process
from multiprocessing.dummy import Process as _Thread
from queue import Empty
from typing import Any, Dict, Optional, Type, Union, cast

import orjson
from google.cloud import bigquery

from target_bigquery.core import (
BaseBigQuerySink,
BaseWorker,
Compressor,
Denormalized,
ParType,
bigquery_client_factory,
)


class Job:
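    """A single load payload bound for a BigQuery table, with retry bookkeeping."""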
def __init__(
self,
data: Union[memoryview, bytes, mmap],
table: bigquery.TableReference,
config: Dict[str, Any],
timeout: Optional[float] = 600.0,
) -> None:
self.data = data
self.table = table
self.config = config
self.timeout = timeout
self.attempt = 1


class BatchJobWorker(BaseWorker):
"""Worker that loads data into BigQuery via LoadJobs."""

    def run(self) -> None:
"""Run the worker."""
client: bigquery.Client = bigquery_client_factory(self.credentials)
while True:
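            # Wait up to 30 seconds for work; an idle queue ends the worker.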
try:
job: Optional[Job] = self.queue.get(timeout=30.0)
except Empty:
break
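            # A None sentinel also signals a graceful shutdown.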
if job is None:
break
try:
client.load_table_from_file(
BytesIO(job.data),
job.table,
num_retries=3,
timeout=job.timeout,
job_config=bigquery.LoadJobConfig(**job.config),
).result()
except Exception as exc:
job.attempt += 1
if job.attempt > 3:
# TODO: add a metric for this + a DLQ & wrap exception type
self.error_notifier.send((exc, self.serialize_exception(exc)))
raise
else:
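                    # Re-enqueue the job for another attempt and log the failure.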
self.queue.put(job)
self.log_notifier.send(self.serialize_exception(exc))
else:
self.job_notifier.send(True)
self.log_notifier.send(
f"[{self.ext_id}] Loaded {len(job.data)} bytes into {job.table}."
)
finally:
self.queue.task_done() # type: ignore


class BatchJobThreadWorker(BatchJobWorker, _Thread):
pass


class BatchJobProcessWorker(BatchJobWorker, Process):
pass


class BigQueryBatchJobSink(BaseBigQuerySink):
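    """Sink that buffers records as compressed NDJSON and enqueues batch load jobs."""

    # Worker pool tuning: max workers, per-worker queue capacity factor, and the
    # minimum interval (seconds) between spawning new workers.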
MAX_WORKERS = (os.cpu_count() or 1) * 2
WORKER_CAPACITY_FACTOR = 1
WORKER_CREATION_MIN_INTERVAL = 10.0

    @property
def job_config(self) -> Dict[str, Any]:
return {
"schema": self.table.get_resolved_schema(),
"source_format": bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
"write_disposition": bigquery.WriteDisposition.WRITE_APPEND,
}

    def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.buffer = Compressor()

    @staticmethod
def worker_cls_factory(
worker_executor_cls: Type[Process], config: Dict[str, Any]
) -> Type[Union[BatchJobThreadWorker, BatchJobProcessWorker]]:
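        # Dynamically mix BatchJobWorker with the thread- or process-based executor.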
Worker = type("Worker", (BatchJobWorker, worker_executor_cls), {})
return cast(Type[BatchJobThreadWorker], Worker)

    def process_record(self, record: Dict[str, Any], context: Dict[str, Any]) -> None:
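        # Serialize the record to NDJSON and append it to the compressed buffer.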
self.buffer.write(orjson.dumps(record, option=orjson.OPT_APPEND_NEWLINE))

    def process_batch(self, context: Dict[str, Any]) -> None:
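        # Finalize the compressed buffer and enqueue its contents as one load job.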
self.buffer.close()
self.global_queue.put(
Job(
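                # Processes need a picklable bytes copy; threads can share a memoryview.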
data=(
self.buffer.getvalue()
if self.global_par_typ is ParType.PROCESS
else self.buffer.getbuffer()
),
table=self.table.as_ref(),
config=self.job_config,
),
)
self.increment_jobs_enqueued()
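        # Start a fresh compressor for the next batch.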
self.buffer = Compressor()


class BigQueryBatchJobDenormalizedSink(Denormalized, BigQueryBatchJobSink):
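    """Batch job sink that loads records into a schema-matched (denormalized) table."""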
@property
def job_config(self) -> Dict[str, Any]:
return {
"schema": self.table.get_resolved_schema(),
"source_format": bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
"write_disposition": bigquery.WriteDisposition.WRITE_APPEND,
"schema_update_options": [
bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
],
"ignore_unknown_values": True,
}

    # Defer schema evolution to the write disposition
def evolve_schema(self: BaseBigQuerySink) -> None:
pass
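

# A minimal sketch of the intended flow (hypothetical driver code: in practice
# the Singer SDK target runtime constructs sinks, and BaseBigQuerySink provides
# the queue and notifier plumbing):
#
#   sink = BigQueryBatchJobSink(...)       # one sink per stream
#   for record in records:
#       sink.process_record(record, {})    # record -> NDJSON -> compressed buffer
#   sink.process_batch({})                 # seal the buffer, enqueue a Job
#
# Workers built by worker_cls_factory(...) drain the shared queue, running one
# BigQuery load job per Job, and exit after 30 idle seconds or a None sentinel.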