Skip to content

Commit

Permalink
Version 1.7 updates (#109)
Browse files Browse the repository at this point in the history
* version bumps

* tests for limit added

* fix constraints and limit tests, add pytest -n

* catalog fix v1.7

---------

Co-authored-by: Marvin Froemming <[email protected]>
Co-authored-by: ilikutle <[email protected]>
  • Loading branch information
3 people authored Mar 15, 2024
1 parent ae7f776 commit cdd4927
Show file tree
Hide file tree
Showing 11 changed files with 316 additions and 135 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/exasol/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.6.0"
version = "1.7.0"
97 changes: 95 additions & 2 deletions dbt/adapters/exasol/impl.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
"""dbt-exasol Adapter implementation extending SQLAdapter"""
from __future__ import absolute_import

from typing import Dict, Optional
from typing import Dict, Optional, List, Set

from itertools import chain
import agate
from dbt.adapters.base.meta import available
from dbt.adapters.base.relation import BaseRelation, InformationSchema
from dbt.contracts.graph.manifest import Manifest
from dbt.adapters.base.impl import GET_CATALOG_MACRO_NAME, ConstraintSupport, GET_CATALOG_RELATIONS_MACRO_NAME, _expect_row_value
from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability
from dbt.adapters.sql import SQLAdapter
from dbt.exceptions import CompilationError
from dbt.utils import filter_null_values
Expand Down Expand Up @@ -34,6 +38,10 @@ class ExasolAdapter(SQLAdapter):
ConstraintType.foreign_key: ConstraintSupport.ENFORCED,
}

_capabilities = CapabilityDict(
{Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full)}
)

@classmethod
def date_function(cls):
return "current_timestamp()"
Expand Down Expand Up @@ -137,3 +145,88 @@ def check_and_quote_identifier(self, identifier, models_column_dict=None) -> str
return self.quote(identifier)
else:
return identifier

def _get_one_catalog(
self,
information_schema: InformationSchema,
schemas: Set[str],
manifest: Manifest,
) -> agate.Table:

kwargs = {
"information_schema": information_schema,
"schemas": schemas
}
table = self.execute_macro(
GET_CATALOG_MACRO_NAME,
kwargs=kwargs,
manifest=manifest,
)
# Use database from credentials if no other given
for node in chain(manifest.nodes.values(), manifest.sources.values()):
if not node.database or node.database == 'None':
node.database = self.config.credentials.database

results = self._catalog_filter_table(table, manifest)
return results

def _get_one_catalog_by_relations(
self,
information_schema: InformationSchema,
relations: List[BaseRelation],
manifest: Manifest,
) -> agate.Table:

kwargs = {
"information_schema": information_schema,
"relations": relations,
}
table = self.execute_macro(
GET_CATALOG_RELATIONS_MACRO_NAME,
kwargs=kwargs,
manifest=manifest,
)

# Use database from credentials if no other given
for node in chain(manifest.nodes.values(), manifest.sources.values()):
if not node.database or node.database == 'None':
node.database = self.config.credentials.database

results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
return results

def get_filtered_catalog(
self, manifest: Manifest, relations: Optional[Set[BaseRelation]] = None
):
catalogs: agate.Table
if (
relations is None
or len(relations) > 100
or not self.supports(Capability.SchemaMetadataByRelations)
):
# Do it the traditional way. We get the full catalog.
catalogs, exceptions = self.get_catalog(manifest)
else:
# Do it the new way. We try to save time by selecting information
# only for the exact set of relations we are interested in.
catalogs, exceptions = self.get_catalog_by_relations(manifest, relations)

if relations and catalogs:
relation_map = {
(
r.schema.casefold() if r.schema else None,
r.identifier.casefold() if r.identifier else None,
)
for r in relations
}

def in_map(row: agate.Row):
s = _expect_row_value("table_schema", row)
i = _expect_row_value("table_name", row)
s = s.casefold() if s is not None else None
i = i.casefold() if i is not None else None
return (s, i) in relation_map

catalogs = catalogs.where(in_map)

return catalogs, exceptions
164 changes: 115 additions & 49 deletions dbt/include/exasol/macros/catalog.sql
Original file line number Diff line number Diff line change
@@ -1,53 +1,119 @@
{% macro exasol__get_catalog(information_schemas, schemas) -%}
{% for schema in schemas %}
{{ log(schema)}}
{% endfor %}

{%- call statement('catalog', fetch_result=True) -%}

