diff --git a/CHANGES.rst b/CHANGES.rst index 8bd04e26e..cc11fa570 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,7 +12,9 @@ Changes Changes: -------- -- No change. +- Add ``GET /providers/{provider_id}/processes/{process_id}/package`` endpoint that allows retrieval of the `CWL` + `Application Package` definition generated for the specific `Provider`'s `Process` definition. +- Add `CLI` ``package`` operation to request the remote `Provider` or local `Process` `CWL` `Application Package`. Fixes: ------ diff --git a/weaver/cli.py b/weaver/cli.py index a908da28c..19174a775 100644 --- a/weaver/cli.py +++ b/weaver/cli.py @@ -7,6 +7,7 @@ import os import re import sys +import textwrap import time from typing import TYPE_CHECKING from urllib.parse import urlparse @@ -14,6 +15,7 @@ import yaml from pyramid.httpexceptions import HTTPNotImplemented from requests.auth import AuthBase, HTTPBasicAuth +from requests.sessions import Session from requests.structures import CaseInsensitiveDict from webob.headers import ResponseHeaders from yaml.scanner import ScannerError @@ -22,7 +24,7 @@ from weaver.datatype import AutoBase from weaver.exceptions import PackageRegistrationError from weaver.execute import ExecuteMode, ExecuteResponse, ExecuteTransmissionMode -from weaver.formats import ContentType, OutputFormat, get_content_type, get_format +from weaver.formats import ContentType, OutputFormat, get_content_type, get_format, repr_json from weaver.processes.constants import ProcessSchema from weaver.processes.convert import ( convert_input_values_schema, @@ -348,7 +350,6 @@ def __init__(self, url=None, auth=None): def _request(self, method, # type: AnyRequestMethod url, # type: str - *args, # type: Any headers=None, # type: Optional[AnyHeadersContainer] x_headers=None, # type: Optional[AnyHeadersContainer] request_timeout=None, # type: Optional[int] @@ -370,7 +371,20 @@ def _request(self, if isinstance(request_retries, int) and request_retries > 0: kwargs.setdefault("retries", request_retries) - return request_extra(method, url, *args, headers=headers, **kwargs) + if LOGGER.isEnabledFor(logging.DEBUG): + fields = set(inspect.signature(Session.request).parameters) - {"params", "url", "method", "json", "body"} + options = {opt: val for opt, val in kwargs.items() if opt in fields} + tab = " " + LOGGER.debug( + f"Request:\n{tab}%s %s\n{tab}Queries:\n%s\n{tab}Headers:\n%s\n{tab}Content:\n%s\n{tab}Options:\n%s", + method, + url, + textwrap.indent(repr_json(kwargs.get("params") or {}, indent=len(tab)), tab * 2), + textwrap.indent(repr_json(headers or {}, indent=len(tab)), tab * 2), + textwrap.indent(repr_json(kwargs.get("json") or kwargs.get("body") or {}, indent=len(tab)), tab * 2), + textwrap.indent(repr_json(options, indent=len(tab)), tab * 2), + ) + return request_extra(method, url, headers=headers, **kwargs) def _get_url(self, url): # type: (Optional[str]) -> str @@ -538,6 +552,9 @@ def register(self, """ Registers a remote :term:`Provider` using specified references. + .. note:: + This operation is specific to `Weaver`. It is not supported by standard :term:`OGC API - Processes`. + :param provider_id: Identifier to employ for registering the new :term:`Provider`. :param provider_url: Endpoint location to register the new remote :term:`Provider`. :param url: Instance URL if not already provided during client creation. @@ -576,6 +593,9 @@ def unregister(self, """ Unregisters a remote :term:`Provider` using the specified identifier. + .. note:: + This operation is specific to `Weaver`. It is not supported by standard :term:`OGC API - Processes`. + :param provider_id: Identifier to employ for unregistering the :term:`Provider`. :param url: Instance URL if not already provided during client creation. :param auth: @@ -629,8 +649,13 @@ def deploy(self, If the reference is resolved to be a :term:`Workflow`, all its underlying :term:`Process` steps must be available under the same URL that this client was initialized with. + .. note:: + This is only supported by :term:`OGC API - Processes` instances that support + the `Deploy, Replace, Undeploy` (DRU) extension. + .. seealso:: - :ref:`proc_op_deploy` + - :ref:`proc_op_deploy` + - |ogc-api-proc-part2|_ :param process_id: Desired process identifier. @@ -856,6 +881,48 @@ def _get_process_url(self, url, process_id, provider_id=None): path = f"{base}/processes/{process_id}" return path + def package(self, + process_id, # type: str + provider_id=None, # type: Optional[str] + url=None, # type: Optional[str] + auth=None, # type: Optional[AuthHandler] + headers=None, # type: Optional[AnyHeadersContainer] + with_links=True, # type: bool + with_headers=False, # type: bool + request_timeout=None, # type: Optional[int] + request_retries=None, # type: Optional[int] + output_format=None, # type: Optional[AnyOutputFormat] + ): # type: (...) -> OperationResult + """ + Retrieve the :term:`Application Package` definition of the specified :term:`Process`. + + .. note:: + This operation is specific to `Weaver`. It is not supported by standard :term:`OGC API - Processes`. + + :param process_id: Identifier of the local or remote process to describe. + :param provider_id: Identifier of the provider from which to locate a remote process to describe. + :param url: Instance URL if not already provided during client creation. + :param auth: + Instance authentication handler if not already created during client creation. + Should perform required adjustments to request to allow access control of protected contents. + :param headers: + Additional headers to employ when sending request. + Note that this can break functionalities if expected headers are overridden. Use with care. + :param with_links: Indicate if ``links`` section should be preserved in returned result body. + :param with_headers: Indicate if response headers should be returned in result output. + :param request_timeout: Maximum timout duration (seconds) to wait for a response when performing HTTP requests. + :param request_retries: Amount of attempt to retry HTTP requests in case of failure. + :param output_format: Select an alternate output representation of the result body contents. + :returns: Results of the operation. + """ + path = self._get_process_url(url, process_id, provider_id) + path = f"{path}/package" + resp = self._request("GET", path, + headers=self._headers, x_headers=headers, settings=self._settings, auth=auth, + request_timeout=request_timeout, request_retries=request_retries) + return self._parse_result(resp, message="Retrieving process Application Package.", + output_format=output_format, with_links=with_links, with_headers=with_headers) + @staticmethod def _parse_inputs(inputs): # type: (Optional[Union[str, JSON]]) -> Union[OperationResult, ExecutionInputsMap] @@ -2365,6 +2432,17 @@ def make_parser(): help="Representation schema of the returned process description (default: %(default)s, case-insensitive)." ) + op_package = WeaverArgumentParser( + "package", + description="Obtain the Application Package definition of an existing process.", + formatter_class=ParagraphFormatter, + ) + set_parser_sections(op_package) + add_url_param(op_package) + add_shared_options(op_package) + add_process_param(op_package) + add_provider_param(op_package, required=False) + op_execute = WeaverArgumentParser( "execute", description="Submit a job execution for an existing process.", @@ -2606,6 +2684,7 @@ def make_parser(): op_unregister, op_capabilities, op_describe, + op_package, op_execute, op_jobs, op_monitor, diff --git a/weaver/wps_restapi/providers/__init__.py b/weaver/wps_restapi/providers/__init__.py index eb2e1bd58..d10277cc4 100644 --- a/weaver/wps_restapi/providers/__init__.py +++ b/weaver/wps_restapi/providers/__init__.py @@ -19,6 +19,7 @@ def includeme(config): config.add_route(**sd.service_api_route_info(sd.provider_service, settings)) config.add_route(**sd.service_api_route_info(sd.provider_processes_service, settings)) config.add_route(**sd.service_api_route_info(sd.provider_process_service, settings)) + config.add_route(**sd.service_api_route_info(sd.provider_process_package_service, settings)) config.add_route(**sd.service_api_route_info(sd.provider_execution_service, settings)) config.add_view(p.get_providers, route_name=sd.providers_service.name, request_method="GET", renderer=OutputFormat.JSON) @@ -32,6 +33,8 @@ def includeme(config): request_method="GET", renderer=OutputFormat.JSON) config.add_view(p.get_provider_process, route_name=sd.provider_process_service.name, request_method="GET", renderer=OutputFormat.JSON) + config.add_view(p.get_provider_process_package, route_name=sd.provider_process_package_service.name, + request_method="GET", renderer=OutputFormat.JSON) config.add_view(p.submit_provider_job, route_name=sd.provider_jobs_service.name, request_method="POST", renderer=OutputFormat.JSON) config.add_view(p.submit_provider_job, route_name=sd.provider_execution_service.name, diff --git a/weaver/wps_restapi/providers/providers.py b/weaver/wps_restapi/providers/providers.py index ccc9637d9..8e2427d52 100644 --- a/weaver/wps_restapi/providers/providers.py +++ b/weaver/wps_restapi/providers/providers.py @@ -149,7 +149,7 @@ def get_provider(request): return HTTPOk(json=data) -@sd.provider_processes_service.get(tags=[sd.TAG_PROVIDERS, sd.TAG_PROCESSES, sd.TAG_PROVIDERS, sd.TAG_GETCAPABILITIES], +@sd.provider_processes_service.get(tags=[sd.TAG_PROVIDERS, sd.TAG_PROCESSES, sd.TAG_GETCAPABILITIES], renderer=OutputFormat.JSON, schema=sd.ProviderProcessesEndpoint(), response_schemas=sd.get_provider_processes_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) @@ -190,7 +190,7 @@ def describe_provider_process(request): return Process.convert(process, service, get_settings(request)) -@sd.provider_process_service.get(tags=[sd.TAG_PROVIDERS, sd.TAG_PROCESSES, sd.TAG_PROVIDERS, sd.TAG_DESCRIBEPROCESS], +@sd.provider_process_service.get(tags=[sd.TAG_PROVIDERS, sd.TAG_PROCESSES, sd.TAG_DESCRIBEPROCESS], renderer=OutputFormat.JSON, schema=sd.ProviderProcessEndpoint(), response_schemas=sd.get_provider_process_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) @@ -199,7 +199,7 @@ def describe_provider_process(request): def get_provider_process(request): # type: (PyramidRequest) -> AnyViewResponse """ - Retrieve a process description (DescribeProcess). + Retrieve a remote provider's process description (DescribeProcess). """ process = describe_provider_process(request) schema = request.params.get("schema") @@ -207,10 +207,24 @@ def get_provider_process(request): return HTTPOk(json=offering) -@sd.provider_execution_service.post(tags=[sd.TAG_PROVIDERS, sd.TAG_PROVIDERS, sd.TAG_EXECUTE, sd.TAG_JOBS], +@sd.provider_process_package_service.get(tags=[sd.TAG_PROVIDERS, sd.TAG_PROCESSES, sd.TAG_DESCRIBEPROCESS], + renderer=OutputFormat.JSON, schema=sd.ProviderProcessPackageEndpoint(), + response_schemas=sd.get_provider_process_package_responses) +@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) +@check_provider_requirements +def get_provider_process_package(request): + # type: (PyramidRequest) -> AnyViewResponse + """ + Retrieve a remote provider's process Application Package definition. + """ + process = describe_provider_process(request) + return HTTPOk(json=process.package or {}) + + +@sd.provider_execution_service.post(tags=[sd.TAG_PROVIDERS, sd.TAG_PROCESSES, sd.TAG_EXECUTE, sd.TAG_JOBS], renderer=OutputFormat.JSON, schema=sd.PostProviderProcessJobRequest(), response_schemas=sd.post_provider_process_job_responses) -@sd.provider_jobs_service.post(tags=[sd.TAG_PROVIDERS, sd.TAG_PROVIDERS, sd.TAG_EXECUTE, sd.TAG_JOBS], +@sd.provider_jobs_service.post(tags=[sd.TAG_PROVIDERS, sd.TAG_PROCESSES, sd.TAG_EXECUTE, sd.TAG_JOBS], renderer=OutputFormat.JSON, schema=sd.PostProviderProcessJobRequest(), response_schemas=sd.post_provider_process_job_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index c55d9730f..bb1706e5d 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -311,6 +311,7 @@ provider_service = Service(name="provider", path=f"{providers_service.path}/{{provider_id}}") provider_processes_service = Service(name="provider_processes", path=provider_service.path + processes_service.path) provider_process_service = Service(name="provider_process", path=provider_service.path + process_service.path) +provider_process_package_service = Service(name="provider_process_pkg", path=f"{provider_process_service.path}/package") provider_jobs_service = Service(name="provider_jobs", path=provider_service.path + process_jobs_service.path) provider_job_service = Service(name="provider_job", path=provider_service.path + process_job_service.path) provider_results_service = Service(name="provider_results", path=provider_service.path + process_results_service.path) @@ -2796,6 +2797,10 @@ class ProcessPackageEndpoint(LocalProcessPath): querystring = LocalProcessQuery() +class ProviderProcessPackageEndpoint(ProviderProcessPath, ProcessPackageEndpoint): + pass + + class ProcessPayloadEndpoint(LocalProcessPath): header = RequestHeaders() querystring = LocalProcessQuery() @@ -6633,6 +6638,10 @@ class GoneVaultFileDownloadResponse(ExtendedMappingSchema): "405": MethodNotAllowedErrorResponseSchema(), "500": InternalServerErrorResponseSchema(), } +get_provider_process_package_responses = copy(get_process_package_responses) +get_provider_process_package_responses.update({ + "403": ForbiddenProviderAccessResponseSchema(), +}) post_provider_responses = { "201": CreatedPostProvider(description="success"), "400": ExtendedMappingSchema(description=OWSMissingParameterValue.description),