From bea4a65a7d1bd50b7a73088b1cc9a627c4c72f8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sa=C5=A1o=20Stanovnik?= Date: Wed, 5 Aug 2020 14:01:21 +0200 Subject: [PATCH] A complete rewrite of the executors, using the library. This just simulated CLI before, which was brittle and broke with each minor change. There's a new library API now that ensures things work as expected, and are cleaner. Also concurrency changes to be multiprocess with a pool. --- README.md | 9 +- docker-compose.yml | 1 + openapi-spec.yml | 34 ++-- requirements.txt | 4 +- .../api/controllers/background_invocation.py | 184 ++++++++++++++++++ src/opera/api/controllers/default.py | 88 ++++----- .../api/controllers/invocation_service.py | 147 -------------- src/opera/api/controllers/utils.py | 71 ------- 8 files changed, 257 insertions(+), 281 deletions(-) create mode 100644 src/opera/api/controllers/background_invocation.py delete mode 100644 src/opera/api/controllers/invocation_service.py delete mode 100644 src/opera/api/controllers/utils.py diff --git a/README.md b/README.md index 211f03c..443e5fb 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,14 @@ With Docker: ```shell script docker-compose up --build -curl localhost:8080 +docker cp test.csar xopera-api_api_1:/app/ +docker exec xopera-api_api_1 unzip test.csar +# prepare request inputs: service_template, inputs (in JSON object form, not a string) +curl -XPOST localhost:8080/validate -H "Content-Type: application/json" -d @inputs-request.json +curl -XPOST localhost:8080/deploy -H "Content-Type: application/json" -d @inputs-request.json +curl localhost:8080/status +curl localhost:8080/outputs +curl -XPOST localhost:8080/undeploy ``` With a local development installation: diff --git a/docker-compose.yml b/docker-compose.yml index 48338c7..fff4775 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,6 +5,7 @@ services: build: . environment: DEBUG: "true" + LOG_LEVEL: debug labels: - "traefik.enable=true" - "traefik.http.services.api.loadbalancer.server.port=8080" diff --git a/openapi-spec.yml b/openapi-spec.yml index a68b74f..c3dc253 100644 --- a/openapi-spec.yml +++ b/openapi-spec.yml @@ -1,6 +1,6 @@ openapi: "3.0.0" info: - version: 1.0.0 + version: 0.1.1 title: xOpera API license: name: Apache-2.0 @@ -10,7 +10,7 @@ paths: summary: Deploy a CSAR operationId: deploy requestBody: - description: Deployment inputs and service template name + description: Deployment inputs and service template name. required: false content: application/json: @@ -68,7 +68,7 @@ paths: schema: $ref: "#/components/schemas/Invocation" "404": - description: No invocation with this id. + description: No invocation with this id. /outputs: get: summary: Fetch deployment outputs @@ -87,7 +87,7 @@ paths: summary: Validate a CSAR operationId: validate requestBody: - description: Validation inputs and service template name + description: Validation inputs and service template name. required: false content: application/json: @@ -118,39 +118,35 @@ components: description: Free-form mapping of outputs. type: object Invocation: - description: An invocation of the deployment + description: An invocation of the deployment. type: object required: - id - state - timestamp - - console_output properties: id: type: string state: $ref: "#/components/schemas/InvocationState" operation: - type: string - enum: - - deploy - - undeploy + type: OperationType timestamp: description: An ISO8601 timestamp of the invocation. type: string inputs: - description: Inputs provided for invocation + description: Inputs provided for invocation. type: object instance_state: - description: State of the instances defined in service template + description: State of the instances defined in service template. type: object additionalProperties: - type: string + type: string exception: - description: An internal xOpera error that occurred starting operation + description: An internal xOpera error that occurred starting operation. type: string console_output: - description: xOpera console output for operation + description: xOpera console output for operation. type: string InvocationHistory: description: Invocation history ordered by timestamp ascending. @@ -170,6 +166,12 @@ components: InvocationState: type: string enum: + - pending + - in_progress - success - failed - - in_progress + OperationType: + type: string + enum: + - deploy + - undeploy diff --git a/requirements.txt b/requirements.txt index 82a9141..6e0a56e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ -# first commit that supports PEP420 -opera==0.5.8 +# first commit with the new decoupled API +git+git://github.com/xlab-si/xopera-opera.git@d8aa1d09e6b352854ae24f9f54f57c01f65a7add#egg=opera connexion >= 2.6.0; python_version>="3.6" connexion >= 2.3.0; python_version=="3.5" diff --git a/src/opera/api/controllers/background_invocation.py b/src/opera/api/controllers/background_invocation.py new file mode 100644 index 0000000..5148229 --- /dev/null +++ b/src/opera/api/controllers/background_invocation.py @@ -0,0 +1,184 @@ +import datetime +import json +import multiprocessing +import os +import sys +import typing +import uuid +from io import StringIO +from pathlib import Path +from typing import List, Callable, Optional + +from opera.commands.deploy import deploy as opera_deploy +from opera.commands.undeploy import undeploy as opera_undeploy +from opera.storage import Storage + +from opera.api.log import get_logger +from opera.api.openapi.models import Invocation, InvocationState + +logger = get_logger(__name__) + + +def get_instance_state(): + json_dict = {} + for file_path in Path(os.path.join('.opera', 'instances')).glob("*"): + parsed = json.load(open(file_path, 'r')) + component_name = parsed['tosca_name']['data'] + json_dict[component_name] = parsed['state']['data'] + return json_dict + + +class StdoutCapture(object): + def __enter__(self): + self._stdout_backup = sys.stdout + self._stringio = StringIO() + sys.stdout = self._stringio + return self + + def __exit__(self, *args): + self.value = self._stringio.getvalue() + del self._stringio # free up some memory + sys.stdout = self._stdout_backup + + def get_value(self): + return self._stringio.getvalue() + + +class WrapperException(BaseException): + def __init__(self, invocation_uuid, wrapped_exception): + self.invocation_uuid = invocation_uuid + self.wrapped_exception = wrapped_exception + + +def wrapper_start(function, function_args, invocation_uuid): + logger.debug("Starting %s", invocation_uuid) + + local_inv = InvocationService.load_invocation(invocation_uuid) + local_inv.state = InvocationState.IN_PROGRESS + InvocationService.write_invocation(local_inv) + + with StdoutCapture() as capture: + try: + function(*function_args) + # we want the console output no matter what + except BaseException as e: + wrapped_exc = WrapperException(invocation_uuid, e) + raise wrapped_exc + finally: + local_inv = InvocationService.load_invocation(invocation_uuid) + local_inv.console_output = capture.get_value() + InvocationService.write_invocation(local_inv) + + return invocation_uuid + + +def wrapper_error(error: WrapperException): + if not isinstance(error, WrapperException): + logger.error("Unexpected out-of-band error.") + raise error + + logger.error("Error in %s", error.invocation_uuid, exc_info=error.wrapped_exception) + + local_inv = InvocationService.load_invocation(error.invocation_uuid) + local_inv.state = InvocationState.FAILED + local_inv.exception = str(error) + InvocationService.write_invocation(local_inv) + + +# gets param as the result of wrapper_start +def wrapper_done(invocation_uuid): + logger.debug("Done with %s", invocation_uuid) + + local_inv = InvocationService.load_invocation(invocation_uuid) + local_inv.state = InvocationState.SUCCESS + local_inv.instance_state = get_instance_state() + InvocationService.write_invocation(local_inv) + + +# necessary because we can't pickle the storage object and therefore can't submit upstream deploy to the pool +def opera_deploy_storage_proxy(service_template: str, inputs: typing.Optional[dict], num_workers: int): + opera_storage = Storage.create() + return opera_deploy(service_template, inputs, opera_storage, num_workers) + + +def opera_undeploy_storage_proxy(num_workers: int): + opera_storage = Storage.create() + opera_undeploy(opera_storage, num_workers) + + +class InvocationService: + def __init__(self): + # FIXME: should really be closed or used as a context manager + self.pool = multiprocessing.Pool(1) # one thing at a time + + def invoke(self, function: Callable, function_args: list, + operation_type: str, inputs: Optional[dict]) -> Invocation: + invocation_uuid = str(uuid.uuid4()) + now = datetime.datetime.now(tz=datetime.timezone.utc) + logger.info("Invoking %s with ID %s at %s", operation_type, invocation_uuid, now.isoformat()) + + inv = Invocation() + inv.id = invocation_uuid + inv.state = InvocationState.PENDING + inv.operation = operation_type + inv.timestamp = now.isoformat() + inv.inputs = inputs + inv.instance_state = None + inv.exception = None + inv.console_output = None + self.write_invocation(inv) + + wrapper_kwargs = dict( + function=function, + function_args=function_args, + invocation_uuid=invocation_uuid + ) + + # the error callback is runtime correct, as we only throw one type of exception + # noinspection PyTypeChecker + self.pool.apply_async(wrapper_start, kwds=wrapper_kwargs, callback=wrapper_done, error_callback=wrapper_error) + return inv + + @classmethod + def invocation_history(cls) -> List[Invocation]: + logger.info("Loading invocation history.") + + invocations = [] + for file_path in Path(".opera-api").glob('*.json'): + logger.debug(file_path) + invocation = Invocation.from_dict(json.load(open(file_path, 'r'))) + invocations.append(invocation) + + if invocations: + invocations.sort( + key=lambda x: datetime.datetime.strptime( + x.timestamp, + '%Y-%m-%dT%H:%M:%S.%f+00:00' + ), + reverse=True + ) + + return invocations + + @classmethod + def latest_invocation(cls) -> Optional[Invocation]: + all_invocations = cls.invocation_history() + try: + return next(all_invocations) + except StopIteration: + return None + + @classmethod + def load_invocation(cls, eye_dee: str) -> Optional[Invocation]: + all_invocations = cls.invocation_history() + try: + return next(inv for inv in all_invocations if inv.id == eye_dee) + except StopIteration: + return None + + @classmethod + def write_invocation(cls, inv: Invocation): + storage = Storage.create(".opera-api") + filename = "invocation-{}.json".format(inv.id) + dump = json.dumps(inv.to_dict()) + storage.write(dump, filename) diff --git a/src/opera/api/controllers/default.py b/src/opera/api/controllers/default.py index 2daa2c8..10f2e37 100644 --- a/src/opera/api/controllers/default.py +++ b/src/opera/api/controllers/default.py @@ -1,85 +1,85 @@ -import json +from opera.commands.outputs import outputs as opera_outputs +from opera.commands.validate import validate as opera_validate +from opera.storage import Storage -import connexion -from opera.commands import outputs as output_command -from opera.commands import validate as validate_command - -from opera.api.controllers import utils -from opera.api.controllers.invocation_service import InvocationService +from opera.api.controllers.background_invocation import InvocationService, opera_deploy_storage_proxy, \ + opera_undeploy_storage_proxy from opera.api.log import get_logger +from opera.api.openapi.models import ValidationResult, OperationType from opera.api.openapi.models.deployment_input import DeploymentInput logger = get_logger(__name__) + +# must be created (pool) _after_ any functions are referenced, otherwise AttributeError: can't get attribute invocation_service = InvocationService() -def deploy(deployment_input=None): +def deploy(body: DeploymentInput = None): logger.debug("Entry: deploy") + logger.debug(body) - if connexion.request.is_json: - jayson = connexion.request.get_json() - logger.debug("Request has json: ", str(jayson)) - deployment_input = DeploymentInput.from_dict(jayson) - - args, invocation = invocation_service.prepare_deploy(deployment_input) - invocation_service.background_function(invocation_service.run_deployment, args=args) + deployment_input = DeploymentInput.from_dict(body) + result = invocation_service.invoke( + opera_deploy_storage_proxy, [deployment_input.service_template, deployment_input.inputs, 1], + OperationType.DEPLOY, deployment_input.inputs + ) - return invocation, 200 + return result, 200 def undeploy(): logger.debug("Entry: undeploy") - args, invocation = invocation_service.prepare_undeploy() - invocation_service.background_function(invocation_service.run_undeployment, args=args) + result = invocation_service.invoke( + opera_undeploy_storage_proxy, [1], + OperationType.UNDEPLOY, None + ) - return invocation, 200 + return result, 200 def outputs(): logger.debug("Entry: outputs") - args = invocation_service.prepare_outputs() try: - with utils.CaptureString() as output: - output_command.outputs(args) - return json.loads(output.value), 200 + opera_storage = Storage.create() + result = opera_outputs(opera_storage) except Exception as e: logger.error("Error getting outputs.", e) + return {"message": str(e)}, 500 + + if not result: return {"message": "No outputs exist for this deployment."}, 404 + return result, 200 def status(): - logger.debug("Entry: outputs") - - return invocation_service.load_invocation_history(), 200 + logger.debug("Entry: status") + history = invocation_service.invocation_history() + return history, 200 def invocation_status(invocation_id): logger.debug("Entry: invocation_status") - - service = InvocationService() - response = service.load_invocation_status(invocation_id) - if response: - return response, 200 - else: + history = invocation_service.invocation_history() + try: + return next(inv for inv in history if inv.id == invocation_id), 200 + except StopIteration: return {"message": "No invocation with id {}".format(invocation_id)}, 404 -def validate(deployment_input=None): +def validate(body: DeploymentInput = None): logger.debug("Entry: validate") + logger.debug(body) - if connexion.request.is_json: - jayson = connexion.request.get_json() - logger.debug("Request has json: ", str(jayson)) - deployment_input = DeploymentInput.from_dict(jayson) - - args, invocation = invocation_service.prepare_deploy(deployment_input) + deployment_input = DeploymentInput.from_dict(body) + result = ValidationResult() try: - with utils.CaptureString() as output: - validate_command.validate(args) - return utils.get_validation_result(output), 200 + opera_validate(deployment_input.service_template, deployment_input.inputs) + result.success = True except Exception as e: - logger.error("Validation error.", e) - return str(e), 500 + result.success = False + result.message = str(e) + + return result, 200 diff --git a/src/opera/api/controllers/invocation_service.py b/src/opera/api/controllers/invocation_service.py deleted file mode 100644 index af70c68..0000000 --- a/src/opera/api/controllers/invocation_service.py +++ /dev/null @@ -1,147 +0,0 @@ -import json -import os -import threading -from datetime import datetime -from pathlib import Path - -from opera.commands import deploy, undeploy - -from opera.api.controllers import utils -from opera.api.log import get_logger -from opera.api.openapi.models import Invocation, InvocationState - -# folder name to store invocation results -INVOCATION_STORAGE_FOLDER_NAME = '.opera-api' - -logger = get_logger(__name__) - - -class InvocationService(object): - outputs = {} - - def background_function(self, func, args): - logger.debug("Invoking background function.") - invocation_thread = threading.Thread(target=func, args=[args]) - invocation_thread.start() - - def run_invocation(self, func, args, command_name): - logger.info("Running invocation: %s", command_name) - with utils.CaptureString() as output: - try: - self.outputs[args.uid] = output - result = func(args) - # if no exception save output as output - # even if operation was not successful - self.save_invocation( - args, - InvocationState.SUCCESS if result == 0 - else InvocationState.FAILED, - command_name, - console_output=output.get_value(), - instance_state=utils.get_instance_state() - ) - except Exception as e: - self.save_invocation( - args, InvocationState.FAILED, - command_name, - console_output=output.get_value(), - error=str(e), instance_state=utils.get_instance_state() - ) - finally: - self.outputs.pop(args.uid) - - def run_deployment(self, args): - self.run_invocation(deploy.deploy, args, 'deploy') - - def run_undeployment(self, args): - self.run_invocation(undeploy.undeploy, args, 'undeploy') - - def prepare_deploy(self, deployment_input): - # opera will save files to the current working directory - utils.change_current_dir() - command_args = utils.CommandArgs(deployment_input) - invocation = self.save_invocation( - command_args, - InvocationState.IN_PROGRESS, - 'deploy', - console_output='' - ) - return command_args, invocation - - def prepare_undeploy(self): - # opera gets saved files from current working directory - utils.change_current_dir() - args = utils.CommandArgs() - invocation = self.save_invocation( - args, - InvocationState.IN_PROGRESS, - 'undeploy', - console_output='' - ) - return args, invocation - - def prepare_outputs(self): - # opera gets saved files from current working directory - utils.change_current_dir() - args = utils.CommandArgs(output_format='json') - return args - - def save_invocation( - self, command_args, state, - operation, console_output, - instance_state=None, error=None): - logger.info("Saving invocation: %s", operation) - invocation = Invocation( - command_args.uid, state, operation, - command_args.timestamp, command_args.inputs, - instance_state=instance_state, - exception=error, - console_output=console_output - ) - utils.change_current_dir() - Path(INVOCATION_STORAGE_FOLDER_NAME).mkdir( - parents=True, - exist_ok=True) - filename = os.path.join( - INVOCATION_STORAGE_FOLDER_NAME, - '{0}_{1}.json'.format(operation, command_args.timestamp) - ) - with open(filename, 'w') as outfile: - json.dump(invocation.to_dict(), outfile) - return invocation - - def load_invocation_history(self): - logger.info("Loading invocation history.") - - invocations = [] - utils.change_current_dir() - for file_path in Path(INVOCATION_STORAGE_FOLDER_NAME).glob('*.json'): - logger.debug(file_path) - invocation = Invocation.from_dict(json.load(open(file_path, 'r'))) - invocations.append(invocation) - - if invocations: - invocations.sort( - key=lambda x: datetime.strptime( - x.timestamp, - '%Y-%m-%dT%H:%M:%S.%f+00:00' - ), - reverse=True - ) - - for invocation in invocations: - logger.debug(invocation) - if invocation.state == InvocationState.IN_PROGRESS: - invocation.instance_state = utils.get_instance_state() - invocation.output = '' \ - if invocation.id not in self.outputs \ - else self.outputs[invocation.id].get_value() - - return invocations - - def load_invocation_status(self, uid): - history = self.load_invocation_history() - for invocation in history: - if invocation.id == uid: - return invocation - return None diff --git a/src/opera/api/controllers/utils.py b/src/opera/api/controllers/utils.py deleted file mode 100644 index 66c37af..0000000 --- a/src/opera/api/controllers/utils.py +++ /dev/null @@ -1,71 +0,0 @@ -import json -import os -import sys -import uuid -from datetime import datetime, timezone -from io import StringIO -from pathlib import Path - -from opera.api.openapi.models import ValidationResult - -DEFAULT_TEMPLATE_LOCATION_PATH = '' - - -class CommandArgs(object): - def __init__(self, command_inputs=None, output_format=None): - self.format = output_format - self.uid = str(uuid.uuid4()) - self.timestamp = datetime.now(tz=timezone.utc).isoformat() - self.inputs = '' - if not command_inputs: - return - if not command_inputs.service_template: - raise ValueError('No service template is specified.') - else: - self.csar = Csar(command_inputs.service_template) - - if command_inputs.inputs: - self.inputs = str(command_inputs.inputs) - - -class Csar(object): - def __init__(self, name): - self.name = name - - -class CaptureString(object): - def __enter__(self): - self._stdout = sys.stdout - sys.stdout = self._stringio = StringIO() - return self - - def __exit__(self, *args): - self.value = self._stringio.getvalue() - del self._stringio # free up some memory - sys.stdout = self._stdout - - def get_value(self): - return self._stringio.getvalue() - - -def change_current_dir(): - if DEFAULT_TEMPLATE_LOCATION_PATH != '': - os.chdir(DEFAULT_TEMPLATE_LOCATION_PATH) - - -def get_validation_result(output): - output_list = output.value.splitlines() - if output_list[1] != 'Done.': - return ValidationResult(False, output_list[1]) - else: - return ValidationResult(True) - - -def get_instance_state(): - json_dict = {} - change_current_dir() - for file_path in Path(os.path.join('.opera', 'instances')).glob("*"): - parsed = json.load(open(file_path, 'r')) - component_name = parsed['tosca_name']['data'] - json_dict[component_name] = parsed['state']['data'] - return json_dict