Skip to content

Commit

Permalink
Synchronization planner (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
kokorin authored Jul 6, 2024
1 parent 0702e25 commit cf545fe
Show file tree
Hide file tree
Showing 13 changed files with 982 additions and 188 deletions.
30 changes: 0 additions & 30 deletions dbt_pumpkin/canon.py

This file was deleted.

21 changes: 21 additions & 0 deletions dbt_pumpkin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,27 @@ def relocate(project_dir, profiles_dir, target, profile, select, exclude, dry_ru
pumpkin.relocate(dry_run=dry_run)


@cli.command
@P.project_dir
@P.profiles_dir
@P.target
@P.profile
@P.select
@P.exclude
@P.dry_run
@P.debug
def synchronize(project_dir, profiles_dir, target, profile, select, exclude, dry_run, debug):
    """
    Synchronizes YAML definitions with actual tables in DB
    """
    set_up_logging(debug)

    # Bundle the CLI options into the two parameter objects Pumpkin is built from,
    # then hand the dry-run flag to the synchronization itself.
    planner = Pumpkin(
        ProjectParams(project_dir=project_dir, profiles_dir=profiles_dir, target=target, profile=profile),
        ResourceParams(select=select, exclude=exclude),
    )
    planner.synchronize(dry_run=dry_run)


def main():
    """Console entry point: invoke the ``cli`` callable."""
    cli()

Expand Down
24 changes: 19 additions & 5 deletions dbt_pumpkin/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,22 @@ def __str__(self):


@dataclass(frozen=True)
class Column:
class TableColumn:
name: str
data_type: str | None
description: str | None
dtype: str
data_type: str
is_numeric: bool
is_string: bool


@dataclass(frozen=True)
class Table:
resource_id: ResourceID
columns: list[Column]
columns: list[TableColumn]

def __post_init__(self):
if not self.columns:
raise PropertyRequiredError("columns", self.resource_id) # noqa: EM101

def __hash__(self):
return hash(self.resource_id)
Expand All @@ -62,6 +68,14 @@ def __str__(self):
return self.unique_id


@dataclass(frozen=True)
class ResourceColumn:
    """Immutable record of a single column as declared on a resource."""

    # Column name as written in the resource definition.
    name: str
    # Quoting flag carried over from the resource definition.
    quote: bool
    # Declared data type; may be None.
    data_type: str | None
    # Free-text description; may be None.
    description: str | None


@dataclass(frozen=True)
class Resource:
unique_id: ResourceID
Expand All @@ -73,7 +87,7 @@ class Resource:
type: ResourceType
path: Path | None
yaml_path: Path | None
columns: list[Column]
columns: list[ResourceColumn]
config: ResourceConfig | None

def __post_init__(self):
Expand Down
8 changes: 4 additions & 4 deletions dbt_pumpkin/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ def __init__(self, name: str, path: Path):


class PropertyRequiredError(PumpkinError):
def __init__(self, property_name, unique_id):
msg = f"Property {property_name} is required for resource {unique_id}"
def __init__(self, property_name, details):
msg = f"Property {property_name} is required: {details}"
super().__init__(msg)


class PropertyNotAllowedError(PumpkinError):
def __init__(self, property_name, unique_id):
msg = f"Property {property_name} is not allowed for resource {unique_id}"
def __init__(self, property_name, details):
msg = f"Property {property_name} is not allowed: {details}"
super().__init__(msg)


Expand Down
31 changes: 3 additions & 28 deletions dbt_pumpkin/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from ruamel.yaml import YAML

from dbt_pumpkin.data import Column, Resource, ResourceConfig, ResourceID, ResourceType, Table
from dbt_pumpkin.data import Resource, ResourceColumn, ResourceConfig, ResourceID, ResourceType, Table, TableColumn
from dbt_pumpkin.dbt_compat import (
EventMsg,
Manifest,
Expand All @@ -36,7 +36,6 @@ def __init__(self, project_params: ProjectParams, resource_params: ResourceParam
self._resource_ids: dict[ResourceType, set[ResourceID]] = None
self._resources: list[Resource] = None
self._tables: list[Table] = None
self._adapter_name: str = None
self._yaml = YAML(typ="safe")

def _do_load_manifest(self) -> Manifest:
Expand Down Expand Up @@ -157,7 +156,7 @@ def _do_select_resources(self) -> list[Resource]:
path=path,
yaml_path=yaml_path,
columns=[
Column(name=c.name, data_type=c.data_type, description=c.description)
ResourceColumn(name=c.name, quote=c.quote, data_type=c.data_type, description=c.description)
for c in raw_resource.columns.values()
],
config=config,
Expand Down Expand Up @@ -269,7 +268,7 @@ def _do_lookup_tables(self) -> list[Table]:
def on_result(result: dict):
table = Table(
resource_id=ResourceID(result["resource_id"]),
columns=[Column(name=c["name"], data_type=c["data_type"], description=None) for c in result["columns"]],
columns=[TableColumn(**c) for c in result["columns"]],
)
tables.append(table)
logger.info("Looked up %s / %s: %s", len(tables), len(raw_resources), table.resource_id)
Expand All @@ -284,27 +283,3 @@ def lookup_tables(self):
if self._tables is None:
self._tables = self._do_lookup_tables()
return self._tables

def _do_resolve_adapter_name(self) -> str:
    """Run the ``resolve_adapter_name`` operation and return the one name it reports.

    Raises:
        PumpkinError: if the operation reported zero names or more than one.
    """
    logger.info("Resolving adapter name")

    collected: list[str] = []

    def remember(result: str):
        # Invoked by _run_operation for each result it produces.
        collected.append(result)
        logger.debug("Resolved adapter name: %s", result)

    self._run_operation("resolve_adapter_name", project_vars=None, result_callback=remember)

    if len(collected) == 1:
        return collected[0]

    msg = f"Expected exactly 1 adapter name, got: {collected}"
    raise PumpkinError(msg)

def resolve_adapter_name(self) -> str:
    """Return the adapter name, resolving it on first use and caching it on the instance."""
    # Do not forget to add adapter-specific macro to resolve_adapter_name.sql
    cached = self._adapter_name
    if cached is None:
        cached = self._adapter_name = self._do_resolve_adapter_name()

    return cached
16 changes: 0 additions & 16 deletions dbt_pumpkin/macros/resolve_adapter_name.sql

This file was deleted.

133 changes: 129 additions & 4 deletions dbt_pumpkin/plan.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from __future__ import annotations

import logging
from abc import abstractmethod
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING

from dbt_pumpkin.data import ResourceType
from dbt_pumpkin.exception import PumpkinError, ResourceNotFoundError
from dbt_pumpkin.storage import Storage
from dbt_pumpkin.exception import PropertyNotAllowedError, PropertyRequiredError, PumpkinError, ResourceNotFoundError

if TYPE_CHECKING:
from pathlib import Path

from dbt_pumpkin.storage import Storage

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -78,6 +84,125 @@ def execute(self, files: dict[Path, dict]):
to_resources.append({"name": self.resource_name, "columns": []})


@dataclass(frozen=True)
class ResourceColumnAction(Action, ABC):
    """Base class for actions that edit the ``columns`` list of one resource in a YAML file.

    ``resource_type`` and ``resource_name`` are read from ``self``; they are presumably
    declared on ``Action`` (not visible here -- confirm against the base class).

    Attributes:
        source_name: Name of the enclosing source. Required when the resource is a
            source table, and must be ``None`` for every other resource type.
        path: YAML file that holds the resource definition.
    """

    source_name: str | None
    path: Path

    def __post_init__(self):
        """Validate that ``source_name`` matches the resource type and that ``path`` is set.

        Raises:
            PropertyRequiredError: a source has no ``source_name``, or ``path`` is empty.
            PropertyNotAllowedError: a non-source resource was given a ``source_name``.
        """
        is_source = self.resource_type == ResourceType.SOURCE
        if is_source and not self.source_name:
            raise PropertyRequiredError("source_name", self.resource_name)  # noqa: EM101
        if not is_source and self.source_name is not None:
            raise PropertyNotAllowedError("source_name", self.resource_name)  # noqa: EM101
        if not self.path:
            raise PropertyRequiredError("path", self.resource_name)  # noqa: EM101

    def affected_files(self) -> set[Path]:
        """Return the single YAML file this action touches."""
        return {self.path}

    def _get_or_create_columns(self, files: dict[Path, dict]) -> list[dict]:
        """Locate the resource inside ``files[self.path]`` and return its ``columns`` list.

        An empty ``columns`` list is created in place when the resource has none, so
        the returned list is always the live list inside the YAML document.

        Raises:
            ResourceNotFoundError: ``self.path`` is not among ``files``.
            PumpkinError: the source or the resource is not defined in the file.
        """
        if self.path not in files:
            raise ResourceNotFoundError(self.resource_name, self.path)

        yaml_content = files[self.path]
        # A file without this section (or with an empty one) simply has no such
        # resources; fall through to the "not found" errors below instead of
        # surfacing a bare KeyError.
        yaml_resources: list = yaml_content.get(self.resource_type.plural_name) or []

        if self.resource_type == ResourceType.SOURCE:
            # We need to go 1 level deeper for sources
            yaml_source = next((r for r in yaml_resources if r["name"] == self.source_name), None)
            if not yaml_source:
                msg = f"Source {self.source_name} not found in {self.path}"
                raise PumpkinError(msg)

            yaml_resources = yaml_source.setdefault("tables", [])

        yaml_resource = next((r for r in yaml_resources if r["name"] == self.resource_name), None)
        if not yaml_resource:
            msg = f"Resource {self.resource_name} not found in {self.path}"
            raise PumpkinError(msg)

        return yaml_resource.setdefault("columns", [])


@dataclass(frozen=True)
class AddResourceColumn(ResourceColumnAction):
    """Action that appends a new column entry to the resource's YAML ``columns`` list."""

    column_name: str
    column_quote: bool
    column_type: str

    def describe(self) -> str:
        """Return a one-line description of the column being added."""
        target = f"{self.resource_type} {self.resource_name}"
        column = f"{self.column_name} {self.column_type}"
        return f"Add column {target} {column} at {self.path}"

    def execute(self, files: dict[Path, dict]):
        """Append the column to the resource definition held in ``files``."""
        columns = self._get_or_create_columns(files)

        # Keep insertion order: name, optional quote, then data_type.
        # "quote" is only written when quoting is requested.
        entry = {"name": self.column_name}
        if self.column_quote:
            entry["quote"] = True
        entry["data_type"] = self.column_type

        columns.append(entry)


@dataclass(frozen=True)
class UpdateResourceColumn(ResourceColumnAction):
    """Action that sets ``data_type`` on an existing column of the resource's YAML definition."""

    column_name: str
    column_type: str

    def describe(self) -> str:
        """Return a one-line description of the column being updated."""
        return f"Update column {self.resource_type} {self.resource_name} {self.column_name} {self.column_type} at {self.path}"

    def execute(self, files: dict[Path, dict]):
        """Overwrite the column's ``data_type`` in the resource definition held in ``files``.

        Raises:
            PumpkinError: the resource has no column named ``column_name``.
        """
        yaml_columns = self._get_or_create_columns(files)
        yaml_column = next((c for c in yaml_columns if c["name"] == self.column_name), None)
        if yaml_column is None:
            # Name the resource itself; the type alone does not identify it.
            msg = f"Column {self.column_name} not found in {self.resource_type} {self.resource_name}"
            raise PumpkinError(msg)

        yaml_column["data_type"] = self.column_type


@dataclass(frozen=True)
class DeleteResourceColumn(ResourceColumnAction):
    """Action that removes one column entry from the resource's YAML ``columns`` list."""

    column_name: str

    def describe(self) -> str:
        """Return a one-line description of the column being deleted."""
        return f"Delete column {self.resource_type} {self.resource_name} {self.column_name} at {self.path}"

    def execute(self, files: dict[Path, dict]):
        """Remove the column from the resource definition held in ``files``.

        Raises:
            PumpkinError: the resource has no column named ``column_name``.
        """
        yaml_columns = self._get_or_create_columns(files)
        yaml_column = next((c for c in yaml_columns if c["name"] == self.column_name), None)
        if yaml_column is None:
            # Name the resource itself; the type alone does not identify it.
            msg = f"Column {self.column_name} not found in {self.resource_type} {self.resource_name}"
            raise PumpkinError(msg)

        yaml_columns.remove(yaml_column)


@dataclass(frozen=True)
class ReorderResourceColumns(ResourceColumnAction):
    """Action that rearranges the resource's YAML columns into ``columns_order``.

    Attributes:
        columns_order: Column names in the desired order. The names must be unique
            and must match the set of column names present in the YAML exactly.
    """

    columns_order: list[str]

    def __post_init__(self):
        """Run the base-class validation, then reject duplicate names in ``columns_order``.

        Raises:
            PropertyRequiredError / PropertyNotAllowedError: from the base validation.
            PumpkinError: ``columns_order`` contains a name more than once.
        """
        # Defining __post_init__ here replaces the parent's, so the source_name/path
        # checks only run if they are invoked explicitly.
        super().__post_init__()

        if len(self.columns_order) != len(set(self.columns_order)):
            msg = f"Column names must be unique: {self.columns_order}"
            raise PumpkinError(msg)

    def describe(self) -> str:
        """Return a one-line description of the reorder."""
        return f"Reorder columns {self.resource_type} {self.resource_name} at {self.path}"

    def execute(self, files: dict[Path, dict]):
        """Reorder the column entries in place inside the YAML document held in ``files``.

        Raises:
            PumpkinError: the YAML lists a column name more than once, or its column
                names differ from ``columns_order``.
        """
        yaml_columns = self._get_or_create_columns(files)
        column_by_name = {yc["name"]: yc for yc in yaml_columns}

        # The name -> column mapping collapses duplicates; rebuilding the list from
        # it would silently drop entries, so refuse to proceed instead.
        if len(column_by_name) != len(yaml_columns):
            msg = f"Column names in YAML must be unique: {[yc['name'] for yc in yaml_columns]}"
            raise PumpkinError(msg)

        if column_by_name.keys() != set(self.columns_order):
            msg = f"Column names in YAML and provided don't match: {column_by_name.keys()} vs {self.columns_order}"
            raise PumpkinError(msg)

        reordered_columns = [column_by_name[name] for name in self.columns_order]

        # Mutate the existing list so the YAML document keeps referencing it.
        yaml_columns.clear()
        yaml_columns.extend(reordered_columns)


class Plan:
def __init__(self, actions: list[Action]):
self.actions = actions
Expand Down
Loading

0 comments on commit cf545fe

Please sign in to comment.