Commit 4d86352: Green unit tests

mmwinther committed Jan 9, 2024
1 parent c6ef50e commit 4d86352
Showing 20 changed files with 1,102 additions and 1,005 deletions.
12 changes: 0 additions & 12 deletions SECURITY.md

This file was deleted.

179 changes: 101 additions & 78 deletions datadoc/backend/datadoc_metadata.py
@@ -23,26 +23,6 @@
 
 logger = logging.getLogger(__name__)
 
-OBLIGATORY_DATASET_METADATA = [
-    m.identifier
-    for m in display_dataset.DISPLAY_DATASET.values()
-    if m.obligatory and m.editable
-]
-
-OBLIGATORY_VARIABLES_METADATA = [
-    m.identifier
-    for m in display_variables.DISPLAY_VARIABLES.values()
-    if m.obligatory and m.editable
-]
-
-# These don't vary at runtime so we calculate them as constants here
-NUM_OBLIGATORY_DATASET_FIELDS = len(
-    [k for k in model.Dataset().model_dump() if k in OBLIGATORY_DATASET_METADATA],
-)
-NUM_OBLIGATORY_VARIABLES_FIELDS = len(
-    [k for k in model.Variable().model_dump() if k in OBLIGATORY_VARIABLES_METADATA],
-)
-
 METADATA_DOCUMENT_FILE_SUFFIX = "__DOC.json"
 
 PLACEHOLDER_USERNAME = "[email protected]"
@@ -53,21 +33,46 @@ class DataDocMetadata:
 
     def __init__(
         self: t.Self,
-        dataset: str | None,
+        dataset_path: str | os.PathLike | None = None,
+        metadata_document_path: str | os.PathLike | None = None,
     ) -> None:
         """Read in a dataset if supplied, otherwise naively instantiate the class."""
-        self.dataset: str = dataset
-        if self.dataset:
+        self.dataset: str = dataset_path
+        self.metadata_document: StorageAdapter | None = None
+        self.container: model.MetadataContainer | None = None
+
+        self.dataset_state: DatasetState | None = None
+        self.short_name: str | None = None
+        self.current_user: str | None = None
+        self.meta: model.DatadocJsonSchema = model.DatadocJsonSchema(
+            percentage_complete=0,
+            dataset=model.Dataset(),
+            variables=[],
+        )
+
+        self.variables_lookup: dict[str, model.Variable] = {}
+
+        if metadata_document_path:
+            # In this case the user has specified an independent metadata document
+            # for editing without a dataset.
+            self.metadata_document = StorageAdapter.for_path(metadata_document_path)
+            self.extract_metadata_from_existing_document()
+
+        elif self.dataset:
             # The short_name is set as the dataset filename without file extension
             self.short_name: str = pathlib.Path(
                 self.dataset,
-            ).stem  # filename without file ending
+            ).stem
             self.metadata_document: StorageAdapter = StorageAdapter.for_path(
                 StorageAdapter.for_path(self.dataset).parent(),
             )
             self.metadata_document.joinpath(
                 self.short_name + METADATA_DOCUMENT_FILE_SUFFIX,
             )
             self.dataset_state: DatasetState = self.get_dataset_state(self.dataset)
+
+            self.extract_metadata_from_files()
+
         try:
             self.current_user = os.environ["JUPYTERHUB_USER"]
         except KeyError:
@@ -77,17 +82,6 @@ def __init__(
                 self.current_user,
             )
 
-        self.meta: model.DatadocJsonSchema = model.DatadocJsonSchema(
-            percentage_complete=0,
-            dataset=model.Dataset(),
-            variables=[],
-        )
-
-        self.variables_lookup: dict[str, model.Variable] = {}
-
-        if self.dataset:
-            self.extract_metadata_from_files()
-
     def get_dataset_state(
         self: t.Self,
         dataset: str,
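Note: with this change DataDocMetadata can be constructed in two independent ways. A minimal usage sketch (the file paths are hypothetical, and the import path assumes the package layout shown in this diff):

    from datadoc.backend.datadoc_metadata import DataDocMetadata

    # Dataset supplied: metadata is extracted from the dataset file, or from an
    # adjacent <short_name>__DOC.json document if one already exists.
    metadata = DataDocMetadata(dataset_path="data/person_data_v1.parquet")

    # Standalone metadata document supplied: no dataset is required.
    metadata = DataDocMetadata(
        metadata_document_path="data/person_data_v1__DOC.json",
    )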
@@ -134,55 +128,77 @@ def get_dataset_version(
         return None
 
     def extract_metadata_from_files(self: t.Self) -> None:
-        """Read metadata from a dataset.
+        """Read metadata from an existing metadata document.
 
         If a metadata document already exists, read in the metadata from that instead.
+        If no metadata document exists, create one from scratch by extracting metadata
+        from the dataset file.
         """
-        fresh_metadata = {}
         if self.metadata_document.exists():
-            try:
-                with self.metadata_document.open(mode="r", encoding="utf-8") as file:
-                    fresh_metadata = json.load(file)
-                logger.info(
-                    "Opened existing metadata file %s",
-                    self.metadata_document.location,
-                )
-
-                fresh_metadata = upgrade_metadata(
-                    fresh_metadata,
-                    model.DatadocJsonSchema().document_version,
-                )
-
-                variables_list = fresh_metadata.pop("variables", None)
-
-                self.meta.variables = [model.Variable(**v) for v in variables_list]
-                self.meta.dataset = model.Dataset(
-                    **fresh_metadata.pop("dataset", None),
-                )
-            except json.JSONDecodeError:
-                logger.warning(
-                    "Could not open existing metadata file %s. \
-                    Falling back to collecting data from the dataset",
-                    self.metadata_document.location,
-                    exc_info=True,
-                )
-                self.extract_metadata_from_dataset()
+            self.extract_metadata_from_existing_document()
         else:
             self.extract_metadata_from_dataset()
 
-        if self.meta.dataset.id is None:
-            self.meta.dataset.id = uuid.uuid4()
-
-        # Set default values for variables where appropriate
-        v: model.Variable
-        for v in self.meta.variables:
-            if v.variable_role is None:
-                v.variable_role = VariableRole.MEASURE
-            if v.direct_person_identifying is None:
-                v.direct_person_identifying = False
+        # Set default values for variables where appropriate
+        v: model.Variable
+        for v in self.meta.variables:
+            if v.variable_role is None:
+                v.variable_role = VariableRole.MEASURE
+            if v.direct_person_identifying is None:
+                v.direct_person_identifying = False
+
+        if not self.meta.dataset.id:
+            self.meta.dataset.id = uuid.uuid4()
 
         self.variables_lookup = {v.short_name: v for v in self.meta.variables}

+    def extract_metadata_from_existing_document(self: t.Self) -> None:
+        """There's an existing metadata document, so read in the metadata from that."""
+        fresh_metadata = {}
+        try:
+            with self.metadata_document.open(mode="r", encoding="utf-8") as file:
+                fresh_metadata = json.load(file)
+            logger.info(
+                "Opened existing metadata file %s",
+                self.metadata_document.location,
+            )
+
+            if self.is_metadata_in_container_structure(fresh_metadata):
+                self.container = model.MetadataContainer.model_validate_json(
+                    json.dumps(fresh_metadata),
+                )
+                datadoc_metadata = fresh_metadata["datadoc"]
+            else:
+                datadoc_metadata = fresh_metadata
+
+            datadoc_metadata = upgrade_metadata(
+                datadoc_metadata,
+            )
+
+            self.meta = model.DatadocJsonSchema.model_validate_json(
+                json.dumps(datadoc_metadata),
+            )
+
+        except json.JSONDecodeError:
+            logger.warning(
+                "Could not open existing metadata file %s. \
+                Falling back to collecting data from the dataset",
+                self.metadata_document.location,
+                exc_info=True,
+            )
+
+    def is_metadata_in_container_structure(
+        self: t.Self,
+        metadata: dict,
+    ) -> bool:
+        """At a certain point a metadata 'container' was introduced.
+
+        The container provides a structure for different 'types' of metadata, such as 'datadoc', 'pseudonymization' etc.
+        This method returns True if the metadata is in the container structure, False otherwise.
+        """
+        return "datadoc" in metadata and "dataset" in metadata["datadoc"]
 
     def extract_metadata_from_dataset(self: t.Self) -> None:
         """Obtain what metadata we can from the dataset itself.
@@ -210,7 +226,13 @@ def write_metadata_document(self: t.Self) -> None:
         self.meta.dataset.metadata_created_by = self.current_user
         self.meta.dataset.metadata_last_updated_date = timestamp
         self.meta.dataset.metadata_last_updated_by = self.current_user
-        self.metadata_document.write_text(self.meta.model_dump_json(indent=4))
+
+        if self.container:
+            self.container.datadoc = self.meta
+        else:
+            self.container = model.MetadataContainer(datadoc=self.meta)
+
+        self.metadata_document.write_text(self.container.model_dump_json(indent=4))
         logger.info("Saved metadata document %s", self.metadata_document.location)
 
     @property
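Note: after this change every save goes through model.MetadataContainer, so a document read in the old flat shape is rewritten in the container shape on its first save. A sketch of the wrapping step, assuming the model classes are importable from datadoc_model.model as elsewhere in this diff:

    from datadoc_model import model

    meta = model.DatadocJsonSchema(
        percentage_complete=0,
        dataset=model.Dataset(),
        variables=[],
    )
    container = model.MetadataContainer(datadoc=meta)
    # What gets persisted is always the container's JSON:
    print(container.model_dump_json(indent=4))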
@@ -221,22 +243,23 @@ def percent_complete(self: t.Self) -> int:
         assigned. Used for a live progress bar in the UI, as well as being
         saved in the datadoc as a simple quality indicator.
         """
-        num_all_fields = NUM_OBLIGATORY_DATASET_FIELDS
+        num_all_fields = len(display_dataset.OBLIGATORY_DATASET_METADATA)
         num_set_fields = len(
             [
                 k
                 for k, v in self.meta.dataset.model_dump().items()
-                if k in OBLIGATORY_DATASET_METADATA and v is not None
+                if k in display_dataset.OBLIGATORY_DATASET_METADATA and v is not None
             ],
         )
 
         for variable in self.meta.variables:
-            num_all_fields += NUM_OBLIGATORY_VARIABLES_FIELDS
+            num_all_fields += len(display_variables.OBLIGATORY_VARIABLES_METADATA)
             num_set_fields += len(
                 [
                     k
                     for k, v in variable.model_dump().items()
-                    if k in OBLIGATORY_VARIABLES_METADATA and v is not None
+                    if k in display_variables.OBLIGATORY_VARIABLES_METADATA
+                    and v is not None
                 ],
             )
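Note: a worked example of the completeness arithmetic above, with hypothetical counts (the final percentage calculation happens outside the lines shown in this hunk):

    num_obligatory_dataset_fields = 10  # len(display_dataset.OBLIGATORY_DATASET_METADATA)
    num_obligatory_variable_fields = 4  # len(display_variables.OBLIGATORY_VARIABLES_METADATA)

    # One dataset with 7 obligatory fields set, plus two variables with
    # 4 and 2 obligatory fields set respectively:
    num_all_fields = num_obligatory_dataset_fields + 2 * num_obligatory_variable_fields  # 18
    num_set_fields = 7 + 4 + 2  # 13

    print(round(num_set_fields / num_all_fields * 100))  # 72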
67 changes: 51 additions & 16 deletions datadoc/backend/model_backwards_compatibility.py
@@ -12,7 +12,11 @@
 from __future__ import annotations
 
 import typing as t
+from collections import OrderedDict
 from dataclasses import dataclass
+from datetime import datetime, timezone
+
+from datadoc_model.model import LanguageStringType
 
 if t.TYPE_CHECKING:
     from collections.abc import Callable
@@ -37,7 +41,7 @@ def __str__(self: t.Self) -> str:
return f"Document Version ({self.supplied_version}) of discovered file is not supported"


SUPPORTED_VERSIONS: dict[str, BackwardsCompatibleVersion] = {}
SUPPORTED_VERSIONS: OrderedDict[str, BackwardsCompatibleVersion] = OrderedDict()


@dataclass()
@@ -57,6 +61,30 @@ def handle_current_version(supplied_metadata: dict) -> dict:
     return supplied_metadata
 
 
+def handle_version_1_0_0(supplied_metadata: dict) -> dict:
+    """Handle breaking changes for v1.0.0."""
+    datetime_fields = [
+        ("metadata_created_date"),
+        ("metadata_last_updated_date"),
+    ]
+    for field in datetime_fields:
+        if supplied_metadata["dataset"][field]:
+            supplied_metadata["dataset"][field] = datetime.isoformat(
+                datetime.fromisoformat(supplied_metadata["dataset"][field]).astimezone(
+                    tz=timezone.utc,
+                ),
+                timespec="seconds",
+            )
+
+    if isinstance(supplied_metadata["dataset"]["data_source"], str):
+        supplied_metadata["dataset"]["data_source"] = LanguageStringType(
+            en=supplied_metadata["dataset"]["data_source"],
+        )
+
+    supplied_metadata["document_version"] = "2.0.0"
+
+    return supplied_metadata
+
+
 def handle_version_0_1_1(supplied_metadata: dict) -> dict:
     """Handle breaking changes for v0.1.1.
@@ -80,26 +108,33 @@ def handle_version_0_1_1(supplied_metadata: dict) -> dict:
     return supplied_metadata
 
 
-# Register all the supported versions and their handlers
+# Register all the supported versions and their handlers.
+# MUST be ordered from oldest to newest.
 BackwardsCompatibleVersion(version="0.1.1", handler=handle_version_0_1_1)
 BackwardsCompatibleVersion(
-    version="1",  # Some documents exist with incorrect version specification
-    handler=handle_version_0_1_1,
+    version="1.0.0",
+    handler=handle_version_1_0_0,
 )
+BackwardsCompatibleVersion(
+    version="2.0.0",
+    handler=handle_current_version,
+)
 
 
-def upgrade_metadata(fresh_metadata: dict, current_model_version: str) -> dict:
+def upgrade_metadata(fresh_metadata: dict) -> dict:
     """Run the handler for this version to upgrade the document to the latest version."""
-    # Special case for current version, we expose the current_model_version parameter for test purposes
-    SUPPORTED_VERSIONS[current_model_version] = BackwardsCompatibleVersion(
-        current_model_version,
-        handle_current_version,
-    )
     supplied_version = fresh_metadata[VERSION_FIELD_NAME]
-    try:
-        # Retrieve the upgrade function for this version
-        upgrade = SUPPORTED_VERSIONS[supplied_version].handler
-    except KeyError as e:
-        raise UnknownModelVersionError(supplied_version) from e
-    else:
-        return upgrade(fresh_metadata)
+    start_running_handlers = False
+
+    # Run all the handlers in order from the supplied version onwards
+    for k, v in SUPPORTED_VERSIONS.items():
+        if k == supplied_version:
+            start_running_handlers = True
+        if start_running_handlers:
+            fresh_metadata = v.handler(fresh_metadata)
+
+    if not start_running_handlers:
+        raise UnknownModelVersionError(supplied_version)
+
+    return fresh_metadata
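Note: with the ordered registration above, upgrades are now cumulative: a document enters the chain at its own version and is passed through every later handler in turn. A small runnable sketch (document contents abbreviated to the fields the v1.0.0 handler touches; the data_source value is hypothetical):

    from datadoc.backend.model_backwards_compatibility import upgrade_metadata

    old_document = {
        "document_version": "1.0.0",
        "dataset": {
            "metadata_created_date": None,
            "metadata_last_updated_date": None,
            "data_source": "Statistics Norway",
        },
    }

    upgraded = upgrade_metadata(old_document)  # runs the 1.0.0 handler, then the current one
    assert upgraded["document_version"] == "2.0.0"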
