Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schema tests defined by macros #339

Closed
wants to merge 13 commits into from
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
- Ignore commented-out schema tests ([#330](https://github.com/fishtown-analytics/dbt/pull/330), [#328](https://github.com/fishtown-analytics/dbt/issues/328))
- Fix run levels ([#343](https://github.com/fishtown-analytics/dbt/pull/343), [#340](https://github.com/fishtown-analytics/dbt/issues/340), [#338](https://github.com/fishtown-analytics/dbt/issues/338))
- Fix concurrency, open a unique transaction per model ([#345](https://github.com/fishtown-analytics/dbt/pull/345), [#336](https://github.com/fishtown-analytics/dbt/issues/336))
- Handle concurrent `DROP ... CASCADE`s in Redshift ([#349](https://github.com/fishtown-analytics/dbt/pull/349))
- Always release connections (use `try .. finally`) ([#354](https://github.com/fishtown-analytics/dbt/pull/354))

### Changes

Expand Down
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
recursive-include dbt/include *.py *.sql *.yml
27 changes: 19 additions & 8 deletions dbt/adapters/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,10 @@ def release_connection(cls, profile, name):
lock.acquire()

if to_release.get('state') == 'open':

if to_release.get('transaction_open') is True:
cls.rollback(to_release)

to_release['name'] = None
connections_available.append(to_release)
else:
Expand All @@ -366,17 +370,17 @@ def release_connection(cls, profile, name):
def cleanup_connections(cls):
global connections_in_use, connections_available

for name, connection in connections_in_use.items():
if connection.get('state') != 'closed':
logger.debug("Connection '{}' was left open."
.format(name))
else:
logger.debug("Connection '{}' was properly closed."
.format(name))

try:
lock.acquire()

for name, connection in connections_in_use.items():
if connection.get('state') != 'closed':
logger.debug("Connection '{}' was left open."
.format(name))
else:
logger.debug("Connection '{}' was properly closed."
.format(name))

# garbage collect, but don't close them in case someone
# still has a handle
connections_in_use = {}
Expand Down Expand Up @@ -525,3 +529,10 @@ def table_exists(cls, profile, schema, table, model_name=None):
tables = cls.query_for_existing(profile, schema, model_name)
exists = tables.get(table) is not None
return exists

@classmethod
def already_exists(cls, profile, schema, table, model_name=None):
    """
    Backwards-compatible alias for `table_exists`: delegates all
    arguments through unchanged and returns its boolean result.
    """
    exists = cls.table_exists(profile, schema, table, model_name)
    return exists
33 changes: 32 additions & 1 deletion dbt/adapters/redshift.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import multiprocessing

from dbt.adapters.postgres import PostgresAdapter
from dbt.logger import GLOBAL_LOGGER as logger # noqa
from dbt.compat import basestring


drop_lock = multiprocessing.Lock()


class RedshiftAdapter(PostgresAdapter):
Expand Down Expand Up @@ -30,7 +36,7 @@ def sort_qualifier(cls, sort_type, sort):
.format(sort_type, valid_sort_types)
)

if type(sort) == str:
if isinstance(sort, basestring):
sort_keys = [sort]
else:
sort_keys = sort
Expand All @@ -42,3 +48,28 @@ def sort_qualifier(cls, sort_type, sort):
return "{sort_type} sortkey({keys_csv})".format(
sort_type=sort_type, keys_csv=keys_csv
)

@classmethod
def drop(cls, profile, relation, relation_type, model_name=None):
    """
    Drop a relation in Redshift.

    Redshift misbehaves when concurrent sessions issue
    `DROP ... CASCADE`, so all drops in this process are serialized
    behind a module-level lock, and each drop runs in its own
    dedicated transaction (commit before, commit after).
    """
    # Use the lock as a context manager: the original code called
    # acquire() inside the `try`, so a failure in acquire() would
    # still reach `finally` and release() an unheld lock.
    with drop_lock:
        connection = cls.get_connection(profile, model_name)

        # Close out any in-flight transaction so the DROP runs alone.
        cls.commit(connection)
        cls.begin(profile, connection.get('name'))

        # NOTE(review): super(PostgresAdapter, cls) deliberately skips
        # PostgresAdapter and dispatches to its base class -- confirm
        # this is intended rather than super(RedshiftAdapter, cls).
        result = super(PostgresAdapter, cls).drop(
            profile, relation, relation_type, model_name)

        # Commit the DROP immediately and reopen a transaction for
        # whatever work follows on this connection.
        cls.commit(connection)
        cls.begin(profile, connection.get('name'))

        return result
40 changes: 37 additions & 3 deletions dbt/clients/jinja.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,40 @@

import jinja2
import jinja2.sandbox
import jinja2.nodes
import jinja2.ext

from dbt.utils import NodeType


def create_macro_validation_extension(node):
    """
    Build a sandboxed Jinja environment whose token-stream filter
    rejects macro bodies that call the disallowed context functions
    (`ref`, `var`), raising a compiler error against `node`.
    """

    class MacroContextCatcherExtension(jinja2.ext.Extension):
        DisallowedFuncs = ('ref', 'var')

        def onError(self, token):
            error = "The context variable '{}' is not allowed in macros." \
                .format(token.value)
            dbt.exceptions.raise_compiler_error(node, error)

        def filter_stream(self, stream):
            # Walk the token stream, buffering each token (plus one
            # token of lookahead when needed) so everything is
            # re-emitted in order. A disallowed name immediately
            # followed by '(' is a forbidden call.
            while not stream.eos:
                current = next(stream)
                buffered = [current]

                if current.test('name') and \
                        current.value in self.DisallowedFuncs:
                    lookahead = next(stream)
                    buffered.append(lookahead)
                    if lookahead.test('lparen'):
                        self.onError(current)

                for buffered_token in buffered:
                    yield buffered_token

    return jinja2.sandbox.SandboxedEnvironment(
        extensions=[MacroContextCatcherExtension])


def create_macro_capture_env(node):

class ParserMacroCapture(jinja2.Undefined):
Expand Down Expand Up @@ -48,13 +78,17 @@ def __call__(self, *args, **kwargs):
env = jinja2.sandbox.SandboxedEnvironment()


def get_template(string, ctx, node=None, capture_macros=False):
def get_template(string, ctx, node=None, capture_macros=False,
validate_macro=False):
try:
local_env = env

if capture_macros is True:
if capture_macros:
local_env = create_macro_capture_env(node)

elif validate_macro:
local_env = create_macro_validation_extension(node)

return local_env.from_string(dbt.compat.to_string(string), globals=ctx)

except (jinja2.exceptions.TemplateSyntaxError,
Expand All @@ -73,4 +107,4 @@ def render_template(template, ctx, node=None):

def get_rendered(string, ctx, node=None, capture_macros=False):
    """
    Compile `string` as a Jinja template and render it against `ctx`.

    `node` is forwarded to both compilation and rendering so any
    template error can be reported against the originating node.
    """
    template = get_template(string, ctx, node, capture_macros)
    # Forward `node` (the scraped diff left both the old `node=None`
    # and the fixed `node` lines in place; the fix is to pass it on).
    return render_template(template, ctx, node)
Loading