Skip to content

Commit

Permalink
Merge pull request #6 from neicnordic/feature/validate-json-schema
Browse files Browse the repository at this point in the history
Feature/validate json schema
  • Loading branch information
blankdots authored Dec 10, 2020
2 parents 0fb7d36 + 3548ccd commit 3356f81
Show file tree
Hide file tree
Showing 19 changed files with 1,310 additions and 68 deletions.
11 changes: 5 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM python:3.7-alpine3.10 as BUILD
FROM python:3.7-alpine3.11 as BUILD

RUN apk add --no-cache git postgresql-libs postgresql-dev gcc musl-dev libffi-dev make gnupg && \
RUN apk add --no-cache git gcc musl-dev libffi-dev make gnupg && \
rm -rf /var/cache/apk/*

COPY requirements.txt /root/sdaorch/requirements.txt
Expand All @@ -11,7 +11,7 @@ RUN pip install --upgrade pip && \
pip install -r /root/sdaorch/requirements.txt && \
pip install /root/sdaorch

FROM python:3.7-alpine3.10
FROM python:3.7-alpine3.11

LABEL maintainer "NeIC System Developers"
LABEL org.label-schema.schema-version="1.0"
Expand All @@ -28,9 +28,8 @@ COPY --from=BUILD /usr/local/bin/sdaverified /usr/local/bin/

ADD supervisor.conf /etc/

RUN addgroup -g 1000 sda && \
adduser -D -u 1000 -G sda sda
RUN echo "nobody:x:65534:65534:nobody:/:/sbin/nologin" > passwd

USER 1000
USER 65534

ENTRYPOINT ["supervisord", "--configuration", "/etc/supervisor.conf"]
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
amqpstorm
amqpstorm
jsonschema
2 changes: 1 addition & 1 deletion sda_orchestrator/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""SDA Orchestrator service for coordinating messages and mapping file id to dataset id."""

__title__ = "sda_orchestrator"
__version__ = "0.3.0"
__version__ = "0.4.0"
__author__ = "NeIC System Developers"
61 changes: 44 additions & 17 deletions sda_orchestrator/complete_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from .utils.logger import LOG
import os
from .utils.id_ops import generate_dataset_id
from jsonschema.exceptions import ValidationError
from .schemas.validate import ValidateJSON, load_schema


class CompleteConsumer(Consumer):
Expand All @@ -15,35 +17,60 @@ def handle_message(self, message: Message) -> None:
try:
complete_msg = json.loads(message.body)

LOG.info(f"Completed message received: {complete_msg} .")
properties = {
"content_type": "application/json",
"headers": {},
"correlation_id": message.correlation_id,
"delivery_mode": 2,
}
LOG.debug(f"MQ Message body: {message.body} .")
LOG.debug(f"Complete Consumer message received: {complete_msg} .")
LOG.info(
f"Received work (corr-id: {message.correlation_id} filepath: {complete_msg['filepath']}, \
user: {complete_msg['user']}, accessionid: {complete_msg['accession_id']}, \
decryptedChecksums: {complete_msg['decrypted_checksums']})",
)

channel = self.connection.channel() # type: ignore
datasetID = generate_dataset_id(complete_msg["user"], complete_msg["filepath"])
ValidateJSON(load_schema("ingestion-completion")).validate(complete_msg)

# Send message to mappings queue for dataset to file mapping
accessionID = complete_msg["accession_id"]
content = {
"type": "mapping",
"dataset_id": datasetID,
"accession_ids": [accessionID],
}
mapping = Message.create(channel, json.dumps(content), properties)
datasetID = generate_dataset_id(complete_msg["user"], complete_msg["filepath"])
self._publish_mappings(message, accessionID, datasetID)

except ValidationError:
LOG.error("Could not validate the ingestion complete message. Not properly formatted.")
raise

except Exception as error:
LOG.error(f"Error occurred in complete consumer: {error}.")
raise

def _publish_mappings(self, message: Message, accessionID: str, datasetID: str) -> None:
"""Publish message with dataset to accession ID mapping."""
properties = {
"content_type": "application/json",
"headers": {},
"correlation_id": message.correlation_id,
"delivery_mode": 2,
}
try:

channel = self.connection.channel() # type: ignore
mappings_trigger = {"type": "mapping", "dataset_id": datasetID, "accession_ids": [accessionID]}

mappings_msg = json.dumps(mappings_trigger)
ValidateJSON(load_schema("dataset-mapping")).validate(json.loads(mappings_msg))

mapping = Message.create(channel, mappings_msg, properties)
mapping.publish(
os.environ.get("MAPPINGS_QUEUE", "mappings"), exchange=os.environ.get("BROKER_EXCHANGE", "sda")
)

channel.close()

LOG.info(
f"Sent the message to mappings queue to set dataset ID {datasetID} for file \
with accessionID {accessionID}."
)

except Exception as error:
LOG.error("Something went wrong: {0}".format(error))
except ValidationError:
LOG.error("Could not validate the ingestion mappings message. Not properly formatted.")
raise Exception("Could not validate the ingestion mappings message. Not properly formatted.")


def main() -> None:
Expand Down
76 changes: 60 additions & 16 deletions sda_orchestrator/inbox_consume.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
"""Message Broker inbox step consumer."""
import json
from typing import Dict
from amqpstorm import Message
from .utils.consumer import Consumer
from .utils.logger import LOG
import os
from pathlib import Path
from jsonschema.exceptions import ValidationError
from .schemas.validate import ValidateJSON, load_schema


class InboxConsumer(Consumer):
Expand All @@ -13,29 +16,70 @@ class InboxConsumer(Consumer):
def handle_message(self, message: Message) -> None:
"""Handle message."""
try:
inbx_msg = json.loads(message.body)
properties = {"content_type": "application/json", "headers": {}, "correlation_id": message.correlation_id}
inbox_msg = json.loads(message.body)

LOG.debug(f"MQ Message body: {message.body} .")
LOG.debug(f"Inbox Consumer message received: {inbox_msg} .")
LOG.info(
f"Received work (corr-id: {message.correlation_id} filepath: {inbox_msg['filepath']}, \
user: {inbox_msg['user']} with operation: {inbox_msg['operation']})",
)

if inbox_msg["operation"] == "upload":
ValidateJSON(load_schema("inbox-upload")).validate(inbox_msg)
elif inbox_msg["operation"] == "rename":
ValidateJSON(load_schema("inbox-rename")).validate(inbox_msg)
elif inbox_msg["operation"] == "remove":
ValidateJSON(load_schema("inbox-remove")).validate(inbox_msg)
else:
LOG.error("Un-identified inbox operation.")

# we check if this is a path with a suffix or a name
test_path = Path(inbx_msg["filepath"])
test_path = Path(inbox_msg["filepath"])
if test_path.suffix == "" or test_path.name in ["", ".", ".."]:
LOG.error(f"file: {test_path} does not appear to be a correct path.")
raise FileNotFoundError

# Create the files message.
# we keep the encrypted_checksum but it can also be missing
self._publish_ingest(message, inbox_msg)

except ValidationError:
LOG.error("Could not validate the inbox message. Not properly formatted.")
raise

except Exception as error:
LOG.error(f"Error occurred in inbox consumer: {error}.")
raise

def _publish_ingest(self, message: Message, inbox_msg: Dict) -> None:
"""Publish message with dataset to accession ID mapping."""
properties = {
"content_type": "application/json",
"headers": {},
"correlation_id": message.correlation_id,
"delivery_mode": 2,
}
try:
channel = self.connection.channel() # type: ignore
content = {
"type": "ingest",
"user": inbx_msg["user"],
"filepath": inbx_msg["filepath"],
}
if "encrypted_checksums" in inbx_msg:
content["encrypted_checksums"] = inbx_msg["encrypted_checksums"]
sent = Message.create(channel, json.dumps(content), properties)

sent.publish(os.environ.get("INGEST_QUEUE", "ingest"), exchange=os.environ.get("BROKER_EXCHANGE", "sda"))

ingest_trigger = {"type": "ingest", "user": inbox_msg["user"], "filepath": inbox_msg["filepath"]}
if "encrypted_checksums" in inbox_msg:
ingest_trigger["encrypted_checksums"] = inbox_msg["encrypted_checksums"]

ingest_msg = json.dumps(ingest_trigger)
ValidateJSON(load_schema("ingestion-trigger")).validate(json.loads(ingest_msg))

ingest = Message.create(channel, ingest_msg, properties)

ingest.publish(os.environ.get("INGEST_QUEUE", "ingest"), exchange=os.environ.get("BROKER_EXCHANGE", "sda"))
channel.close()
LOG.info(f'Sent the message to ingest queue to trigger ingestion for filepath: {inbx_msg["filepath"]}.')
except Exception as error:
LOG.error("Something went wrong: {0}".format(error))

LOG.info(f'Sent the message to ingest queue to trigger ingestion for filepath: {inbox_msg["filepath"]}.')

except ValidationError:
LOG.error("Could not validate the ingest trigger message. Not properly formatted.")
raise Exception("Could not validate the ingest trigger message. Not properly formatted.")


def main() -> None:
Expand Down
5 changes: 5 additions & 0 deletions sda_orchestrator/schemas/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""JSON Schemas and function for validating messages.
Schemas are provided by https://github.com/EGA-archive/LocalEGA/tree/master/ingestion/schemas
Under Apache 2.0 license
"""
47 changes: 47 additions & 0 deletions sda_orchestrator/schemas/dataset-mapping.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
{
"title": "JSON schema for dataset mapping message interface. Derived from Federated EGA schemas.",
"$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/dataset-mapping.json",
"$schema": "http://json-schema.org/draft-07/schema",
"type": "object",
"required": [
"type",
"dataset_id",
"accession_ids"
],
"additionalProperties": true,
"properties": {
"type": {
"$id": "#/properties/type",
"type": "string",
"title": "The message type",
"description": "The message type",
"const": "mapping"
},
"dataset_id": {
"$id": "#/properties/dataset_id",
"type": "string",
"title": "The Accession identifier for the dataset",
"description": "The Accession identifier for the dataset",
"pattern": "^\\S+$",
"examples": [
"anyidentifier"
]
},
"accession_ids": {
"$id": "#/properties/accession_ids",
"type": "array",
"title": "The file stable ids in that dataset",
"description": "The file stable ids in that dataset",
"examples": [
[
"anyidentifier"
]
],
"additionalItems": false,
"items": {
"type": "string",
"pattern": "^\\S+$"
}
}
}
}
39 changes: 39 additions & 0 deletions sda_orchestrator/schemas/inbox-remove.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"title": "JSON schema for Local EGA inbox remove message interface",
"$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/inbox-remove.json",
"$schema": "http://json-schema.org/draft-07/schema",
"type": "object",
"required": [
"user",
"filepath",
"operation"
],
"additionalProperties": true,
"properties": {
"user": {
"$id": "#/properties/user",
"type": "string",
"title": "The username",
"description": "The username",
"examples": [
"[email protected]"
]
},
"filepath": {
"$id": "#/properties/filepath",
"type": "string",
"title": "The unique identifier to the file location",
"description": "The unique identifier to the file location",
"examples": [
"/ega/inbox/[email protected]/the-file.c4gh"
]
},
"operation": {
"$id": "#/properties/operation",
"type": "string",
"const": "remove",
"title": "The operation type",
"description": "The operation type"
}
}
}
49 changes: 49 additions & 0 deletions sda_orchestrator/schemas/inbox-rename.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{
"title": "JSON schema for Local EGA inbox rename message interface",
"$id": "https://github.com/EGA-archive/LocalEGA/tree/master/schemas/inbox-rename.json",
"$schema": "http://json-schema.org/draft-07/schema",
"type": "object",
"required": [
"user",
"filepath",
"oldpath",
"operation"
],
"additionalProperties": true,
"properties": {
"user": {
"$id": "#/properties/user",
"type": "string",
"title": "The username",
"description": "The username",
"examples": [
"[email protected]"
]
},
"filepath": {
"$id": "#/properties/filepath",
"type": "string",
"title": "The new filepath",
"description": "The new filepath",
"examples": [
"/ega/inbox/[email protected]/the-file.c4gh"
]
},
"oldpath": {
"$id": "#/properties/oldpath",
"type": "string",
"title": "The old filepath",
"description": "The old filepath",
"examples": [
"/ega/inbox/[email protected]/the-old-file.c4gh"
]
},
"operation": {
"$id": "#/properties/operation",
"type": "string",
"const": "rename",
"title": "The operation type",
"description": "The operation type"
}
}
}
Loading

0 comments on commit 3356f81

Please sign in to comment.