Skip to content

Commit

Permalink
A complete rewrite of the executors, using the library.
Browse files Browse the repository at this point in the history
This just simulated CLI before, which was brittle and broke
with each minor change. There's a new library API now that
ensures things work as expected, and are cleaner.

Also concurrency changes to be multiprocess with a pool.
  • Loading branch information
sstanovnik committed Aug 5, 2020
1 parent 7f5df68 commit bea4a65
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 281 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,14 @@ With Docker:

```shell script
docker-compose up --build
curl localhost:8080
docker cp test.csar xopera-api_api_1:/app/
docker exec xopera-api_api_1 unzip test.csar
# prepare request inputs: service_template, inputs (in JSON object form, not a string)
curl -XPOST localhost:8080/validate -H "Content-Type: application/json" -d @inputs-request.json
curl -XPOST localhost:8080/deploy -H "Content-Type: application/json" -d @inputs-request.json
curl localhost:8080/status
curl localhost:8080/outputs
curl -XPOST localhost:8080/undeploy
```

With a local development installation:
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ services:
build: .
environment:
DEBUG: "true"
LOG_LEVEL: debug
labels:
- "traefik.enable=true"
- "traefik.http.services.api.loadbalancer.server.port=8080"
Expand Down
34 changes: 18 additions & 16 deletions openapi-spec.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
openapi: "3.0.0"
info:
version: 1.0.0
version: 0.1.1
title: xOpera API
license:
name: Apache-2.0
Expand All @@ -10,7 +10,7 @@ paths:
summary: Deploy a CSAR
operationId: deploy
requestBody:
description: Deployment inputs and service template name
description: Deployment inputs and service template name.
required: false
content:
application/json:
Expand Down Expand Up @@ -68,7 +68,7 @@ paths:
schema:
$ref: "#/components/schemas/Invocation"
"404":
description: No invocation with this id.
description: No invocation with this id.
/outputs:
get:
summary: Fetch deployment outputs
Expand All @@ -87,7 +87,7 @@ paths:
summary: Validate a CSAR
operationId: validate
requestBody:
description: Validation inputs and service template name
description: Validation inputs and service template name.
required: false
content:
application/json:
Expand Down Expand Up @@ -118,39 +118,35 @@ components:
description: Free-form mapping of outputs.
type: object
Invocation:
description: An invocation of the deployment
description: An invocation of the deployment.
type: object
required:
- id
- state
- timestamp
- console_output
properties:
id:
type: string
state:
$ref: "#/components/schemas/InvocationState"
operation:
type: string
enum:
- deploy
- undeploy
type: OperationType
timestamp:
description: An ISO8601 timestamp of the invocation.
type: string
inputs:
description: Inputs provided for invocation
description: Inputs provided for invocation.
type: object
instance_state:
description: State of the instances defined in service template
description: State of the instances defined in service template.
type: object
additionalProperties:
type: string
type: string
exception:
description: An internal xOpera error that occurred starting operation
description: An internal xOpera error that occurred starting operation.
type: string
console_output:
description: xOpera console output for operation
description: xOpera console output for operation.
type: string
InvocationHistory:
description: Invocation history ordered by timestamp ascending.
Expand All @@ -170,6 +166,12 @@ components:
InvocationState:
type: string
enum:
- pending
- in_progress
- success
- failed
- in_progress
OperationType:
type: string
enum:
- deploy
- undeploy
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# first commit that supports PEP420
opera==0.5.8
# first commit with the new decoupled API
git+git://github.com/xlab-si/xopera-opera.git@d8aa1d09e6b352854ae24f9f54f57c01f65a7add#egg=opera

connexion >= 2.6.0; python_version>="3.6"
connexion >= 2.3.0; python_version=="3.5"
Expand Down
184 changes: 184 additions & 0 deletions src/opera/api/controllers/background_invocation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import datetime
import json
import multiprocessing
import os
import sys
import typing
import uuid
from io import StringIO
from pathlib import Path
from typing import List, Callable, Optional

from opera.commands.deploy import deploy as opera_deploy
from opera.commands.undeploy import undeploy as opera_undeploy
from opera.storage import Storage

from opera.api.log import get_logger
from opera.api.openapi.models import Invocation, InvocationState

logger = get_logger(__name__)


def get_instance_state():
json_dict = {}
for file_path in Path(os.path.join('.opera', 'instances')).glob("*"):
parsed = json.load(open(file_path, 'r'))
component_name = parsed['tosca_name']['data']
json_dict[component_name] = parsed['state']['data']
return json_dict


class StdoutCapture(object):
def __enter__(self):
self._stdout_backup = sys.stdout
self._stringio = StringIO()
sys.stdout = self._stringio
return self

