Merge pull request #332 from canbekley/feat/apply-incremental-schema-changes

Feat/apply incremental schema changes
BentsiLeviav authored Aug 21, 2024
2 parents 1ad813c + 05e43f0 commit a9042ad
Showing 9 changed files with 306 additions and 77 deletions.
33 changes: 31 additions & 2 deletions dbt/adapters/clickhouse/column.py
@@ -1,6 +1,6 @@
import re
from dataclasses import dataclass
from typing import Any, TypeVar
from dataclasses import dataclass, field
from typing import Any, List, Literal, TypeVar

from dbt.adapters.base.column import Column
from dbt_common.exceptions import DbtRuntimeError
@@ -134,3 +134,32 @@ def _inner_dtype(self, dtype) -> str:
inner_dtype = null_match.group(1)

return inner_dtype


@dataclass(frozen=True)
class ClickHouseColumnChanges:
on_schema_change: Literal['ignore', 'fail', 'append_new_columns', 'sync_all_columns']
columns_to_add: List[Column] = field(default_factory=list)
columns_to_drop: List[Column] = field(default_factory=list)
columns_to_modify: List[Column] = field(default_factory=list)

def __bool__(self) -> bool:
return bool(self.columns_to_add or self.columns_to_drop or self.columns_to_modify)

@property
def has_schema_changes(self) -> bool:
return bool(self)

@property
def has_sync_changes(self) -> bool:
return bool(self.columns_to_drop or self.columns_to_modify)

@property
def has_conflicting_changes(self) -> bool:
if self.on_schema_change == 'fail' and self.has_schema_changes:
return True

if self.on_schema_change != 'sync_all_columns' and self.has_sync_changes:
return True

return False
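
Outside the diff, here is a minimal sketch of how these flags combine, using the base dbt Column class and invented column names:

# Illustrative only -- not part of this commit.
from dbt.adapters.base.column import Column
from dbt.adapters.clickhouse.column import ClickHouseColumnChanges

changes = ClickHouseColumnChanges(
    on_schema_change='append_new_columns',
    columns_to_add=[Column('session_id', 'String')],
)
assert changes.has_schema_changes            # at least one change was detected
assert not changes.has_sync_changes          # no drops or type modifications
assert not changes.has_conflicting_changes   # append_new_columns can apply plain additions

changes = ClickHouseColumnChanges(
    on_schema_change='append_new_columns',
    columns_to_drop=[Column('legacy_flag', 'String')],
)
assert changes.has_conflicting_changes       # dropping columns requires sync_all_columns
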
18 changes: 9 additions & 9 deletions dbt/adapters/clickhouse/errors.py
@@ -1,14 +1,14 @@
schema_change_fail_error = """
The source and target schemas on this incremental model are out of sync.
They can be reconciled in several ways:
- set the `on_schema_change` config to `append_new_columns`. (ClickHouse does not support `sync_all_columns`)
- Re-run the incremental model with `full_refresh: True` to update the target schema.
- update the schema manually and re-run the process.
Additional troubleshooting context:
Source columns not in target: {0}
Target columns not in source: {1}
New column types: {2}
They can be reconciled in several ways:
- set the `on_schema_change` config to `append_new_columns` or `sync_all_columns`.
- Re-run the incremental model with `full_refresh: True` to update the target schema.
- update the schema manually and re-run the process.
Additional troubleshooting context:
Source columns not in target: {0}
Target columns not in source: {1}
New column types: {2}
"""

schema_change_datatype_error = """
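
For context, the updated text is still a plain Python format string; a quick illustration of how it renders, with invented argument values standing in for the three column lists:

# Illustrative only -- argument values are made up.
from dbt.adapters.clickhouse.errors import schema_change_fail_error

print(schema_change_fail_error.format(
    ['legacy_flag String'],            # source columns not in target
    ['session_id DateTime'],           # target columns not in source
    ['name LowCardinality(String)'],   # columns whose type changed
))
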
46 changes: 26 additions & 20 deletions dbt/adapters/clickhouse/impl.py
@@ -29,7 +29,7 @@
from dbt_common.utils import filter_null_values

from dbt.adapters.clickhouse.cache import ClickHouseRelationsCache
from dbt.adapters.clickhouse.column import ClickHouseColumn
from dbt.adapters.clickhouse.column import ClickHouseColumn, ClickHouseColumnChanges
from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager
from dbt.adapters.clickhouse.errors import (
schema_change_datatype_error,
@@ -39,7 +39,7 @@
from dbt.adapters.clickhouse.logger import logger
from dbt.adapters.clickhouse.query import quote_identifier
from dbt.adapters.clickhouse.relation import ClickHouseRelation, ClickHouseRelationType
from dbt.adapters.clickhouse.util import NewColumnDataType, compare_versions
from dbt.adapters.clickhouse.util import compare_versions

if TYPE_CHECKING:
import agate
@@ -193,35 +193,41 @@ def calculate_incremental_strategy(self, strategy: str) -> str:
@available.parse_none
def check_incremental_schema_changes(
self, on_schema_change, existing, target_sql
) -> List[ClickHouseColumn]:
if on_schema_change not in ('fail', 'ignore', 'append_new_columns'):
) -> ClickHouseColumnChanges:
if on_schema_change not in ('fail', 'ignore', 'append_new_columns', 'sync_all_columns'):
raise DbtRuntimeError(
"Only `fail`, `ignore`, and `append_new_columns` supported for `on_schema_change`"
"Only `fail`, `ignore`, `append_new_columns`, and `sync_all_columns` supported for `on_schema_change`."
)

source = self.get_columns_in_relation(existing)
source_map = {column.name: column for column in source}
target = self.get_column_schema_from_query(target_sql)
target_map = {column.name: column for column in source}
target_map = {column.name: column for column in target}

source_not_in_target = [column for column in source if column.name not in target_map.keys()]
target_not_in_source = [column for column in target if column.name not in source_map.keys()]
new_column_data_types = []
for target_column in target:
source_column = source_map.get(target_column.name)
if source_column and source_column.dtype != target_column.dtype:
new_column_data_types.append(
NewColumnDataType(source_column.name, target_column.dtype)
)
if new_column_data_types:
raise DbtRuntimeError(schema_change_datatype_error.format(new_column_data_types))
if source_not_in_target:
raise DbtRuntimeError(schema_change_missing_source_error.format(source_not_in_target))
if target_not_in_source and on_schema_change == 'fail':
target_in_source = [column for column in target if column.name in source_map.keys()]
changed_data_types = []
for column in target_in_source:
source_column = source_map.get(column.name)
if source_column is not None and column.dtype != source_column.dtype:
changed_data_types.append(column)

clickhouse_column_changes = ClickHouseColumnChanges(
columns_to_add=target_not_in_source,
columns_to_drop=source_not_in_target,
columns_to_modify=changed_data_types,
on_schema_change=on_schema_change,
)

if clickhouse_column_changes.has_conflicting_changes:
raise DbtRuntimeError(
schema_change_fail_error.format(
source_not_in_target, target_not_in_source, new_column_data_types
source_not_in_target, target_not_in_source, changed_data_types
)
)
return target_not_in_source

return clickhouse_column_changes

@available.parse_none
def s3source_clause(
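
For reference, the column diffing performed by check_incremental_schema_changes above can be pictured with plain name-to-type dicts (example names invented):

# Illustrative stand-alone sketch of the add/drop/modify classification.
source = {'id': 'UInt64', 'name': 'String', 'legacy_flag': 'String'}                    # existing relation
target = {'id': 'UInt64', 'name': 'LowCardinality(String)', 'session_id': 'DateTime'}   # model query

columns_to_add = [c for c in target if c not in source]                             # ['session_id']
columns_to_drop = [c for c in source if c not in target]                            # ['legacy_flag']
columns_to_modify = [c for c in target if c in source and target[c] != source[c]]   # ['name']
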
6 changes: 0 additions & 6 deletions dbt/adapters/clickhouse/util.py
@@ -13,9 +13,3 @@ def compare_versions(v1: str, v2: str) -> int:
except ValueError:
raise DbtRuntimeError("Version must consist of only numbers separated by '.'")
return 0


@dataclass
class NewColumnDataType:
column_name: str
new_type: str
Original file line number Diff line number Diff line change
@@ -96,11 +96,13 @@
)
{% endmacro %}

{% macro create_empty_table_from_relation(relation, source_relation) -%}
{% macro create_empty_table_from_relation(relation, source_relation, sql=none) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{%- set columns = adapter.get_columns_in_relation(source_relation) | list -%}


{%- if sql -%}
{%- set columns = adapter.get_column_schema_from_query(sql) | list -%}
{%- else -%}
{%- set columns = adapter.get_columns_in_relation(source_relation) | list -%}
{%- endif -%}
{%- set col_list = [] -%}
{% for col in columns %}
{{col_list.append(col.name + ' ' + col.data_type) or '' }}
@@ -123,7 +125,7 @@
{{ drop_relation_if_exists(shard_relation) }}
{{ drop_relation_if_exists(distributed_relation) }}
{{ create_schema(shard_relation) }}
{% do run_query(create_empty_table_from_relation(shard_relation, structure_relation)) or '' %}
{% do run_query(create_empty_table_from_relation(shard_relation, structure_relation, sql_query)) or '' %}
{% do run_query(create_distributed_table(distributed_relation, shard_relation)) or '' %}
{% if sql_query is not none %}
{% do run_query(clickhouse__insert_into(distributed_relation, sql_query)) or '' %}
Original file line number Diff line number Diff line change
@@ -76,18 +76,18 @@
{% else %}
{% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% if on_schema_change != 'ignore' %}
{%- set schema_changes = check_for_schema_changes(existing_relation, target_relation) -%}
{% if schema_changes['schema_changed'] and incremental_strategy in ('append', 'delete_insert') %}
{% set incremental_strategy = 'legacy' %}
{% do log('Schema changes detected, switching to legacy incremental strategy') %}
{%- if on_schema_change != 'ignore' %}
{%- set local_column_changes = adapter.check_incremental_schema_changes(on_schema_change, existing_relation_local, sql) -%}
{% if local_column_changes and incremental_strategy != 'legacy' %}
{% do clickhouse__apply_column_changes(local_column_changes, existing_relation, True) %}
{% set existing_relation = load_cached_relation(this) %}
{% endif %}
{% endif %}
{% if incremental_strategy != 'delete_insert' and incremental_predicates %}
{% do exceptions.raise_compiler_error('Cannot apply incremental predicates with ' + incremental_strategy + ' strategy.') %}
{% endif %}
{% if incremental_strategy == 'legacy' %}
{% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, schema_changes, unique_key, True) %}
{% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, local_column_changes, unique_key, True) %}
{% set need_swap = true %}
{% elif incremental_strategy == 'delete_insert' %}
{% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, True) %}
Original file line number Diff line number Diff line change
@@ -59,11 +59,9 @@
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{%- if on_schema_change != 'ignore' %}
{%- set column_changes = adapter.check_incremental_schema_changes(on_schema_change, existing_relation, sql) -%}
{%- if column_changes %}
{%- if incremental_strategy in ('append', 'delete_insert') %}
{% set incremental_strategy = 'legacy' %}
{{ log('Schema changes detected, switching to legacy incremental strategy') }}
{%- endif %}
{% if column_changes and incremental_strategy != 'legacy' %}
{% do clickhouse__apply_column_changes(column_changes, existing_relation) %}
{% set existing_relation = load_cached_relation(this) %}
{% endif %}
{% endif %}
{% if incremental_strategy != 'delete_insert' and incremental_predicates %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
{% macro clickhouse__apply_column_changes(column_changes, existing_relation, is_distributed=False) %}
{{ log('Schema changes detected. Trying to apply the following changes: ' ~ column_changes) }}
{%- set existing_local = none -%}
{% if is_distributed %}
{%- set local_suffix = adapter.get_clickhouse_local_suffix() -%}
{%- set local_db_prefix = adapter.get_clickhouse_local_db_prefix() -%}
{%- set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none -%}
{% endif %}

{% if column_changes.on_schema_change == 'append_new_columns' %}
{% do clickhouse__add_columns(column_changes.columns_to_add, existing_relation, existing_local, is_distributed) %}

{% elif column_changes.on_schema_change == 'sync_all_columns' %}
{% do clickhouse__drop_columns(column_changes.columns_to_drop, existing_relation, existing_local, is_distributed) %}
{% do clickhouse__add_columns(column_changes.columns_to_add, existing_relation, existing_local, is_distributed) %}
{% do clickhouse__modify_columns(column_changes.columns_to_modify, existing_relation, existing_local, is_distributed) %}
{% endif %}

{% endmacro %}

{% macro clickhouse__add_columns(columns, existing_relation, existing_local=none, is_distributed=False) %}
{% for column in columns %}
{% set alter_action -%}
add column if not exists `{{ column.name }}` {{ column.data_type }}
{%- endset %}
{% do clickhouse__run_alter_table_command(alter_action, existing_relation, existing_local, is_distributed) %}
{% endfor %}

{% endmacro %}

{% macro clickhouse__drop_columns(columns, existing_relation, existing_local=none, is_distributed=False) %}
{% for column in columns %}
{% set alter_action -%}
drop column if exists `{{ column.name }}`
{%- endset %}
{% do clickhouse__run_alter_table_command(alter_action, existing_relation, existing_local, is_distributed) %}
{% endfor %}

{% endmacro %}

{% macro clickhouse__modify_columns(columns, existing_relation, existing_local=none, is_distributed=False) %}
{% for column in columns %}
{% set alter_action -%}
modify column if exists `{{ column.name }}` {{ column.data_type }}
{%- endset %}
{% do clickhouse__run_alter_table_command(alter_action, existing_relation, existing_local, is_distributed) %}
{% endfor %}

{% endmacro %}

{% macro clickhouse__run_alter_table_command(alter_action, existing_relation, existing_local=none, is_distributed=False) %}
{% if is_distributed %}
{% call statement('alter_table') %}
alter table {{ existing_local }} {{ on_cluster_clause(existing_relation) }} {{ alter_action }}
{% endcall %}
{% call statement('alter_table') %}
alter table {{ existing_relation }} {{ on_cluster_clause(existing_relation) }} {{ alter_action }}
{% endcall %}

{% else %}
{% call statement('alter_table') %}
alter table {{ existing_relation }} {{ alter_action }}
{% endcall %}
{% endif %}

{% endmacro %}
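
To make the distributed path concrete, here is a rough Python illustration of the statement pair clickhouse__run_alter_table_command issues when is_distributed is true; the table, cluster, and column names are invented, and the exact ON CLUSTER text comes from on_cluster_clause:

# Illustrative only -- names are made up; the actual SQL is rendered by the macros above.
local_table = 'analytics.events_local'        # the shard-local table
distributed_table = 'analytics.events'        # the Distributed wrapper table
on_cluster = "on cluster 'my_cluster'"        # placeholder for on_cluster_clause() output
alter_action = 'add column if not exists `session_id` String'

statements = [
    f'alter table {local_table} {on_cluster} {alter_action}',        # shard tables change first
    f'alter table {distributed_table} {on_cluster} {alter_action}',  # then the Distributed table
]
print('\n'.join(statements))
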