WITH tabs as (

select
'DB' as table_database,
ROOT_NAME as table_schema,
OBJECT_NAME as table_name,
ROOT_NAME as table_owner,
OBJECT_TYPE AS table_type,
OBJECT_COMMENT as table_comment
from sys.EXA_USER_OBJECTS
WHERE OBJECT_TYPE IN('TABLE', 'VIEW')

),

cols as (

select
'DB' as table_database,
column_schema as table_schema,
column_table as table_name,
column_name,
column_ordinal_position as column_index,
column_type,
column_comment
from sys.exa_user_columns
{% macro exasol__get_catalog(information_schema, schemas) -%}

{% set query %}
with tables as (
{{ exasol__get_catalog_tables_sql(information_schema) }}
),
columns as (
{{ exasol__get_catalog_columns_sql(information_schema) }}
)
{{ exasol__get_catalog_results_sql() }}
{{ exasol__get_catalog_schemas_where_clause_sql(schemas) }}
order by
tables.table_owner,
columns.table_name,
columns.column_index
{%- endset -%}

{{ return(run_query(query)) }}

{%- endmacro %}


{% macro exasol__get_catalog_relations(information_schema, relations) -%}

{% set query %}
with tables as (
{{ exasol__get_catalog_tables_sql(information_schema) }}
),
columns as (
{{ exasol__get_catalog_columns_sql(information_schema) }}
)
{{ exasol__get_catalog_results_sql() }}
{{ exasol__get_catalog_relations_where_clause_sql(relations) }}
order by
tables.table_owner,
columns.table_name,
columns.column_index
{%- endset -%}

{{ return(run_query(query)) }}

{%- endmacro %}

)

select tabs.table_owner as [table_owner],
tabs.table_type AS [table_type],
tabs.table_comment as [table_comment],
cols.table_database as [table_database],
cols.table_schema as [table_schema],
cols.table_name as [table_name],
cols.column_name as [column_name],
cols.column_index as [column_index],
cols.column_type as [column_type],
cols.column_comment as [column_comment]
from tabs
join cols on tabs.table_database = cols.table_database and tabs.table_schema = cols.table_schema and tabs.table_name = cols.table_name
order by column_index
{%- endcall -%}

{{ return(load_result('catalog').table) }}
{% macro exasol__get_catalog_tables_sql(information_schema) -%}
select
'DB' as table_database,
ROOT_NAME as table_schema,
OBJECT_NAME as table_name,
ROOT_NAME as table_owner,
OBJECT_TYPE AS table_type,
OBJECT_COMMENT as table_comment
from sys.EXA_USER_OBJECTS
WHERE OBJECT_TYPE IN('TABLE', 'VIEW')
{%- endmacro %}


{% macro exasol__get_catalog_columns_sql(information_schema) -%}
select
'DB' as table_database,
column_schema as table_schema,
column_table as table_name,
column_name,
column_ordinal_position as column_index,
column_type,
column_comment
from sys.exa_user_columns
{%- endmacro %}


{% macro exasol__get_catalog_results_sql() -%}
select
tables.table_owner as [table_owner],
tables.table_type AS [table_type],
tables.table_comment as [table_comment],
columns.table_database as [table_database],
columns.table_schema as [table_schema],
columns.table_name as [table_name],
columns.column_name as [column_name],
columns.column_index as [column_index],
case
when columns.column_type='TIMESTAMP(3)' then 'TIMESTAMP'
else columns.column_type
end as [column_type],
columns.column_comment as [column_comment]
from tables
join columns on tables.table_database = columns.table_database and tables.table_schema = columns.table_schema and tables.table_name = columns.table_name
{%- endmacro %}


{% macro exasol__get_catalog_schemas_where_clause_sql(schemas, schema_name) -%}
where ({%- for schema in schemas -%}
upper(tables.table_owner) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%})
{%- endmacro %}


{% macro exasol__get_catalog_relations_where_clause_sql(relations) -%}
where (
{%- for relation in relations -%}
{% if relation.schema and relation.identifier %}
(
upper(tables.table_owner) = upper('{{ relation.schema }}')
and upper(columns.table_name) = upper('{{ relation.identifier }}')
)
{% elif relation.schema %}
(
upper(tables.table_owner) = upper('{{ relation.schema }}')
)
{% else %}
{% do exceptions.raise_compiler_error(
'`get_catalog_relations` requires a list of relations, each with a schema'
) %}
{% endif %}

{%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
{%- endmacro %}
Loading

0 comments on commit cdd4927

Please sign in to comment.