def __exit__(self, *args):
self.value = self._stringio.getvalue()
del self._stringio # free up some memory
sys.stdout = self._stdout_backup

def get_value(self):
return self._stringio.getvalue()


class WrapperException(BaseException):
def __init__(self, invocation_uuid, wrapped_exception):
self.invocation_uuid = invocation_uuid
self.wrapped_exception = wrapped_exception


def wrapper_start(function, function_args, invocation_uuid):
logger.debug("Starting %s", invocation_uuid)

local_inv = InvocationService.load_invocation(invocation_uuid)
local_inv.state = InvocationState.IN_PROGRESS
InvocationService.write_invocation(local_inv)

with StdoutCapture() as capture:
try:
function(*function_args)
# we want the console output no matter what
except BaseException as e:
wrapped_exc = WrapperException(invocation_uuid, e)
raise wrapped_exc
finally:
local_inv = InvocationService.load_invocation(invocation_uuid)
local_inv.console_output = capture.get_value()
InvocationService.write_invocation(local_inv)

return invocation_uuid


def wrapper_error(error: WrapperException):
if not isinstance(error, WrapperException):
logger.error("Unexpected out-of-band error.")
raise error

logger.error("Error in %s", error.invocation_uuid, exc_info=error.wrapped_exception)

local_inv = InvocationService.load_invocation(error.invocation_uuid)
local_inv.state = InvocationState.FAILED
local_inv.exception = str(error)
InvocationService.write_invocation(local_inv)


# gets param as the result of wrapper_start
def wrapper_done(invocation_uuid):
logger.debug("Done with %s", invocation_uuid)

local_inv = InvocationService.load_invocation(invocation_uuid)
local_inv.state = InvocationState.SUCCESS
local_inv.instance_state = get_instance_state()
InvocationService.write_invocation(local_inv)


# necessary because we can't pickle the storage object and therefore can't submit upstream deploy to the pool
def opera_deploy_storage_proxy(service_template: str, inputs: typing.Optional[dict], num_workers: int):
opera_storage = Storage.create()
return opera_deploy(service_template, inputs, opera_storage, num_workers)


def opera_undeploy_storage_proxy(num_workers: int):
opera_storage = Storage.create()
opera_undeploy(opera_storage, num_workers)


class InvocationService:
def __init__(self):
# FIXME: should really be closed or used as a context manager
self.pool = multiprocessing.Pool(1) # one thing at a time

def invoke(self, function: Callable, function_args: list,
operation_type: str, inputs: Optional[dict]) -> Invocation:
invocation_uuid = str(uuid.uuid4())
now = datetime.datetime.now(tz=datetime.timezone.utc)
logger.info("Invoking %s with ID %s at %s", operation_type, invocation_uuid, now.isoformat())

inv = Invocation()
inv.id = invocation_uuid
inv.state = InvocationState.PENDING
inv.operation = operation_type
inv.timestamp = now.isoformat()
inv.inputs = inputs
inv.instance_state = None
inv.exception = None
inv.console_output = None
self.write_invocation(inv)

wrapper_kwargs = dict(
function=function,
function_args=function_args,
invocation_uuid=invocation_uuid
)

# the error callback is runtime correct, as we only throw one type of exception
# noinspection PyTypeChecker
self.pool.apply_async(wrapper_start, kwds=wrapper_kwargs, callback=wrapper_done, error_callback=wrapper_error)
return inv

@classmethod
def invocation_history(cls) -> List[Invocation]:
logger.info("Loading invocation history.")

invocations = []
for file_path in Path(".opera-api").glob('*.json'):
logger.debug(file_path)
invocation = Invocation.from_dict(json.load(open(file_path, 'r')))
invocations.append(invocation)

if invocations:
invocations.sort(
key=lambda x: datetime.datetime.strptime(
x.timestamp,
'%Y-%m-%dT%H:%M:%S.%f+00:00'
),
reverse=True
)

return invocations

@classmethod
def latest_invocation(cls) -> Optional[Invocation]:
all_invocations = cls.invocation_history()
try:
return next(all_invocations)
except StopIteration:
return None

@classmethod
def load_invocation(cls, eye_dee: str) -> Optional[Invocation]:
all_invocations = cls.invocation_history()
try:
return next(inv for inv in all_invocations if inv.id == eye_dee)
except StopIteration:
return None

@classmethod
def write_invocation(cls, inv: Invocation):
storage = Storage.create(".opera-api")
filename = "invocation-{}.json".format(inv.id)
dump = json.dumps(inv.to_dict())
storage.write(dump, filename)
Loading

0 comments on commit bea4a65

Please sign in to comment.