diff --git a/app/routes/definitions.py b/app/routes/definitions.py index cda5244a3..8835a4a91 100644 --- a/app/routes/definitions.py +++ b/app/routes/definitions.py @@ -1,13 +1,9 @@ -import asyncio from typing import List, Optional -from aries_cloudcontroller import ( - CredentialDefinitionSendRequest, - SchemaGetResult, - SchemaSendRequest, -) from fastapi import APIRouter, Depends, HTTPException +import app.services.definitions.credential_definitions as cred_def_service +import app.services.definitions.schemas as schemas_service from app.dependencies.acapy_clients import client_from_auth, get_governance_controller from app.dependencies.auth import ( AcaPyAuth, @@ -17,32 +13,19 @@ acapy_auth_verified, ) from app.dependencies.role import Role -from app.exceptions import ( - CloudApiException, - TrustRegistryException, - handle_acapy_call, - handle_model_with_validation, -) +from app.exceptions import handle_acapy_call +from app.exceptions.cloudapi_exception import CloudApiException from app.models.definitions import ( CreateCredentialDefinition, CreateSchema, CredentialDefinition, CredentialSchema, ) -from app.routes.trust_registry import ( - get_schema_by_id as get_trust_registry_schema_by_id, -) -from app.routes.trust_registry import get_schemas as get_trust_registry_schemas -from app.services import acapy_wallet -from app.services.revocation_registry import wait_for_active_registry -from app.services.trust_registry.schemas import register_schema -from app.services.trust_registry.util.issuer import assert_valid_issuer from app.util.definitions import ( credential_definition_from_acapy, credential_schema_from_acapy, ) -from app.util.retry_method import coroutine_with_retry, coroutine_with_retry_until_value -from shared import ACAPY_ENDORSER_ALIAS, REGISTRY_CREATION_TIMEOUT +from app.util.retry_method import coroutine_with_retry from shared.log_config import get_logger logger = get_logger(__name__) @@ -88,136 +71,12 @@ async def create_schema( bound_logger = logger.bind(body=schema) bound_logger.info("POST request received: Create schema (publish and register)") - schema_send_request = handle_model_with_validation( - logger=bound_logger, - model_class=SchemaSendRequest, - attributes=schema.attribute_names, - schema_name=schema.name, - schema_version=schema.version, - ) async with get_governance_controller(governance_auth) as aries_controller: - try: - bound_logger.info("Publishing schema as governance") - result = await handle_acapy_call( - logger=bound_logger, - acapy_call=aries_controller.schema.publish_schema, - body=schema_send_request, - create_transaction_for_endorser=False, - ) - except CloudApiException as e: - bound_logger.info( - "An Exception was caught while trying to publish schema: `{}`", - e.detail, - ) - if e.status_code == 400 and "already exist" in e.detail: - bound_logger.info("Handling case of schema already existing on ledger") - bound_logger.debug("Fetching public DID for governance controller") - pub_did = await handle_acapy_call( - logger=bound_logger, - acapy_call=aries_controller.wallet.get_public_did, - ) - - _schema_id = f"{pub_did.result.did}:2:{schema.name}:{schema.version}" - bound_logger.debug( - "Fetching schema id `{}` which is associated with request", - _schema_id, - ) - _schema: SchemaGetResult = await handle_acapy_call( - logger=bound_logger, - acapy_call=aries_controller.schema.get_schema, - schema_id=_schema_id, - ) - # Edge case where the governance agent has changed its public did - # Then we need to retrieve the schema in a different way as constructing the schema ID the way above - # will not be correct due to different public did. - if _schema.var_schema is None: - bound_logger.debug( - "Schema not found. Governance agent may have changed public DID. " - "Fetching schemas created by governance agent with request name and version" - ) - schemas_created_ids = await handle_acapy_call( - logger=bound_logger, - acapy_call=aries_controller.schema.get_created_schemas, - schema_name=schema.name, - schema_version=schema.version, - ) - bound_logger.debug("Getting schemas associated with fetched ids") - schemas: List[SchemaGetResult] = [ - await handle_acapy_call( - logger=bound_logger, - acapy_call=aries_controller.schema.get_schema, - schema_id=schema_id, - ) - for schema_id in schemas_created_ids.schema_ids - if schema_id - ] - if schemas: - if len(schemas) > 1: - raise CloudApiException( # pylint: disable=W0707 - f"Multiple schemas with name {schema.name} and version {schema.version} exist." - + f"These are: `{str(schemas_created_ids.schema_ids)}`.", - 409, - ) - - bound_logger.debug("Using updated schema id with new DID") - _schema: SchemaGetResult = schemas[0] - else: - # if schema already exists, we should at least fetch 1, so this should never happen - raise CloudApiException( - "Could not publish schema.", 500 - ) # pylint: disable=W0707 - # Schema exists with different attributes - if set(_schema.var_schema.attr_names) != set(schema.attribute_names): - raise CloudApiException( - "Error creating schema: Schema already exists with different attribute names." - + f"Given: `{str(set(_schema.var_schema.attr_names))}`. " - f"Found: `{str(set(schema.attribute_names))}`.", - 409, - ) # pylint: disable=W0707 - - result = credential_schema_from_acapy(_schema.var_schema) - bound_logger.info( - "Schema already exists on ledger. Returning schema definition: `{}`.", - result, - ) - return result - else: - bound_logger.warning( - "An unhandled Exception was caught while publishing schema. The error message is: '{}'.", - e.detail, - ) - raise CloudApiException("Error while creating schema.") from e - - # Register the schema in the trust registry - try: - if result.sent and result.sent.schema_id: - bound_logger.debug("Registering schema after successful publish to ledger") - await register_schema(schema_id=result.sent.schema_id) - else: - bound_logger.error("No SchemaSendResult in `publish_schema` response.") - raise CloudApiException( - "An unexpected error occurred: could not publish schema." - ) - except TrustRegistryException as error: - # If status_code is 405 it means the schema already exists in the trust registry - # That's okay, because we've achieved our intended result: - # make sure the schema is registered in the trust registry - bound_logger.info( - "Caught TrustRegistryException when registering schema. " - "Got status code {} with message `{}`", - error.status_code, - error.detail, + schema_response = await schemas_service.create_schema( + aries_controller=aries_controller, + schema=schema, ) - if error.status_code == 405: - bound_logger.info( - "Status code 405 indicates schema is already registered, so we can continue" - ) - else: - raise error - - result = credential_schema_from_acapy(result.sent.var_schema) - bound_logger.info("Successfully published and registered schema.") - return result + return schema_response @router.get( @@ -268,79 +127,30 @@ async def get_schemas( "schema_version": schema_version, } ) + bound_logger.info("GET request received: Get created schemas") async with client_from_auth(auth) as aries_controller: if not is_governance: # regular tenant is calling endpoint - bound_logger.info("GET request received: Get created schemas") - - if not schema_id: # client is not filtering by schema_id, fetch all - trust_registry_schemas = await get_trust_registry_schemas() - else: # fetch specific id - trust_registry_schemas = [ - await get_trust_registry_schema_by_id(schema_id) - ] - - schema_ids = [schema.id for schema in trust_registry_schemas] - - else: # Governance is calling the endpoint - bound_logger.info( - "GET request received: Get schemas created by governance client" - ) - # Get all created schema ids that match the filter - bound_logger.debug("Fetching created schemas") - response = await handle_acapy_call( - logger=bound_logger, - acapy_call=aries_controller.schema.get_created_schemas, + schemas = await schemas_service.get_schemas_as_tenant( + aries_controller=aries_controller, schema_id=schema_id, schema_issuer_did=schema_issuer_did, schema_name=schema_name, schema_version=schema_version, ) - # Initiate retrieving all schemas - schema_ids = response.schema_ids or [] - - # We now have schema_ids; the following logic is the same whether called by governance or tenant. - # Now fetch relevant schemas from ledger: - get_schema_futures = [ - handle_acapy_call( - logger=bound_logger, - acapy_call=aries_controller.schema.get_schema, - schema_id=schema_id, - ) - for schema_id in schema_ids - ] - - # Wait for completion of retrieval and transform all schemas into response model (if a schema was returned) - if get_schema_futures: - bound_logger.debug("Fetching each of the created schemas") - schema_results: List[SchemaGetResult] = await asyncio.gather( - *get_schema_futures - ) - else: - bound_logger.debug("No created schema ids returned") - schema_results = [] - - # Stepping out of the aries_controller context, we can now translate the ACA-Py schema response to our custom model - schemas = [ - credential_schema_from_acapy(schema.var_schema) - for schema in schema_results - if schema.var_schema - ] - - if not is_governance: - # Apply post-filtering that could otherwise only be done in governance aca-py call - # todo: our fetch from trust registry method should be able to pre-filter these values - if schema_issuer_did: - schemas = [ - schema - for schema in schemas - if schema.id.split(":")[0] == schema_issuer_did - ] - if schema_name: - schemas = [schema for schema in schemas if schema.name == schema_name] - if schema_version: - schemas = [schema for schema in schemas if schema.version == schema_version] + else: # Governance is calling the endpoint + try: + schemas = await schemas_service.get_schemas_as_governance( + aries_controller=aries_controller, + schema_id=schema_id, + schema_issuer_did=schema_issuer_did, + schema_name=schema_name, + schema_version=schema_version, + ) + except CloudApiException as e: + bound_logger.error("Failed to get schemas. Error: {}", e) + raise if schemas: bound_logger.info("Successfully fetched schemas.") @@ -443,124 +253,11 @@ async def create_credential_definition( support_revocation = credential_definition.support_revocation async with client_from_auth(auth) as aries_controller: - # Assert the agent has a public did - bound_logger.debug("Asserting client has public DID") - try: - public_did = await acapy_wallet.assert_public_did(aries_controller) - except CloudApiException as e: - log_message = f"Asserting public DID failed: {e}" - - if e.status_code == 403: - bound_logger.info(log_message) - client_error_message = ( - "Wallet making this request has no public DID. " - "Only issuers with a public DID can make this request." - ) - else: - bound_logger.error(log_message) - client_error_message = ( - "Something went wrong while asserting if request is from a valid issuer. " - "Please try again." - ) - - raise CloudApiException(client_error_message, e.status_code) from e - - # Make sure we are allowed to issue this schema according to trust registry rules - bound_logger.debug("Asserting client is a valid issuer") - await assert_valid_issuer(public_did, credential_definition.schema_id) - - if support_revocation: - endorser_connection = await handle_acapy_call( - logger=bound_logger, - acapy_call=aries_controller.connection.get_connections, - alias=ACAPY_ENDORSER_ALIAS, - ) - has_connections = len(endorser_connection.results) > 0 - - if not has_connections: - bound_logger.error( - "Failed to create credential definition supporting revocation: no endorser connection found. " - "Issuer attempted to create a credential definition with support for revocation but does not " - "have an active connection with an endorser, which is required for this operation." - ) - - raise CloudApiException( - "Credential definition creation failed: An active endorser connection is required " - "to support revocation. Please establish a connection with an endorser and try again." - ) - - bound_logger.debug("Publishing credential definition") - request_body = handle_model_with_validation( - logger=bound_logger, - model_class=CredentialDefinitionSendRequest, - schema_id=credential_definition.schema_id, + credential_definition_id = await cred_def_service.create_credential_definition( + aries_controller=aries_controller, + credential_definition=credential_definition, support_revocation=support_revocation, - tag=credential_definition.tag, - revocation_registry_size=32767, ) - try: - result = await handle_acapy_call( - logger=bound_logger, - acapy_call=aries_controller.credential_definition.publish_cred_def, - body=request_body, - ) - credential_definition_id = result.sent.credential_definition_id - except CloudApiException as e: - bound_logger.warning( - "An Exception was caught while publishing credential definition: `{}` `{}`", - e.detail, - e.status_code, - ) - if "already exists" in e.detail: - raise CloudApiException(status_code=409, detail=e.detail) from e - else: - raise CloudApiException( - detail=f"Error while creating credential definition: {e.detail}", - status_code=e.status_code, - ) from e - - # Wait for cred_def transaction to be acknowledged - if result.txn and result.txn.transaction_id: - bound_logger.debug( - "The publish credential definition response provides a transaction id. " - "Waiting for transaction to be in state `transaction_acked`" - ) - - try: - # Wait for transaction to be acknowledged and written to the ledger - await coroutine_with_retry_until_value( - coroutine_func=aries_controller.endorse_transaction.get_transaction, - args=(result.txn.transaction_id,), - field_name="state", - expected_value="transaction_acked", - logger=bound_logger, - max_attempts=10, - retry_delay=2, - ) - except asyncio.TimeoutError as e: - raise CloudApiException( - "Timeout waiting for endorser to accept the endorsement request.", - 504, - ) from e - - bound_logger.debug("Transaction has been acknowledged by the endorser") - - # Wait for revocation registry creation - if support_revocation: - try: - bound_logger.debug("Waiting for revocation registry creation") - await asyncio.wait_for( - wait_for_active_registry( - aries_controller, credential_definition_id - ), - timeout=REGISTRY_CREATION_TIMEOUT, - ) - except asyncio.TimeoutError as e: - bound_logger.error("Timeout waiting for revocation registry creation.") - raise CloudApiException( - "Timeout waiting for revocation registry creation.", - 504, - ) from e # ACA-Py only returns the id after creating a credential definition # We want consistent return types across all endpoints, so retrieving the credential @@ -629,46 +326,16 @@ async def get_credential_definitions( # Get all created credential definition ids that match the filter async with client_from_auth(auth) as aries_controller: - bound_logger.debug("Getting created credential definitions") - response = await handle_acapy_call( - logger=bound_logger, - acapy_call=aries_controller.credential_definition.get_created_cred_defs, + credential_definitions = await cred_def_service.get_credential_definitions( + aries_controller=aries_controller, issuer_did=issuer_did, - cred_def_id=credential_definition_id, + credential_definition_id=credential_definition_id, schema_id=schema_id, schema_issuer_did=schema_issuer_did, schema_name=schema_name, schema_version=schema_version, ) - # Initiate retrieving all credential definitions - credential_definition_ids = response.credential_definition_ids or [] - get_credential_definition_futures = [ - handle_acapy_call( - logger=bound_logger, - acapy_call=aries_controller.credential_definition.get_cred_def, - cred_def_id=credential_definition_id, - ) - for credential_definition_id in credential_definition_ids - ] - - # Wait for completion of retrieval and transform all credential definitions - # into response model (if a credential definition was returned) - if get_credential_definition_futures: - bound_logger.debug("Getting definitions from fetched credential ids") - credential_definition_results = await asyncio.gather( - *get_credential_definition_futures - ) - else: - bound_logger.debug("No definition ids returned") - credential_definition_results = [] - - credential_definitions = [ - credential_definition_from_acapy(credential_definition.credential_definition) - for credential_definition in credential_definition_results - if credential_definition.credential_definition - ] - if credential_definitions: bound_logger.info("Successfully fetched credential definitions.") else: diff --git a/app/services/definitions/__init__.py b/app/services/definitions/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/app/services/definitions/credential_definition_publisher.py b/app/services/definitions/credential_definition_publisher.py new file mode 100644 index 000000000..698d8c2d7 --- /dev/null +++ b/app/services/definitions/credential_definition_publisher.py @@ -0,0 +1,74 @@ +import asyncio +from logging import Logger + +from aries_cloudcontroller import AcaPyClient + +from app.exceptions import CloudApiException, handle_acapy_call +from app.services.revocation_registry import wait_for_active_registry +from app.util.check_endorser_connection import check_endorser_connection +from shared import REGISTRY_CREATION_TIMEOUT + + +class CredentialDefinitionPublisher: + def __init__(self, controller: AcaPyClient, logger: Logger): + self._logger = logger + self._controller = controller + + async def check_endorser_connection(self): + has_connections = await check_endorser_connection( + aries_controller=self._controller + ) + + if not has_connections: + self._logger.error( + "Failed to create credential definition supporting revocation: " + "no endorser connection found. Issuer attempted to create a credential " + "definition with support for revocation but does not have an active " + "connection with an endorser, which is required for this operation." + ) + raise CloudApiException( + "Credential definition creation failed: An active endorser connection " + "is required to support revocation. Please establish a connection with " + "an endorser and try again." + ) + + async def publish_credential_definition(self, request_body): + try: + result = await handle_acapy_call( + logger=self._logger, + acapy_call=self._controller.credential_definition.publish_cred_def, + body=request_body, + ) + except CloudApiException as e: + self._logger.warning( + "An Exception was caught while publishing cred def: `{}` `{}`", + e.detail, + e.status_code, + ) + if "already exists" in e.detail: + self._logger.info("Credential definition already exists") + raise CloudApiException(status_code=409, detail=e.detail) from e + else: + self._logger.error( + "Error while creating credential definition: `{}`", e.detail + ) + raise CloudApiException( + detail=f"Error while creating credential definition: {e.detail}", + status_code=e.status_code, + ) from e + + return result + + async def wait_for_revocation_registry(self, credential_definition_id): + try: + self._logger.debug("Waiting for revocation registry creation") + await asyncio.wait_for( + wait_for_active_registry(self._controller, credential_definition_id), + timeout=REGISTRY_CREATION_TIMEOUT, + ) + except asyncio.TimeoutError as e: + self._logger.error("Timeout waiting for revocation registry creation.") + raise CloudApiException( + "Timeout waiting for revocation registry creation.", + 504, + ) from e diff --git a/app/services/definitions/credential_definitions.py b/app/services/definitions/credential_definitions.py new file mode 100644 index 000000000..21aff6e90 --- /dev/null +++ b/app/services/definitions/credential_definitions.py @@ -0,0 +1,132 @@ +import asyncio +from typing import List, Optional + +from aries_cloudcontroller import AcaPyClient, CredentialDefinitionSendRequest + +from app.exceptions import handle_acapy_call, handle_model_with_validation +from app.models.definitions import CreateCredentialDefinition, CredentialDefinition +from app.services.definitions.credential_definition_publisher import ( + CredentialDefinitionPublisher, +) +from app.services.trust_registry.util.issuer import assert_valid_issuer +from app.util.assert_public_did import assert_public_did +from app.util.definitions import credential_definition_from_acapy +from app.util.transaction_acked import wait_for_transaction_ack +from shared.log_config import get_logger + +logger = get_logger(__name__) + + +async def create_credential_definition( + aries_controller: AcaPyClient, + credential_definition: CreateCredentialDefinition, + support_revocation: bool, +) -> str: + """ + Create a credential definition + """ + bound_logger = logger.bind( + body={ + "schema_id": credential_definition.schema_id, + "tag": credential_definition.tag, + "support_revocation": credential_definition.support_revocation, + } + ) + publisher = CredentialDefinitionPublisher( + controller=aries_controller, logger=bound_logger + ) + + public_did = await assert_public_did(aries_controller) + + await assert_valid_issuer(public_did, credential_definition.schema_id) + + if support_revocation: + await publisher.check_endorser_connection() + + request_body = handle_model_with_validation( + logger=logger, + model_class=CredentialDefinitionSendRequest, + schema_id=credential_definition.schema_id, + support_revocation=support_revocation, + tag=credential_definition.tag, + revocation_registry_size=32767, + ) + + result = await publisher.publish_credential_definition(request_body) + credential_definition_id = result.sent.credential_definition_id + + if result.txn and result.txn.transaction_id: + await wait_for_transaction_ack( + aries_controller=aries_controller, transaction_id=result.txn.transaction_id + ) + + if support_revocation: + await publisher.wait_for_revocation_registry(credential_definition_id) + + return credential_definition_id + + +async def get_credential_definitions( + aries_controller: AcaPyClient, + issuer_did: Optional[str] = None, + credential_definition_id: Optional[str] = None, + schema_id: Optional[str] = None, + schema_issuer_did: Optional[str] = None, + schema_name: Optional[str] = None, + schema_version: Optional[str] = None, +) -> List[CredentialDefinition]: + """ + Get credential definitions + """ + bound_logger = logger.bind( + body={ + "issuer_did": issuer_did, + "credential_definition_id": credential_definition_id, + "schema_id": schema_id, + "schema_issuer_did": schema_issuer_did, + "schema_name": schema_name, + "schema_version": schema_version, + } + ) + bound_logger.debug("Getting created credential definitions") + + response = await handle_acapy_call( + logger=bound_logger, + acapy_call=aries_controller.credential_definition.get_created_cred_defs, + issuer_did=issuer_did, + cred_def_id=credential_definition_id, + schema_id=schema_id, + schema_issuer_did=schema_issuer_did, + schema_name=schema_name, + schema_version=schema_version, + ) + + # Initiate retrieving all credential definitions + credential_definition_ids = response.credential_definition_ids or [] + get_credential_definition_futures = [ + handle_acapy_call( + logger=bound_logger, + acapy_call=aries_controller.credential_definition.get_cred_def, + cred_def_id=credential_definition_id, + ) + for credential_definition_id in credential_definition_ids + ] + + # Wait for completion of retrieval and transform all credential definitions + # into response model (if a credential definition was returned) + if get_credential_definition_futures: + bound_logger.debug("Getting definitions from fetched credential ids") + credential_definition_results = await asyncio.gather( + *get_credential_definition_futures + ) + else: + bound_logger.debug("No definition ids returned") + credential_definition_results = [] + + credential_definitions = [ + credential_definition_from_acapy(credential_definition.credential_definition) + for credential_definition in credential_definition_results + if credential_definition.credential_definition + ] + + return credential_definitions diff --git a/app/services/definitions/schema_publisher.py b/app/services/definitions/schema_publisher.py new file mode 100644 index 000000000..9fa350fa4 --- /dev/null +++ b/app/services/definitions/schema_publisher.py @@ -0,0 +1,126 @@ +from logging import Logger +from typing import List + +from aries_cloudcontroller import ( + AcaPyClient, + SchemaGetResult, + SchemaSendRequest, + TxnOrSchemaSendResult, +) + +from app.exceptions import CloudApiException, handle_acapy_call +from app.models.definitions import CredentialSchema +from app.services.trust_registry.schemas import register_schema +from app.util.definitions import credential_schema_from_acapy + + +class SchemaPublisher: + def __init__(self, controller: AcaPyClient, logger: Logger): + self._logger = logger + self._controller = controller + + async def publish_schema( + self, schema_request: SchemaSendRequest + ) -> TxnOrSchemaSendResult: + try: + result = await handle_acapy_call( + logger=self._logger, + acapy_call=self._controller.schema.publish_schema, + body=schema_request, + create_transaction_for_endorser=False, + ) + except CloudApiException as e: + if "already exist" in e.detail and e.status_code == 400: + result = await self._handle_existing_schema(schema_request) + else: + self._logger.warning( + "An unhandled Exception was caught while publishing schema: {}", + e.detail, + ) + raise CloudApiException("Error while creating schema.") from e + + if result.sent and result.sent.schema_id: + await register_schema(schema_id=result.sent.schema_id) + else: + self._logger.error("No SchemaSendResult in `publish_schema` response.") + raise CloudApiException( + "An unexpected error occurred: could not publish schema." + ) + return result + + async def _handle_existing_schema( + self, schema: SchemaSendRequest + ) -> CredentialSchema: + self._logger.info("Handling case of schema already existing on ledger") + self._logger.debug("Fetching public DID for governance controller") + pub_did = await handle_acapy_call( + logger=self._logger, + acapy_call=self._controller.wallet.get_public_did, + ) + + _schema_id = ( + f"{pub_did.result.did}:2:{schema.schema_name}:{schema.schema_version}" + ) + self._logger.debug( + "Fetching schema id `{}` which is associated with request", + _schema_id, + ) + + _schema: SchemaGetResult = await handle_acapy_call( + logger=self._logger, + acapy_call=self._controller.schema.get_schema, + schema_id=_schema_id, + ) + + # Edge case where the governance agent has changed its public did + # Then we need to retrieve the schema in a different way as constructing + # the schema ID the way above will not be correct due to different public did. + if _schema.var_schema is None: + self._logger.debug( + "Schema not found. Governance agent may have changed public DID. " + "Fetching schemas created by governance with requested name and version" + ) + schemas_created_ids = await handle_acapy_call( + logger=self._logger, + acapy_call=self._controller.schema.get_created_schemas, + schema_name=schema.schema_name, + schema_version=schema.schema_version, + ) + self._logger.debug("Getting schemas associated with fetched ids") + schemas: List[SchemaGetResult] = [ + await handle_acapy_call( + logger=self._logger, + acapy_call=self._controller.schema.get_schema, + schema_id=schema_id, + ) + for schema_id in schemas_created_ids.schema_ids + if schema_id + ] + + if not schemas: + raise CloudApiException("Could not publish schema.", 500) + if len(schemas) > 1: + error_message = ( + f"Multiple schemas with name {schema.schema_name} " + f"and version {schema.schema_version} exist." + f"These are: `{str(schemas_created_ids.schema_ids)}`." + ) + raise CloudApiException(error_message, 409) + self._logger.debug("Using updated schema id with new DID") + _schema: SchemaGetResult = schemas[0] + + # Schema exists with different attributes + if set(_schema.var_schema.attr_names) != set(schema.attributes): + error_message = ( + "Error creating schema: Schema already exists with different attribute " + f"names. Given: `{str(set(schema.attributes))}`. " + f"Found: `{str(set(_schema.var_schema.attr_names))}`." + ) + raise CloudApiException(error_message, 409) + + result = credential_schema_from_acapy(_schema.var_schema) + self._logger.info( + "Schema already exists on ledger. Returning schema definition: `{}`.", + result, + ) + return result diff --git a/app/services/definitions/schemas.py b/app/services/definitions/schemas.py new file mode 100644 index 000000000..9056589a2 --- /dev/null +++ b/app/services/definitions/schemas.py @@ -0,0 +1,187 @@ +import asyncio +from typing import List, Optional + +from aries_cloudcontroller import AcaPyClient, SchemaGetResult, SchemaSendRequest + +from app.exceptions import ( + CloudApiException, + handle_acapy_call, + handle_model_with_validation, +) +from app.models.definitions import CreateSchema, CredentialSchema +from app.routes.trust_registry import ( + get_schema_by_id as get_trust_registry_schema_by_id, +) +from app.routes.trust_registry import get_schemas as get_trust_registry_schemas +from app.services.definitions.schema_publisher import SchemaPublisher +from app.util.definitions import credential_schema_from_acapy +from shared.constants import GOVERNANCE_AGENT_URL +from shared.log_config import get_logger + +logger = get_logger(__name__) + + +async def create_schema( + aries_controller: AcaPyClient, + schema: CreateSchema, +) -> CredentialSchema: + """ + Create a schema and register it in the trust registry + """ + bound_logger = logger.bind(body=schema) + publisher = SchemaPublisher(controller=aries_controller, logger=logger) + + logger.debug("Asserting governance agent is host being called") + if aries_controller.configuration.host != GOVERNANCE_AGENT_URL: + raise CloudApiException( + "Only governance agents are allowed to access this endpoint.", + status_code=403, + ) + + schema_request = handle_model_with_validation( + logger=bound_logger, + model_class=SchemaSendRequest, + attributes=schema.attribute_names, + schema_name=schema.name, + schema_version=schema.version, + ) + + result = await publisher.publish_schema(schema_request) + + result = credential_schema_from_acapy(result.sent.var_schema) + bound_logger.info("Successfully published and registered schema.") + return result + + +async def get_schemas_as_tenant( + aries_controller: AcaPyClient, + schema_id: Optional[str] = None, + schema_issuer_did: Optional[str] = None, + schema_name: Optional[str] = None, + schema_version: Optional[str] = None, +) -> List[CredentialSchema]: + """ + Allows tenants to get all schemas from trust registry + """ + bound_logger = logger.bind( + body={ + "schema_id": schema_id, + "schema_issuer_did": schema_issuer_did, + "schema_name": schema_name, + "schema_version": schema_version, + } + ) + bound_logger.debug("Fetching schemas from trust registry") + + if schema_id: # fetch specific id + trust_registry_schemas = [await get_trust_registry_schema_by_id(schema_id)] + else: # client is not filtering by schema_id, fetch all + trust_registry_schemas = await get_trust_registry_schemas() + + schema_ids = [schema.id for schema in trust_registry_schemas] + + bound_logger.debug("Getting schemas associated with fetched ids") + schemas = await get_schemas_by_id( + aries_controller=aries_controller, + schema_ids=schema_ids, + ) + + if schema_issuer_did: + schemas = [ + schema for schema in schemas if schema.id.split(":")[0] == schema_issuer_did + ] + if schema_name: + schemas = [schema for schema in schemas if schema.name == schema_name] + if schema_version: + schemas = [schema for schema in schemas if schema.version == schema_version] + + return schemas + + +async def get_schemas_as_governance( + aries_controller: AcaPyClient, + schema_id: Optional[str] = None, + schema_issuer_did: Optional[str] = None, + schema_name: Optional[str] = None, + schema_version: Optional[str] = None, +) -> List[CredentialSchema]: + """ + Governance agents gets all schemas created by itself + """ + bound_logger = logger.bind( + body={ + "schema_id": schema_id, + "schema_issuer_did": schema_issuer_did, + "schema_name": schema_name, + "schema_version": schema_version, + } + ) + + logger.debug("Asserting governance agent is host being called") + if aries_controller.configuration.host != GOVERNANCE_AGENT_URL: + raise CloudApiException( + "Only governance agents are allowed to access this endpoint.", + status_code=403, + ) + + # Get all created schema ids that match the filter + bound_logger.debug("Fetching created schemas") + response = await handle_acapy_call( + logger=bound_logger, + acapy_call=aries_controller.schema.get_created_schemas, + schema_id=schema_id, + schema_issuer_did=schema_issuer_did, + schema_name=schema_name, + schema_version=schema_version, + ) + + # Initiate retrieving all schemas + schema_ids = response.schema_ids or [] + + bound_logger.debug("Getting schemas associated with fetched ids") + schemas = await get_schemas_by_id( + aries_controller=aries_controller, + schema_ids=schema_ids, + ) + + return schemas + + +async def get_schemas_by_id( + aries_controller: AcaPyClient, + schema_ids: List[str], +) -> List[CredentialSchema]: + """ + Fetch schemas with attributes using schema IDs. + The following logic applies to both governance and tenant calls. + Retrieve the relevant schemas from the ledger: + """ + logger.debug("Fetching schemas from schema ids") + + get_schema_futures = [ + handle_acapy_call( + logger=logger, + acapy_call=aries_controller.schema.get_schema, + schema_id=schema_id, + ) + for schema_id in schema_ids + ] + + # Wait for completion of futures + if get_schema_futures: + logger.debug("Fetching each of the created schemas") + schema_results: List[SchemaGetResult] = await asyncio.gather( + *get_schema_futures + ) + else: + logger.debug("No created schema ids returned") + schema_results = [] + + # transform all schemas into response model (if schemas returned) + schemas = [ + credential_schema_from_acapy(schema.var_schema) + for schema in schema_results + if schema.var_schema + ] + + return schemas diff --git a/app/services/trust_registry/schemas.py b/app/services/trust_registry/schemas.py index 6af323ed6..916ab874a 100644 --- a/app/services/trust_registry/schemas.py +++ b/app/services/trust_registry/schemas.py @@ -1,5 +1,7 @@ from typing import List, Optional +from fastapi import HTTPException + from app.exceptions import TrustRegistryException from shared.constants import TRUST_REGISTRY_URL from shared.log_config import get_logger @@ -20,21 +22,21 @@ async def register_schema(schema_id: str) -> None: """ bound_logger = logger.bind(body={"schema_id": schema_id}) bound_logger.info("Registering schema on trust registry") - async with RichAsyncClient(raise_status_error=False) as client: - schema_res = await client.post( - f"{TRUST_REGISTRY_URL}/registry/schemas", json={"schema_id": schema_id} - ) - - if schema_res.is_error: - bound_logger.error( - "Error registering schema. Got status code {} with message `{}`.", - schema_res.status_code, - schema_res.text, - ) - raise TrustRegistryException( - f"Error registering schema `{schema_id}`. Error: `{schema_res.text}`.", - schema_res.status_code, - ) + async with RichAsyncClient() as client: + try: + await client.post( + f"{TRUST_REGISTRY_URL}/registry/schemas", json={"schema_id": schema_id} + ) + except HTTPException as e: + bound_logger.error( + "Error registering schema. Got status code {} with message `{}`.", + e.status_code, + e.detail, + ) + raise TrustRegistryException( + f"Error registering schema `{schema_id}`. Error: `{e.detail}`.", + e.status_code, + ) bound_logger.info("Successfully registered schema on trust registry.") @@ -49,18 +51,18 @@ async def fetch_schemas() -> List[Schema]: A list of schemas """ logger.info("Fetching all schemas from trust registry") - async with RichAsyncClient(raise_status_error=False) as client: - schemas_res = await client.get(f"{TRUST_REGISTRY_URL}/registry/schemas") - - if schemas_res.is_error: - logger.error( - "Error fetching schemas. Got status code {} with message `{}`.", - schemas_res.status_code, - schemas_res.text, - ) - raise TrustRegistryException( - f"Unable to fetch schemas: `{schemas_res.text}`.", schemas_res.status_code - ) + async with RichAsyncClient() as client: + try: + schemas_res = await client.get(f"{TRUST_REGISTRY_URL}/registry/schemas") + except HTTPException as e: + logger.error( + "Error fetching schemas. Got status code {} with message `{}`.", + e.status_code, + e.detail, + ) + raise TrustRegistryException( + f"Unable to fetch schemas: `{e.detail}`.", e.status_code + ) result = [Schema.model_validate(schema) for schema in schemas_res.json()] logger.info("Successfully fetched schemas from trust registry.") @@ -79,24 +81,25 @@ async def get_schema_by_id(schema_id: str) -> Optional[Schema]: bound_logger = logger.bind(body={"schema_id": schema_id}) bound_logger.info("Fetching schema from trust registry") - async with RichAsyncClient(raise_status_error=False) as client: - schema_response = await client.get( - f"{TRUST_REGISTRY_URL}/registry/schemas/{schema_id}" - ) - - if schema_response.status_code == 404: - bound_logger.info("Bad request: Schema not found.") - return None - if schema_response.is_error: - logger.error( - "Error fetching schema. Got status code {} with message `{}`.", - schema_response.status_code, - schema_response.text, - ) - raise TrustRegistryException( - f"Unable to fetch schema: `{schema_response.text}`.", - schema_response.status_code, - ) + async with RichAsyncClient() as client: + try: + schema_response = await client.get( + f"{TRUST_REGISTRY_URL}/registry/schemas/{schema_id}" + ) + except HTTPException as e: + if e.status_code == 404: + bound_logger.info("Bad request: Schema not found.") + return None + else: + bound_logger.error( + "Error fetching schema. Got status code {} with message `{}`.", + e.status_code, + e.detail, + ) + raise TrustRegistryException( + f"Unable to fetch schema: `{e.detail}`.", + e.status_code, + ) result = Schema.model_validate(schema_response.json()) logger.info("Successfully fetched schema from trust registry.") @@ -114,20 +117,18 @@ async def remove_schema_by_id(schema_id: str) -> None: """ bound_logger = logger.bind(body={"schema_id": schema_id}) bound_logger.info("Removing schema from trust registry") - async with RichAsyncClient(raise_status_error=False) as client: - remove_response = await client.delete( - f"{TRUST_REGISTRY_URL}/registry/schemas/{schema_id}" - ) - - if remove_response.is_error: - bound_logger.error( - "Error removing schema. Got status code {} with message `{}`.", - remove_response.status_code, - remove_response.text, - ) - raise TrustRegistryException( - f"Error removing schema from trust registry: `{remove_response.text}`.", - remove_response.status_code, - ) + async with RichAsyncClient() as client: + try: + await client.delete(f"{TRUST_REGISTRY_URL}/registry/schemas/{schema_id}") + except HTTPException as e: + bound_logger.error( + "Error removing schema. Got status code {} with message `{}`.", + e.status_code, + e.detail, + ) + raise TrustRegistryException( + f"Error removing schema from trust registry: `{e.detail}`.", + e.status_code, + ) bound_logger.info("Successfully removed schema from trust registry.") diff --git a/app/tests/services/test_trust_registry.py b/app/tests/services/test_trust_registry.py index 4505a1afe..0b0f589e5 100644 --- a/app/tests/services/test_trust_registry.py +++ b/app/tests/services/test_trust_registry.py @@ -250,7 +250,7 @@ async def test_register_schema( json={"schema_id": schema_id}, ) - mock_async_client.post = AsyncMock(return_value=Response(500)) + mock_async_client.post = AsyncMock(side_effect=HTTPException(500)) with pytest.raises(TrustRegistryException): await register_schema(schema_id=schema_id) @@ -319,7 +319,9 @@ async def test_remove_schema_by_id( TRUST_REGISTRY_URL + f"/registry/schemas/{schema_id}" ) - mock_async_client.delete = AsyncMock(return_value=Response(500, text="The error")) + mock_async_client.delete = AsyncMock( + side_effect=HTTPException(status_code=500, detail="The error") + ) with pytest.raises( TrustRegistryException, match="Error removing schema from trust registry" ): diff --git a/app/util/assert_public_did.py b/app/util/assert_public_did.py new file mode 100644 index 000000000..6a43baab1 --- /dev/null +++ b/app/util/assert_public_did.py @@ -0,0 +1,34 @@ +from aries_cloudcontroller import AcaPyClient + +from app.exceptions import CloudApiException +from app.services import acapy_wallet +from shared.log_config import get_logger + +logger = get_logger(__name__) + + +async def assert_public_did(aries_controller: AcaPyClient) -> str: + """ + Assert tenant has a public DID and return it. + """ + try: + logger.debug("Asserting client has public DID") + public_did = await acapy_wallet.assert_public_did(aries_controller) + except CloudApiException as e: + log_message = f"Asserting public DID failed: {e}" + + if e.status_code == 403: + logger.info(log_message) + client_error_message = ( + "Wallet making this request has no public DID. " + "Only issuers with a public DID can make this request." + ) + else: + logger.error(log_message) + client_error_message = ( + "Something went wrong while asserting if request is from a valid " + "issuer. Please try again." + ) + + raise CloudApiException(client_error_message, e.status_code) from e + return public_did diff --git a/app/util/check_endorser_connection.py b/app/util/check_endorser_connection.py new file mode 100644 index 000000000..e00a151a1 --- /dev/null +++ b/app/util/check_endorser_connection.py @@ -0,0 +1,24 @@ +from aries_cloudcontroller import AcaPyClient + +from app.exceptions import handle_acapy_call +from shared import ACAPY_ENDORSER_ALIAS +from shared.log_config import get_logger + +logger = get_logger(__name__) + + +async def check_endorser_connection(aries_controller: AcaPyClient) -> bool: + """ + Check if tenant has an active connection with the endorser. + """ + logger.debug("Get connection by endorser alias") + endorser_connection = await handle_acapy_call( + logger=logger, + acapy_call=aries_controller.connection.get_connections, + alias=ACAPY_ENDORSER_ALIAS, + state="completed", + ) + + has_connections = len(endorser_connection.results) > 0 + + return has_connections diff --git a/app/util/transaction_acked.py b/app/util/transaction_acked.py new file mode 100644 index 000000000..823fbe69d --- /dev/null +++ b/app/util/transaction_acked.py @@ -0,0 +1,36 @@ +import asyncio + +from aries_cloudcontroller import AcaPyClient + +from app.exceptions import CloudApiException +from app.util.retry_method import coroutine_with_retry_until_value +from shared.log_config import get_logger + +logger = get_logger(__name__) + + +async def wait_for_transaction_ack( + aries_controller: AcaPyClient, transaction_id: str +) -> None: + """ + Wait for the transaction to be acknowledged by the endorser. + """ + bound_logger = logger.bind(transaction_id=transaction_id) + bound_logger.debug("Waiting for transaction to be acknowledged by the endorser") + try: + # Wait for transaction to be acknowledged and written to the ledger + await coroutine_with_retry_until_value( + coroutine_func=aries_controller.endorse_transaction.get_transaction, + args=(transaction_id,), + field_name="state", + expected_value="transaction_acked", + logger=bound_logger, + max_attempts=10, + retry_delay=2, + ) + except asyncio.TimeoutError as e: + raise CloudApiException( + "Timeout waiting for endorser to accept the endorsement request.", + 504, + ) from e + bound_logger.debug("Transaction has been acknowledged by the endorser") diff --git a/trustregistry/registry/registry_schemas.py b/trustregistry/registry/registry_schemas.py index 868755476..fc2c7936c 100644 --- a/trustregistry/registry/registry_schemas.py +++ b/trustregistry/registry/registry_schemas.py @@ -46,7 +46,7 @@ async def register_schema( ) except crud.SchemaAlreadyExistsException as e: bound_logger.info("Bad request: Schema already exists.") - raise HTTPException(status_code=405, detail="Schema already exists.") from e + raise HTTPException(status_code=409, detail="Schema already exists.") from e return create_schema_res diff --git a/trustregistry/tests/e2e/test_schema.py b/trustregistry/tests/e2e/test_schema.py index f5c486bb3..814202e3b 100644 --- a/trustregistry/tests/e2e/test_schema.py +++ b/trustregistry/tests/e2e/test_schema.py @@ -45,7 +45,7 @@ async def test_register_schema(): f"{TRUST_REGISTRY_URL}/registry/schemas", json=payload, ) - assert response.status_code == 405 + assert response.status_code == 409 assert "Schema already exists" in response.json()["detail"] diff --git a/trustregistry/tests/test_registry_schema.py b/trustregistry/tests/test_registry_schema.py index aa7fd15a3..b7a45f582 100644 --- a/trustregistry/tests/test_registry_schema.py +++ b/trustregistry/tests/test_registry_schema.py @@ -57,7 +57,7 @@ async def test_register_schema_x(): await registry_schemas.register_schema(schema_id) mock_crud.assert_called_once() - assert ex.value.status_code == 405 + assert ex.value.status_code == 409 @pytest.mark.anyio