# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import warnings
from concurrent.futures import ThreadPoolExecutor

import ibis.backends.pandas
import numpy
import pandas

from data_validation import combiner, consts, metadata
from data_validation.config_manager import ConfigManager
from data_validation.query_builder.random_row_builder import RandomRowBuilder
from data_validation.schema_validation import SchemaValidation
from data_validation.validation_builder import ValidationBuilder
""" The DataValidation class is where the code becomes source/target aware
The class builds specific source and target clients and is likely where someone would go to
customize their validation process.
data_validator = DataValidation(builder, source_config, target_config, result_handler=None, verbose=False)
"""
class DataValidation(object):
    def __init__(
        self,
        config,
        validation_builder=None,
        schema_validator=None,
        result_handler=None,
        verbose=False,
    ):
        """Initialize a DataValidation client

        Args:
            config (dict): The validation config used for the comparison.
            validation_builder (ValidationBuilder): Optional instance of a ValidationBuilder.
            schema_validator (SchemaValidation): Optional instance of a SchemaValidation.
            result_handler (ResultHandler): Optional instance of a ResultHandler client.
            verbose (bool): If verbose, the Data Validation client will print the queries run.
        """
        self.verbose = verbose

        # Data Client Management
        self.config = config
        self.config_manager = ConfigManager(config, verbose=self.verbose)

        self.run_metadata = metadata.RunMetadata()
        self.run_metadata.labels = self.config_manager.labels

        # Initialize Validation Builder if None was supplied
        self.validation_builder = validation_builder or ValidationBuilder(
            self.config_manager
        )

        self.schema_validator = schema_validator or SchemaValidation(
            self.config_manager, run_metadata=self.run_metadata, verbose=self.verbose
        )

        # Initialize the default Result Handler if None was supplied
        self.result_handler = result_handler or self.config_manager.get_result_handler()
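
    # Hedged note on the constructor wiring above: every collaborator falls back to
    # a default derived from the ConfigManager, so callers only pass an argument to
    # override behavior. For example (my_builder / my_handler are hypothetical):
    #
    #     DataValidation(
    #         config,
    #         validation_builder=my_builder,  # default: ValidationBuilder(config_manager)
    #         result_handler=my_handler,      # default: config_manager.get_result_handler()
    #     )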

    # TODO(dhercher): We planned on shifting this to use an Execution Handler.
    # Leaving it to swast to design how this should look.
    def execute(self):
        """Execute Queries and Store Results"""
        # Apply random row filter before validations run
        if self.config_manager.use_random_rows():
            self._add_random_row_filter()

        # Run correct execution for the given validation type
        if self.config_manager.validation_type == consts.ROW_VALIDATION:
            grouped_fields = self.validation_builder.pop_grouped_fields()
            result_df = self.execute_recursive_validation(
                self.validation_builder, grouped_fields
            )
        elif self.config_manager.validation_type == consts.SCHEMA_VALIDATION:
            # Perform only schema validation
            result_df = self.schema_validator.execute()
        else:
            result_df = self._execute_validation(
                self.validation_builder, process_in_memory=True
            )

        # Call Result Handler to Manage Results
        return self.result_handler.execute(result_df)
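
    # Dispatch sketch for execute() above: ROW_VALIDATION drills down recursively,
    # SCHEMA_VALIDATION delegates to the SchemaValidation client, and every other
    # validation type runs a single in-memory comparison. In all cases the result
    # DataFrame is handed to the configured result handler, whose return value is
    # what the caller receives.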

    def _add_random_row_filter(self):
        """Add random row filters to the validation builder."""
        if not self.config_manager.primary_keys:
            raise ValueError("Primary Keys are required for Random Row Filters")

        # Filter for only first primary key (multi-pk filter not supported)
        primary_key_info = self.config_manager.primary_keys[0]
        query = RandomRowBuilder(
            [primary_key_info[consts.CONFIG_SOURCE_COLUMN]],
            self.config_manager.random_row_batch_size(),
        ).compile(
            self.config_manager.source_client,
            self.config_manager.source_schema,
            self.config_manager.source_table,
            self.validation_builder.source_builder,
        )
        random_rows = self.config_manager.source_client.execute(query)
        if len(random_rows) == 0:
            return

        filter_field = {
            consts.CONFIG_TYPE: consts.FILTER_TYPE_ISIN,
            consts.CONFIG_FILTER_SOURCE_COLUMN: primary_key_info[
                consts.CONFIG_SOURCE_COLUMN
            ],
            consts.CONFIG_FILTER_SOURCE_VALUE: random_rows[
                primary_key_info[consts.CONFIG_SOURCE_COLUMN]
            ],
            consts.CONFIG_FILTER_TARGET_COLUMN: primary_key_info[
                consts.CONFIG_TARGET_COLUMN
            ],
            consts.CONFIG_FILTER_TARGET_VALUE: random_rows[
                primary_key_info[consts.CONFIG_SOURCE_COLUMN]
            ],
        }
        self.validation_builder.add_filter(filter_field)
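
    # Illustrative shape of the filter added above (values hypothetical): for a
    # first primary key mapped as source column "id" -> target column "id", the
    # filter is an "isin" filter whose source and target values are the "id" column
    # of the randomly sampled rows, so both sides are restricted to the same batch.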

    def query_too_large(self, rows_df, grouped_fields):
        """Return a bool indicating whether another level of recursion
        would create a result set that is too large.

        Rules for "too large" are:
            - If any grouped fields remain, return False
              (assumes the user added logically sized groups).
            - Else, if the next group size is larger than the limit, return True.
            - Finally, return False if no covered case occurred.
        """
        if len(grouped_fields) > 1:
            return False

        try:
            count_df = rows_df[
                rows_df[consts.AGGREGATION_TYPE] == consts.CONFIG_TYPE_COUNT
            ]
            for row in count_df.to_dict(orient="records"):
                recursive_query_size = max(
                    float(row[consts.SOURCE_AGG_VALUE]),
                    float(row[consts.TARGET_AGG_VALUE]),
                )
                if recursive_query_size > self.config_manager.max_recursive_query_size:
                    logging.warning("Query result is too large for recursion: %s", row)
                    return True
        except Exception:
            logging.warning("Recursive values could not be cast to float.")
            return False

        return False
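
    # Worked example of the rules above (numbers hypothetical): with two grouped
    # fields remaining the method returns False immediately; with one remaining and
    # a COUNT row of source=80_000 / target=79_998 against
    # max_recursive_query_size=50_000, max(...) exceeds the limit and it returns True.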

    def execute_recursive_validation(self, validation_builder, grouped_fields):
        """Recursive execution for Row validations.

        This method executes aggregate queries, such as sum-of-hashes, on the
        source and target tables. Where they differ, add to the GROUP BY
        clause recursively until the individual row differences can be
        identified.
        """
        process_in_memory = self.config_manager.process_in_memory()
        past_results = []

        if len(grouped_fields) > 0:
            validation_builder.add_query_group(grouped_fields[0])
            result_df = self._execute_validation(
                validation_builder, process_in_memory=process_in_memory
            )

            for grouped_key in result_df[consts.GROUP_BY_COLUMNS].unique():
                # Validations are viewed separately, but queried together.
                # We must treat them as a single item which failed or succeeded.
                group_succeeded = True
                grouped_key_df = result_df[
                    result_df[consts.GROUP_BY_COLUMNS] == grouped_key
                ]

                if self.query_too_large(grouped_key_df, grouped_fields):
                    past_results.append(grouped_key_df)
                    continue

                for row in grouped_key_df.to_dict(orient="records"):
                    if row[consts.SOURCE_AGG_VALUE] == row[consts.TARGET_AGG_VALUE]:
                        continue
                    else:
                        group_succeeded = False
                        break

                if group_succeeded:
                    past_results.append(grouped_key_df)
                else:
                    recursive_validation_builder = validation_builder.clone()
                    self._add_recursive_validation_filter(
                        recursive_validation_builder, row
                    )
                    past_results.append(
                        self.execute_recursive_validation(
                            recursive_validation_builder, grouped_fields[1:]
                        )
                    )
        elif self.config_manager.primary_keys and len(grouped_fields) == 0:
            past_results.append(
                self._execute_validation(
                    validation_builder, process_in_memory=process_in_memory
                )
            )
        # elif self.config_manager.primary_keys:
        #     validation_builder.add_config_query_groups(self.config_manager.primary_keys)
        #     validation_builder.add_config_query_groups(grouped_fields)
        else:
            warnings.warn(
                "WARNING: No Primary Keys Supplied in Row Validation", UserWarning
            )
            return None

        return pandas.concat(past_results)
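
    # Recursion sketch (group column names hypothetical):
    #   level 0: GROUP BY region                      -> keep matching regions, recurse on the rest
    #   level 1: filter region='EMEA', GROUP BY city  -> keep matching cities, recurse again
    #   level 2: no grouped fields left               -> primary-key row validation for that slice
    # Matching groups short-circuit into past_results and are never re-queried.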

    def _add_recursive_validation_filter(self, validation_builder, row):
        """Return ValidationBuilder Configured for Next Recursive Search"""
        group_by_columns = json.loads(row[consts.GROUP_BY_COLUMNS])
        for alias, value in group_by_columns.items():
            filter_field = {
                consts.CONFIG_TYPE: consts.FILTER_TYPE_EQUALS,
                consts.CONFIG_FILTER_SOURCE_COLUMN: validation_builder.get_grouped_alias_source_column(
                    alias
                ),
                consts.CONFIG_FILTER_SOURCE_VALUE: value,
                consts.CONFIG_FILTER_TARGET_COLUMN: validation_builder.get_grouped_alias_target_column(
                    alias
                ),
                consts.CONFIG_FILTER_TARGET_VALUE: value,
            }
            validation_builder.add_filter(filter_field)
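
    # Example of the expansion above (values hypothetical): a failing row whose
    # GROUP_BY_COLUMNS field is '{"region": "EMEA", "city": "Berlin"}' yields two
    # equality filters, pinning the source and target group columns to "EMEA" and
    # "Berlin" for the next, more granular recursive query.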

    @classmethod
    def _get_pandas_schema(cls, source_df, target_df, join_on_fields, verbose=False):
        """Return a pandas schema which aligns source and target for joins."""
        # TODO(dhercher): We are experiencing issues around datetimes arriving as
        # strings and not matching. Casting them to string is a workable hack, but
        # is not ideal. We should look at both types, and if one is date-like,
        # then use pandas.to_datetime on the other.
        for join_on_field in join_on_fields:
            source_df[join_on_field] = source_df[join_on_field].astype(str)
            target_df[join_on_field] = target_df[join_on_field].astype(str)

        # Loop over index keys() instead of iteritems() because pandas is
        # failing with datetime64[ns, UTC] data type on Python 3.9.
        schema_data = []
        schema_index = []
        for key in source_df.dtypes.keys():
            dtype = source_df.dtypes[key]
            # The Ibis pandas backend fails with `KeyError: dtype('O')` if
            # object dtypes are passed in.
            if dtype in {numpy.dtype("O")}:
                continue
            schema_data.append(dtype)
            schema_index.append(key)
        pd_schema = pandas.Series(schema_data, index=schema_index)

        if verbose:
            logging.info("-- ** Pandas Schema ** --")
            logging.info(pd_schema)

        return pd_schema
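
    # A minimal sketch of the schema helper on a toy frame (assumes pandas only):
    #
    #     df = pandas.DataFrame({"id": [1, 2], "name": ["a", "b"], "amt": [1.5, 2.0]})
    #     DataValidation._get_pandas_schema(df, df.copy(), join_on_fields=["id"])
    #
    # "id" is cast to str (object dtype) on both frames, so it is skipped along with
    # "name"; the returned Series only maps "amt" to float64.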

    def _execute_validation(self, validation_builder, process_in_memory=True):
        """Execute Against a Supplied Validation Builder"""
        self.run_metadata.validations = validation_builder.get_metadata()

        source_query = validation_builder.get_source_query()
        target_query = validation_builder.get_target_query()

        join_on_fields = (
            set(validation_builder.get_primary_keys())
            if (self.config_manager.validation_type == consts.ROW_VALIDATION)
            or (
                self.config_manager.validation_type == consts.CUSTOM_QUERY
                and self.config_manager.custom_query_type == "row"
            )
            else set(validation_builder.get_group_aliases())
        )

        # If row validation from YAML, compare source and target agg values
        is_value_comparison = (
            self.config_manager.validation_type == consts.ROW_VALIDATION
            or (
                self.config_manager.validation_type == consts.CUSTOM_QUERY
                and self.config_manager.custom_query_type == "row"
            )
        )

        if process_in_memory:
            futures = []
            with ThreadPoolExecutor() as executor:
                # Submit the two query network calls concurrently
                futures.append(
                    executor.submit(
                        self.config_manager.source_client.execute, source_query
                    )
                )
                futures.append(
                    executor.submit(
                        self.config_manager.target_client.execute, target_query
                    )
                )

            source_df = futures[0].result()
            target_df = futures[1].result()

            pd_schema = self._get_pandas_schema(
                source_df, target_df, join_on_fields, verbose=self.verbose
            )

            pandas_client = ibis.backends.pandas.connect(
                {combiner.DEFAULT_SOURCE: source_df, combiner.DEFAULT_TARGET: target_df}
            )

            try:
                result_df = combiner.generate_report(
                    pandas_client,
                    self.run_metadata,
                    pandas_client.table(combiner.DEFAULT_SOURCE, schema=pd_schema),
                    pandas_client.table(combiner.DEFAULT_TARGET, schema=pd_schema),
                    join_on_fields=join_on_fields,
                    is_value_comparison=is_value_comparison,
                    verbose=self.verbose,
                )
            except Exception as e:
                if self.verbose:
                    logging.error("-- ** Logging Source DF ** --")
                    logging.error(source_df.dtypes)
                    logging.error(source_df)
                    logging.error("-- ** Logging Target DF ** --")
                    logging.error(target_df.dtypes)
                    logging.error(target_df)
                raise e
        else:
            result_df = combiner.generate_report(
                self.config_manager.source_client,
                self.run_metadata,
                source_query,
                target_query,
                join_on_fields=join_on_fields,
                is_value_comparison=is_value_comparison,
                verbose=self.verbose,
            )

        return result_df
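
    # Concurrency note for the in-memory path above: both queries are submitted to a
    # ThreadPoolExecutor so the source and target network calls overlap, and the
    # `with` block only exits once both futures have finished. The resulting frames
    # are then joined locally via the Ibis pandas backend, whereas the non in-memory
    # path hands the raw queries to combiner.generate_report to be executed by the
    # source client instead.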

    def combine_data(self, source_df, target_df, join_on_fields):
        """TODO: Return List of Dictionaries"""
        # Clean Data to Standardize
        if join_on_fields:
            df = source_df.merge(
                target_df,
                how="outer",
                on=join_on_fields,
                suffixes=(consts.INPUT_SUFFIX, consts.OUTPUT_SUFFIX),
            )
        else:
            df = source_df.join(
                target_df,
                how="outer",
                lsuffix=consts.INPUT_SUFFIX,
                rsuffix=consts.OUTPUT_SUFFIX,
            )
        return df
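
    # Merge sketch for combine_data (column names hypothetical): with
    # join_on_fields=["id"], a column "amt" present on both sides comes back as
    # "amt" + consts.INPUT_SUFFIX and "amt" + consts.OUTPUT_SUFFIX, and rows that
    # exist on only one side of the outer merge carry NaN in the other side's columns.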