Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve the case of schemas and databases when listing relations (#2403) #2411

Merged
merged 2 commits into from
May 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
## dbt 0.17.0 (Release TBD)

### Breaking changes
- The `list_relations_without_caching`, `drop_schema`, and `create_schema` macros and methods now accept a single argument of a Relation object with no identifier field. ([#2411](https://github.com/fishtown-analytics/dbt/pull/2411))

### Features
- Added warning to nodes selector if nothing was matched ([#2115](https://github.com/fishtown-analytics/dbt/issues/2115), [#2343](https://github.com/fishtown-analytics/dbt/pull/2343))
- Suport column descriptions for BigQuery models ([#2335](https://github.com/fishtown-analytics/dbt/issues/2335), [#2402](https://github.com/fishtown-analytics/dbt/pull/2402))


### Fixes
- When tracking is disabled due to errors, do not reset the invocation ID ([#2398](https://github.com/fishtown-analytics/dbt/issues/2398), [#2400](https://github.com/fishtown-analytics/dbt/pull/2400))
- Fix for logic error in compilation errors for duplicate data test names ([#2406](https://github.com/fishtown-analytics/dbt/issues/2406), [#2407](https://github.com/fishtown-analytics/dbt/pull/2407))
- Fix list_schemas macro failing for BigQuery ([#2412](https://github.com/fishtown-analytics/dbt/issues/2412), [#2413](https://github.com/fishtown-analytics/dbt/issues/2413))
- Fix for making schema tests work for community plugin [dbt-sqlserver](https://github.com/mikaelene/dbt-sqlserver) [#2414](https://github.com/fishtown-analytics/dbt/pull/2414)
- Fix a bug where quoted uppercase schemas on snowflake were not processed properly during cache building. ([#2403](https://github.com/fishtown-analytics/dbt/issues/2403), [#2411](https://github.com/fishtown-analytics/dbt/pull/2411))

Contributors:
- [@azhard](https://github.com/azhard) [#2413](https://github.com/fishtown-analytics/dbt/pull/2413)
- [@mikaelene](https://github.com/mikaelene) [#2414](https://github.com/fishtown-analytics/dbt/pull/2414)

Contributors:
- [@raalsky](https://github.com/Raalsky) ([#2343](https://github.com/fishtown-analytics/dbt/pull/2343))

## dbt 0.17.0b1 (May 5, 2020)
Expand Down
58 changes: 36 additions & 22 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,18 @@ def _schema_is_cached(self, database: str, schema: str) -> bool:
else:
return True

def _get_cache_schemas(
self, manifest: Manifest, exec_only: bool = False
) -> SchemaSearchMap:
def _get_cache_schemas(self, manifest: Manifest) -> Set[BaseRelation]:
"""Get the set of schema relations that the cache logic needs to
populate. This means only executable nodes are included.
"""
# the cache only cares about executable nodes
return {
self.Relation.create_from(self.config, node).without_identifier()
for node in manifest.nodes.values()
if node.resource_type in NodeType.executable()
}

def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
"""Get a mapping of each node's "information_schema" relations to a
set of all schemas expected in that information_schema.

Expand All @@ -295,8 +304,6 @@ def _get_cache_schemas(
manifest.sources.values(),
)
for node in nodes:
if exec_only and node.resource_type not in NodeType.executable():
continue
relation = self.Relation.create_from(self.config, node)
info_schema_name_map.add(relation)
# result is a map whose keys are information_schema Relations without
Expand All @@ -306,10 +313,11 @@ def _get_cache_schemas(
return info_schema_name_map

def _list_relations_get_connection(
self, db: BaseRelation, schema: str
self, schema_relation: BaseRelation
) -> List[BaseRelation]:
with self.connection_named(f'list_{db.database}_{schema}'):
return self.list_relations_without_caching(db, schema)
name = f'list_{schema_relation.database}_{schema_relation.schema}'
with self.connection_named(name):
return self.list_relations_without_caching(schema_relation)

def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
"""Populate the relations cache for the given schemas. Returns an
Expand All @@ -318,11 +326,11 @@ def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
if not dbt.flags.USE_CACHE:
return

schema_map = self._get_cache_schemas(manifest, exec_only=True)
cache_schemas = self._get_cache_schemas(manifest)
with executor(self.config) as tpe:
futures: List[Future[List[BaseRelation]]] = [
tpe.submit(self._list_relations_get_connection, db, schema)
for db, schema in schema_map.search()
tpe.submit(self._list_relations_get_connection, cache_schema)
for cache_schema in cache_schemas
]
for future in as_completed(futures):
# if we can't read the relations we need to just raise anyway,
Expand All @@ -333,7 +341,14 @@ def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
# it's possible that there were no relations in some schemas. We want
# to insert the schemas we query into the cache's `.schemas` attribute
# so we can check it later
self.cache.update_schemas(schema_map.schemas_searched())
cache_update: Set[Tuple[str, Optional[str]]] = set()
for relation in cache_schemas:
if relation.database is None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wanted to mention here that some plugins (namely Spark) use database as an alias for schema, and only have the one level of hierarchy. I think it's ok to add this logic here -- we can override it in a plugin if we need to -- but the idea that all relations will have a database and schema is not quite so invariant from dbt Core's perspective as this code indicates

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark will already need to be updated, as it overrides _get_cache_schemas and list_relations_without_caching.

I think this change will actually make it easier for spark to handle it: it sets database = schema but the include policy for relations is set to database=False - there's probably a couple rough edges to clear up, but my goal is that spark's default include policy + behavior will play pretty nicely with this and we can delete the create/drop schema implementations, at least.

raise InternalException(
'Got a None database in a cached schema!'
)
cache_update.add((relation.database, relation.schema))
self.cache.update_schemas(cache_update)

def set_relations_cache(
self, manifest: Manifest, clear: bool = False
Expand Down Expand Up @@ -512,15 +527,14 @@ def expand_column_types(

@abc.abstractmethod
def list_relations_without_caching(
self, information_schema: BaseRelation, schema: str
self, schema_relation: BaseRelation
) -> List[BaseRelation]:
"""List relations in the given schema, bypassing the cache.

This is used as the underlying behavior to fill the cache.

:param Relation information_schema: The information schema to list
relations from.
:param str schema: The name of the schema to list relations from.
:param schema_relation: A relation containing the database and schema
as appropraite for the underlying data warehouse
:return: The relations in schema
:rtype: List[self.Relation]
"""
Expand Down Expand Up @@ -636,17 +650,17 @@ def list_relations(self, database: str, schema: str) -> List[BaseRelation]:
if self._schema_is_cached(database, schema):
return self.cache.get_relations(database, schema)

information_schema = self.Relation.create(
schema_relation = self.Relation.create(
database=database,
schema=schema,
identifier='',
quote_policy=self.config.quoting
).information_schema()
).without_identifier()

# we can't build the relations cache because we don't have a
# manifest so we can't run any operations.
relations = self.list_relations_without_caching(
information_schema, schema
schema_relation
)

logger.debug('with database={}, schema={}, relations={}'
Expand Down Expand Up @@ -727,15 +741,15 @@ def already_exists(self, schema: str, name: str) -> bool:
###
@abc.abstractmethod
@available.parse_none
def create_schema(self, database: str, schema: str):
def create_schema(self, relation: BaseRelation):
"""Create the given schema if it does not exist."""
raise NotImplementedException(
'`create_schema` is not implemented for this adapter!'
)

@abc.abstractmethod
@available.parse_none
def drop_schema(self, database: str, schema: str):
def drop_schema(self, relation: BaseRelation):
"""Drop the given schema (and everything in it) if it exists."""
raise NotImplementedException(
'`drop_schema` is not implemented for this adapter!'
Expand Down Expand Up @@ -1014,7 +1028,7 @@ def _get_one_catalog(
def get_catalog(
self, manifest: Manifest
) -> Tuple[agate.Table, List[Exception]]:
schema_map = self._get_cache_schemas(manifest)
schema_map = self._get_catalog_schemas(manifest)

with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = [
Expand Down
37 changes: 17 additions & 20 deletions core/dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,16 @@ def information_schema(self, view_name=None) -> 'InformationSchema':
def information_schema_only(self) -> 'InformationSchema':
return self.information_schema()

def without_identifier(self) -> 'BaseRelation':
"""Return a form of this relation that only has the database and schema
set to included. To get the appropriately-quoted form the schema out of
the result (for use as part of a query), use `.render()`. To get the
raw database or schema name, use `.database` or `.schema`.

The hash of the returned object is the result of render().
"""
return self.include(identifier=False).replace_path(identifier=None)

def _render_iterator(
self
) -> Iterator[Tuple[Optional[ComponentName], Optional[str]]]:
Expand Down Expand Up @@ -501,38 +511,25 @@ def _render_iterator(self):

class SchemaSearchMap(Dict[InformationSchema, Set[Optional[str]]]):
"""A utility class to keep track of what information_schema tables to
search for what schemas
search for what schemas. The schema values are all lowercased to avoid
duplication.
"""
def add(self, relation: BaseRelation, preserve_case=False):
def add(self, relation: BaseRelation):
key = relation.information_schema_only()
if key not in self:
self[key] = set()
schema: Optional[str] = None
if relation.schema is not None:
if preserve_case:
schema = relation.schema
else:
schema = relation.schema.lower()
schema = relation.schema.lower()
self[key].add(schema)

def search(self) -> Iterator[Tuple[InformationSchema, Optional[str]]]:
def search(
self
) -> Iterator[Tuple[InformationSchema, Optional[str]]]:
for information_schema_name, schemas in self.items():
for schema in schemas:
yield information_schema_name, schema

def schemas_searched(self) -> Set[Tuple[str, Optional[str]]]:
result: Set[Tuple[str, Optional[str]]] = set()
for information_schema_name, schemas in self.items():
if information_schema_name.database is None:
raise InternalException(
'Got a None database in an information schema!'
)
result.update(
(information_schema_name.database, schema)
for schema in schemas
)
return result

def flatten(self):
new = self.__class__()

Expand Down
22 changes: 11 additions & 11 deletions core/dbt/adapters/sql/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,31 +174,31 @@ def get_columns_in_relation(self, relation):
kwargs={'relation': relation}
)

def create_schema(self, database: str, schema: str) -> None:
logger.debug('Creating schema "{}"."{}".', database, schema)
def create_schema(self, relation: BaseRelation) -> None:
relation = relation.without_identifier()
logger.debug('Creating schema "{}"', relation)
kwargs = {
'database_name': self.quote_as_configured(database, 'database'),
'schema_name': self.quote_as_configured(schema, 'schema'),
'relation': relation,
}
self.execute_macro(CREATE_SCHEMA_MACRO_NAME, kwargs=kwargs)
self.commit_if_has_connection()
# we can't update the cache here, as if the schema already existed we
# don't want to (incorrectly) say that it's empty

def drop_schema(self, database: str, schema: str) -> None:
logger.debug('Dropping schema "{}"."{}".', database, schema)
def drop_schema(self, relation: BaseRelation) -> None:
relation = relation.without_identifier()
logger.debug('Dropping schema "{}".', relation)
kwargs = {
'database_name': self.quote_as_configured(database, 'database'),
'schema_name': self.quote_as_configured(schema, 'schema'),
'relation': relation,
}
self.execute_macro(DROP_SCHEMA_MACRO_NAME, kwargs=kwargs)
# we can update the cache here
self.cache.drop_schema(database, schema)
self.cache.drop_schema(relation.database, relation.schema)

def list_relations_without_caching(
self, information_schema, schema
self, schema_relation: BaseRelation,
) -> List[BaseRelation]:
kwargs = {'information_schema': information_schema, 'schema': schema}
kwargs = {'schema_relation': schema_relation}
results = self.execute_macro(
LIST_RELATIONS_MACRO_NAME,
kwargs=kwargs
Expand Down
22 changes: 11 additions & 11 deletions core/dbt/include/global_project/macros/adapters/common.sql
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,23 @@
{{ return(load_result('get_columns_in_query').table.columns | map(attribute='name') | list) }}
{% endmacro %}

{% macro create_schema(database_name, schema_name) -%}
{{ adapter_macro('create_schema', database_name, schema_name) }}
{% macro create_schema(relation) -%}
{{ adapter_macro('create_schema', relation) }}
{% endmacro %}

{% macro default__create_schema(database_name, schema_name) -%}
{% macro default__create_schema(relation) -%}
{%- call statement('create_schema') -%}
create schema if not exists {{database_name}}.{{schema_name}}
create schema if not exists {{ relation.without_identifier() }}
{% endcall %}
{% endmacro %}

{% macro drop_schema(database_name, schema_name) -%}
{{ adapter_macro('drop_schema', database_name, schema_name) }}
{% macro drop_schema(relation) -%}
{{ adapter_macro('drop_schema', relation) }}
{% endmacro %}

{% macro default__drop_schema(database_name, schema_name) -%}
{% macro default__drop_schema(relation) -%}
{%- call statement('drop_schema') -%}
drop schema if exists {{database_name}}.{{schema_name}} cascade
drop schema if exists {{ relation.without_identifier() }} cascade
{% endcall %}
{% endmacro %}

Expand Down Expand Up @@ -262,12 +262,12 @@
{% endmacro %}


{% macro list_relations_without_caching(information_schema, schema) %}
{{ return(adapter_macro('list_relations_without_caching', information_schema, schema)) }}
{% macro list_relations_without_caching(schema_relation) %}
{{ return(adapter_macro('list_relations_without_caching', schema_relation)) }}
{% endmacro %}


{% macro default__list_relations_without_caching(information_schema, schema) %}
{% macro default__list_relations_without_caching(schema_relation) %}
{{ exceptions.raise_not_implemented(
'list_relations_without_caching macro not implemented for adapter '+adapter.type()) }}
{% endmacro %}
Expand Down
1 change: 1 addition & 0 deletions core/dbt/task/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def make_unique_id_map(
sources: Dict[str, CatalogTable] = {}

node_map, source_map = get_unique_id_mapping(manifest)
table: CatalogTable
for table in self.values():
key = table.key()
if key in node_map:
Expand Down
Loading