jinja.py
206 lines (165 loc) · 6.37 KB
import re
import threading
from contextlib import contextmanager
from typing import Any, Dict, List, NoReturn, Optional, Tuple, Union

import jinja2
import jinja2.ext
import jinja2.nativetypes  # type: ignore
import jinja2.nodes
import jinja2.parser
import jinja2.sandbox

from dbt.contracts.graph.nodes import GenericTestNode
from dbt.exceptions import (
    DbtInternalError,
    MaterializtionMacroNotUsedError,
    NoSupportedLanguagesFoundError,
)
from dbt.node_types import ModelLanguage
from dbt_common.clients.jinja import (
    CallableMacroGenerator,
    MacroProtocol,
    get_template,
    render_template,
)
from dbt_common.utils import deep_map_render

SUPPORTED_LANG_ARG = jinja2.nodes.Name("supported_languages", "param")


class MacroStack(threading.local):
    def __init__(self):
        super().__init__()
        self.call_stack = []

    @property
    def depth(self) -> int:
        return len(self.call_stack)

    def push(self, name):
        self.call_stack.append(name)

    def pop(self, name):
        got = self.call_stack.pop()
        if got != name:
            raise DbtInternalError(f"popped {got}, expected {name}")
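

# Usage sketch (hypothetical ids): MacroStack keeps a thread-local LIFO of macro
# unique_ids so each thread can track its own chain of nested macro calls, e.g.
#
#   stack = MacroStack()
#   stack.push("macro.my_project.my_macro")  # depth goes 0 -> 1
#   assert stack.depth == 1
#   stack.pop("macro.my_project.my_macro")   # popping a mismatched name raises DbtInternalError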


class MacroGenerator(CallableMacroGenerator):
    def __init__(
        self,
        macro: MacroProtocol,
        context: Optional[Dict[str, Any]] = None,
        node: Optional[Any] = None,
        stack: Optional[MacroStack] = None,
    ) -> None:
        super().__init__(macro, context)
        self.node = node
        self.stack = stack

    # This adds the macro's unique id to the node's 'depends_on'
    @contextmanager
    def track_call(self):
        # This is only called from __call__
        if self.stack is None:
            yield
        else:
            unique_id = self.macro.unique_id
            depth = self.stack.depth
            # only mark depth=0 calls as dependencies; when creating this
            # dependency we don't pass in a stack
            if depth == 0 and self.node:
                self.node.depends_on.add_macro(unique_id)
            self.stack.push(unique_id)
            try:
                yield
            finally:
                self.stack.pop(unique_id)

    # this makes MacroGenerator objects callable like functions
    def __call__(self, *args, **kwargs):
        with self.track_call():
            return self.call_macro(*args, **kwargs)
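

# Sketch of intended use (names are illustrative): wrapping a parsed macro in a
# MacroGenerator makes it callable from a rendering context, and the first
# (depth-0) call records the macro's unique_id on node.depends_on via track_call:
#
#   gen = MacroGenerator(macro, context=ctx, node=model_node, stack=MacroStack())
#   result = gen("some_arg")  # runs the macro body and registers the dependency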


class UnitTestMacroGenerator(MacroGenerator):
    # this makes UnitTestMacroGenerator objects callable like functions
    def __init__(
        self,
        macro_generator: MacroGenerator,
        call_return_value: Any,
    ) -> None:
        super().__init__(
            macro_generator.macro,
            macro_generator.context,
            macro_generator.node,
            macro_generator.stack,
        )
        self.call_return_value = call_return_value

    def __call__(self, *args, **kwargs):
        with self.track_call():
            return self.call_return_value
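

# Sketch (illustrative): in unit tests the wrapped macro body is never executed;
# the canned value is returned instead, while call tracking still happens.
#
#   stub = UnitTestMacroGenerator(gen, call_return_value=42)
#   stub("ignored", kwarg="ignored")  # -> 42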


# performance note: Local benchmarking (so take it with a big grain of salt!)
# on this indicates that it is on average slightly slower than
# checking two separate patterns, but the standard deviation is smaller with
# one pattern. The time difference between the two was ~2 std deviations, which
# is small enough that I've just chosen the more readable option.
_HAS_RENDER_CHARS_PAT = re.compile(r"({[{%#]|[#}%]})")

_render_cache: Dict[str, Any] = dict()


def get_rendered(
    string: str,
    ctx: Dict[str, Any],
    node=None,
    capture_macros: bool = False,
    native: bool = False,
) -> Any:
    # performance optimization: if there are no jinja control characters in the
    # string, we can just return the input. Fall back to jinja if the value is
    # not a string or if native rendering is enabled (so '1' -> 1, etc...).
    # If skipping jinja is desirable in the native env as well, we could handle
    # the native=True case by passing the input string to ast.literal_eval, like
    # the native renderer does.
    has_render_chars = not isinstance(string, str) or _HAS_RENDER_CHARS_PAT.search(string)

    if not has_render_chars:
        if not native:
            return string
        elif string in _render_cache:
            return _render_cache[string]

    template = get_template(
        string,
        ctx,
        node,
        capture_macros=capture_macros,
        native=native,
    )

    rendered = render_template(template, ctx, node)

    if not has_render_chars and native:
        _render_cache[string] = rendered

    return rendered
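

# Sketch of the fast path vs. the jinja path (inputs are illustrative):
#
#   get_rendered("plain text", ctx)      # no {{ }} / {% %} / {# #} -> returned as-is
#   get_rendered("{{ 1 + 1 }}", ctx)     # rendered through jinja -> "2"
#   get_rendered("1", ctx, native=True)  # native rendering: '1' -> 1, result cached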


def undefined_error(msg) -> NoReturn:
    raise jinja2.exceptions.UndefinedError(msg)


GENERIC_TEST_KWARGS_NAME = "_dbt_generic_test_kwargs"


def add_rendered_test_kwargs(
    context: Dict[str, Any],
    node: GenericTestNode,
    capture_macros: bool = False,
) -> None:
    """Render each of the test kwargs in the given context using the native
    renderer, then insert the rendered kwargs into the given context as the
    special test keyword arguments member.
    """
    looks_like_func = r"^\s*(env_var|ref|var|source|doc)\s*\(.+\)\s*$"

    def _convert_function(value: Any, keypath: Tuple[Union[str, int], ...]) -> Any:
        if isinstance(value, str):
            if keypath == ("column_name",):
                # special case: don't render column names as native;
                # keep them as strings
                return value

            if re.match(looks_like_func, value) is not None:
                # curly braces to make rendering happy
                value = f"{{{{ {value} }}}}"

            value = get_rendered(value, context, node, capture_macros=capture_macros, native=True)

        return value

    # The test_metadata.kwargs come from the test builder, and were set
    # when the test node was created in _parse_generic_test.
    kwargs = deep_map_render(_convert_function, node.test_metadata.kwargs)
    context[GENERIC_TEST_KWARGS_NAME] = kwargs
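

# Sketch (hypothetical kwargs and var values): string values that look like a
# call to env_var/ref/var/source/doc are wrapped in {{ ... }} and rendered
# natively, while column_name is deliberately left as a plain string, e.g.
#
#   node.test_metadata.kwargs == {"column_name": "id", "values": "var('accepted')"}
#   # after add_rendered_test_kwargs, context["_dbt_generic_test_kwargs"] would be
#   # {"column_name": "id", "values": ["a", "b"]}  # assuming var('accepted') == ["a", "b"]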


def get_supported_languages(node: jinja2.nodes.Macro) -> List[ModelLanguage]:
    if "materialization" not in node.name:
        raise MaterializtionMacroNotUsedError(node=node)

    no_kwargs = not node.defaults
    no_langs_found = SUPPORTED_LANG_ARG not in node.args

    if no_kwargs or no_langs_found:
        raise NoSupportedLanguagesFoundError(node=node)

    lang_idx = node.args.index(SUPPORTED_LANG_ARG)
    # indexing defaults from the end,
    # since supported_languages is a kwarg, and kwargs always come after args
    return [
        ModelLanguage[item.value] for item in node.defaults[-(len(node.args) - lang_idx)].items
    ]
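

# Sketch (illustrative macro): for a materialization declared as
#
#   {% materialization my_mat, default, supported_languages=['sql', 'python'] %}
#   ...
#   {% endmaterialization %}
#
# the parsed jinja2 Macro node carries supported_languages as a keyword default,
# and get_supported_languages returns [ModelLanguage.sql, ModelLanguage.python].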