Skip to content

Commit

Permalink
Improve checking of schema version for pre-1.0.0 manifests (#4497)
Browse files Browse the repository at this point in the history
* [#4470] Improve checking of schema version for pre-1.0.0 manifests

* Check exception code instead of message in test
  • Loading branch information
gshank authored Dec 16, 2021
1 parent 9f6ed3c commit 067b861
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- Fix redefined status param of SQLQueryStatus to typecheck the string which passes on `._message` value of `AdapterResponse` or the `str` value sent by adapter plugin. ([#4463](https://github.com/dbt-labs/dbt-core/pull/4463#issuecomment-990174166))
- Fix `DepsStartPackageInstall` event to use package name instead of version number. ([#4482](https://github.com/dbt-labs/dbt-core/pull/4482))
- Reimplement log message to use adapter name instead of the object method. ([#4501](https://github.com/dbt-labs/dbt-core/pull/4501))
- Issue better error message for incompatible schemas ([#4470](https://github.com/dbt-labs/dbt-core/pull/4442), [#4497](https://github.com/dbt-labs/dbt-core/pull/4497))

### Docs
- Fix missing data on exposures in docs ([#4467](https://github.com/dbt-labs/dbt-core/issues/4467))
Expand Down
6 changes: 4 additions & 2 deletions core/dbt/contracts/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ def __init__(self, path: Path):
manifest_path = self.path / 'manifest.json'
if manifest_path.exists() and manifest_path.is_file():
try:
self.manifest = WritableManifest.read(str(manifest_path))
# we want to bail with an error if schema versions don't match
self.manifest = WritableManifest.read_and_check_versions(str(manifest_path))
except IncompatibleSchemaException as exc:
exc.add_filename(str(manifest_path))
raise

results_path = self.path / 'run_results.json'
if results_path.exists() and results_path.is_file():
try:
self.results = RunResultsArtifact.read(str(results_path))
# we want to bail with an error if schema versions don't match
self.results = RunResultsArtifact.read_and_check_versions(str(results_path))
except IncompatibleSchemaException as exc:
exc.add_filename(str(results_path))
raise
41 changes: 41 additions & 0 deletions core/dbt/contracts/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dbt.exceptions import (
InternalException,
RuntimeException,
IncompatibleSchemaException
)
from dbt.version import __version__
from dbt.events.functions import get_invocation_id
Expand Down Expand Up @@ -158,6 +159,8 @@ def get_metadata_env() -> Dict[str, str]:
}


# This is used in the ManifestMetadata, RunResultsMetadata, RunOperationResultMetadata,
# FreshnessMetadata, and CatalogMetadata classes
@dataclasses.dataclass
class BaseArtifactMetadata(dbtClassMixin):
dbt_schema_version: str
Expand All @@ -177,6 +180,17 @@ def __post_serialize__(self, dct):
return dct


# This is used as a class decorator to set the schema_version in the
# 'dbt_schema_version' class attribute. (It's copied into the metadata objects.)
# Name attributes of SchemaVersion in classes with the 'schema_version' decorator:
# manifest
# run-results
# run-operation-result
# sources
# catalog
# remote-compile-result
# remote-execution-result
# remote-run-result
def schema_version(name: str, version: int):
def inner(cls: Type[VersionedSchema]):
cls.dbt_schema_version = SchemaVersion(
Expand All @@ -187,6 +201,7 @@ def inner(cls: Type[VersionedSchema]):
return inner


# This is used in the ArtifactMixin and RemoteResult classes
@dataclasses.dataclass
class VersionedSchema(dbtClassMixin):
dbt_schema_version: ClassVar[SchemaVersion]
Expand All @@ -198,13 +213,39 @@ def json_schema(cls, embeddable: bool = False) -> Dict[str, Any]:
result['$id'] = str(cls.dbt_schema_version)
return result

@classmethod
def read_and_check_versions(cls, path: str):
try:
data = read_json(path)
except (EnvironmentError, ValueError) as exc:
raise RuntimeException(
f'Could not read {cls.__name__} at "{path}" as JSON: {exc}'
) from exc

# Check metadata version. There is a class variable 'dbt_schema_version', but
# that doesn't show up in artifacts, where it only exists in the 'metadata'
# dictionary.
if hasattr(cls, 'dbt_schema_version'):
if 'metadata' in data and 'dbt_schema_version' in data['metadata']:
previous_schema_version = data['metadata']['dbt_schema_version']
# cls.dbt_schema_version is a SchemaVersion object
if str(cls.dbt_schema_version) != previous_schema_version:
raise IncompatibleSchemaException(
expected=str(cls.dbt_schema_version),
found=previous_schema_version
)

return cls.from_dict(data) # type: ignore


T = TypeVar('T', bound='ArtifactMixin')


# metadata should really be a Generic[T_M] where T_M is a TypeVar bound to
# BaseArtifactMetadata. Unfortunately this isn't possible due to a mypy issue:
# https://github.com/python/mypy/issues/7520
# This is used in the WritableManifest, RunResultsArtifact, RunOperationResultsArtifact,
# and CatalogArtifact
@dataclasses.dataclass(init=False)
class ArtifactMixin(VersionedSchema, Writable, Readable):
metadata: BaseArtifactMetadata
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"metadata": {
"dbt_schema_version": "https://schemas.getdbt.com/dbt/manifest/v3.json",
"dbt_version": "0.21.1"
}
}
11 changes: 9 additions & 2 deletions test/integration/062_defer_state_test/test_modified_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import pytest

from dbt.exceptions import CompilationException
from dbt.exceptions import CompilationException, IncompatibleSchemaException


class TestModifiedState(DBTIntegrationTest):
Expand Down Expand Up @@ -36,7 +36,7 @@ def _symlink_test_folders(self):
for entry in os.listdir(self.test_original_source_path):
src = os.path.join(self.test_original_source_path, entry)
tst = os.path.join(self.test_root_dir, entry)
if entry in {'models', 'seeds', 'macros'}:
if entry in {'models', 'seeds', 'macros', 'previous_state'}:
shutil.copytree(src, tst)
elif os.path.isdir(entry) or entry.endswith('.sql'):
os.symlink(src, tst)
Expand Down Expand Up @@ -202,3 +202,10 @@ def test_postgres_changed_exposure(self):
results, stdout = self.run_dbt_and_capture(['run', '--models', '+state:modified', '--state', './state'])
assert len(results) == 1
assert results[0].node.name == 'view_model'

@use_profile('postgres')
def test_postgres_previous_version_manifest(self):
# This tests that a different schema version in the file throws an error
with self.assertRaises(IncompatibleSchemaException) as exc:
results = self.run_dbt(['ls', '-s', 'state:modified', '--state', './previous_state'])
self.assertEqual(exc.CODE, 10014)

0 comments on commit 067b861

Please sign in to comment.