-
Notifications
You must be signed in to change notification settings - Fork 3
/
dynamodb_connector.py
123 lines (108 loc) · 4.11 KB
/
dynamodb_connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import genson
import orjson
from botocore.exceptions import ClientError
from singer_sdk.connectors import AWSBoto3Connector
from tap_dynamodb.exception import EmptyTableException
class DynamoDbConnector(AWSBoto3Connector):
    """DynamoDB connector class."""

    def __init__(
        self,
        config: dict,
    ) -> None:
        """Initialize the connector.

        Args:
            config: The connector configuration.
        """
        super().__init__(config, "dynamodb")

    @staticmethod
    def _coerce_types(record):
        """Round-trip a record through orjson so it holds only JSON types.

        Any value orjson cannot serialize natively is converted with
        ``str()``; ``OPT_OMIT_MICROSECONDS`` is passed to the serializer.

        Args:
            record: The record to normalize.

        Returns:
            The record as decoded from its JSON serialization.
        """
        # orjson.loads accepts the bytes produced by orjson.dumps directly,
        # so no intermediate decode to str is needed.
        return orjson.loads(
            orjson.dumps(
                record,
                default=str,
                option=orjson.OPT_OMIT_MICROSECONDS,
            )
        )

    def _recursively_drop_required(self, schema: dict) -> None:
        """Recursively drop the required property from a schema.

        This is used to clean up genson generated schemas which are strict by
        default. The schema is modified in place. Sub-schemas are visited
        under ``properties``, ``items`` (single schema or list of schemas)
        and ``anyOf``, so objects nested inside arrays or alternatives are
        relaxed as well.

        Args:
            schema: The json schema.
        """
        schema.pop("required", None)

        # Collect child schemas. The "properties" mapping itself is not a
        # schema, so only its values are visited; a field that happens to be
        # named "required" is therefore left alone.
        children = list(schema.get("properties", {}).values())
        items = schema.get("items")
        if isinstance(items, dict):
            children.append(items)
        elif isinstance(items, list):
            children.extend(items)
        children.extend(schema.get("anyOf", []))

        for child in children:
            if isinstance(child, dict):
                self._recursively_drop_required(child)

    def list_tables(self, include=None):
        """List the names of the tables exposed by the resource.

        Args:
            include: Optional container of table names. When given, only
                tables whose name is in it are returned; when None, all
                tables are returned.

        Returns:
            A list of table names.

        Raises:
            ClientError: Re-raised after logging when listing fails.
        """
        try:
            tables = [
                table.name
                for table in self.resource.tables.all()
                if include is None or table.name in include
            ]
        except ClientError as err:
            self.logger.error(
                "Couldn't list tables. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return tables

    def get_items_iter(self, table_name: str, scan_kwargs: "dict | None" = None):
        """Scan a table and yield its items one page at a time.

        Args:
            table_name: Name of the table to scan.
            scan_kwargs: Keyword arguments forwarded to ``Table.scan``.
                Defaults to ``{"ConsistentRead": True}``. The mapping is
                copied, so the caller's dict is never modified.

        Yields:
            One list per scan response containing that response's ``Items``,
            each passed through ``_coerce_types``.

        Raises:
            ClientError: Re-raised after logging when a scan call fails.
        """
        # Work on a private copy: the pagination key is written into these
        # kwargs, and must not leak into the caller's dict or into later
        # calls (which a shared mutable default would cause).
        kwargs = {"ConsistentRead": True} if scan_kwargs is None else dict(scan_kwargs)
        table = self.resource.Table(table_name)
        try:
            while True:
                response = table.scan(**kwargs)
                yield [
                    self._coerce_types(record) for record in response.get("Items", [])
                ]
                start_key = response.get("LastEvaluatedKey")
                if not start_key:
                    # No continuation key in the response: scan is complete.
                    break
                kwargs["ExclusiveStartKey"] = start_key
        except ClientError as err:
            self.logger.error(
                "Couldn't scan for %s. Here's why: %s: %s",
                table_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def _get_sample_records(self, table_name: str, sample_size: int) -> list:
        """Collect sample records from a table.

        Pages are requested with ``Limit`` set to ``sample_size`` and
        accumulated until at least ``sample_size`` records were collected or
        the scan is exhausted. Because whole pages are appended, the result
        may hold more than ``sample_size`` records.

        Args:
            table_name: Name of the table to sample.
            sample_size: Number of records to aim for.

        Returns:
            The collected records; empty if the scan returned no items.
        """
        sample_records = []
        for batch in self.get_items_iter(
            table_name, scan_kwargs={"Limit": sample_size, "ConsistentRead": True}
        ):
            sample_records.extend(batch)
            if len(sample_records) >= sample_size:
                break
        return sample_records

    def get_table_json_schema(
        self, table_name: str, sample_size: int, strategy: str = "infer"
    ) -> dict:
        """Build a JSON schema for a table from sampled records.

        Args:
            table_name: Name of the table.
            sample_size: Number of records to sample for inference.
            strategy: Schema strategy; only ``"infer"`` is supported.

        Returns:
            The inferred JSON schema with all ``required`` keywords removed.

        Raises:
            EmptyTableException: If no records could be sampled.
            ValueError: If ``strategy`` is not supported.
            RuntimeError: If inference produced an empty schema.
        """
        sample_records = self._get_sample_records(table_name, sample_size)
        if not sample_records:
            raise EmptyTableException()
        if strategy != "infer":
            raise ValueError(f"Strategy {strategy} not supported")

        builder = genson.SchemaBuilder(schema_uri=None)
        for record in sample_records:
            # Records were already coerced to JSON types by get_items_iter,
            # so they are fed to the builder as-is.
            builder.add_object(record)
        schema = builder.to_schema()
        self._recursively_drop_required(schema)
        if not schema:
            raise RuntimeError("Inferring schema failed")
        self.logger.info(
            "Inferring schema successful for table: '%s'", table_name
        )
        return schema

    def get_table_key_properties(self, table_name):
        """Return the attribute names listed in a table's key schema.

        Args:
            table_name: Name of the table.

        Returns:
            A list with the ``AttributeName`` of each key schema entry, in
            key schema order.
        """
        key_schema = self.resource.Table(table_name).key_schema
        return [key.get("AttributeName") for key in key_schema]