-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
sql.py
107 lines (86 loc) · 3.41 KB
/
sql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from abc import abstractmethod
from datetime import datetime
from typing import Generic, TypeVar
import traceback
import dbt.exceptions
from dbt.contracts.sql import (
RemoteCompileResult,
RemoteCompileResultMixin,
RemoteRunResult,
ResultTable,
)
from dbt.events.functions import fire_event
from dbt.events.types import SQLRunnerException
from dbt.task.compile import CompileRunner
SQLResult = TypeVar("SQLResult", bound=RemoteCompileResultMixin)
class GenericSqlRunner(CompileRunner, Generic[SQLResult]):
def __init__(self, config, adapter, node, node_index, num_nodes):
CompileRunner.__init__(self, config, adapter, node, node_index, num_nodes)
def handle_exception(self, e, ctx):
fire_event(SQLRunnerException(exc=str(e), exc_info=traceback.format_exc()))
if isinstance(e, dbt.exceptions.Exception):
if isinstance(e, dbt.exceptions.RuntimeException):
e.add_node(ctx.node)
return e
def before_execute(self):
pass
def after_execute(self, result):
pass
def compile(self, manifest):
compiler = self.adapter.get_compiler()
return compiler.compile_node(self.node, manifest, {}, write=False)
@abstractmethod
def execute(self, compiled_node, manifest) -> SQLResult:
pass
@abstractmethod
def from_run_result(self, result, start_time, timing_info) -> SQLResult:
pass
def error_result(self, node, error, start_time, timing_info):
raise error
def ephemeral_result(self, node, start_time, timing_info):
raise dbt.exceptions.NotImplementedException("cannot execute ephemeral nodes remotely!")
class SqlCompileRunner(GenericSqlRunner[RemoteCompileResult]):
def execute(self, compiled_node, manifest) -> RemoteCompileResult:
return RemoteCompileResult(
raw_code=compiled_node.raw_code,
compiled_code=compiled_node.compiled_code,
node=compiled_node,
timing=[], # this will get added later
logs=[],
generated_at=datetime.utcnow(),
)
def from_run_result(self, result, start_time, timing_info) -> RemoteCompileResult:
return RemoteCompileResult(
raw_code=result.raw_code,
compiled_code=result.compiled_code,
node=result.node,
timing=timing_info,
logs=[],
generated_at=datetime.utcnow(),
)
class SqlExecuteRunner(GenericSqlRunner[RemoteRunResult]):
def execute(self, compiled_node, manifest) -> RemoteRunResult:
_, execute_result = self.adapter.execute(compiled_node.compiled_code, fetch=True)
table = ResultTable(
column_names=list(execute_result.column_names),
rows=[list(row) for row in execute_result],
)
return RemoteRunResult(
raw_code=compiled_node.raw_code,
compiled_code=compiled_node.compiled_code,
node=compiled_node,
table=table,
timing=[],
logs=[],
generated_at=datetime.utcnow(),
)
def from_run_result(self, result, start_time, timing_info) -> RemoteRunResult:
return RemoteRunResult(
raw_code=result.raw_code,
compiled_code=result.compiled_code,
node=result.node,
table=result.table,
timing=timing_info,
logs=[],
generated_at=datetime.utcnow(),
)