-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathschema_publisher.py
126 lines (113 loc) · 5 KB
/
schema_publisher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from logging import Logger
from typing import List
from aries_cloudcontroller import (
AcaPyClient,
SchemaGetResult,
SchemaSendRequest,
TxnOrSchemaSendResult,
)
from app.exceptions import CloudApiException, handle_acapy_call
from app.models.definitions import CredentialSchema
from app.services.trust_registry.schemas import register_schema
from app.util.definitions import credential_schema_from_acapy
class SchemaPublisher:
def __init__(self, controller: AcaPyClient, logger: Logger):
self._logger = logger
self._controller = controller
async def publish_schema(
self, schema_request: SchemaSendRequest
) -> TxnOrSchemaSendResult:
try:
result = await handle_acapy_call(
logger=self._logger,
acapy_call=self._controller.schema.publish_schema,
body=schema_request,
create_transaction_for_endorser=False,
)
except CloudApiException as e:
if "already exist" in e.detail and e.status_code == 400:
result = await self._handle_existing_schema(schema_request)
else:
self._logger.warning(
"An unhandled Exception was caught while publishing schema: {}",
e.detail,
)
raise CloudApiException("Error while creating schema.") from e
if result.sent and result.sent.schema_id:
await register_schema(schema_id=result.sent.schema_id)
else:
self._logger.error("No SchemaSendResult in `publish_schema` response.")
raise CloudApiException(
"An unexpected error occurred: could not publish schema."
)
return result
async def _handle_existing_schema(
self, schema: SchemaSendRequest
) -> CredentialSchema:
self._logger.info("Handling case of schema already existing on ledger")
self._logger.debug("Fetching public DID for governance controller")
pub_did = await handle_acapy_call(
logger=self._logger,
acapy_call=self._controller.wallet.get_public_did,
)
_schema_id = (
f"{pub_did.result.did}:2:{schema.schema_name}:{schema.schema_version}"
)
self._logger.debug(
"Fetching schema id `{}` which is associated with request",
_schema_id,
)
_schema: SchemaGetResult = await handle_acapy_call(
logger=self._logger,
acapy_call=self._controller.schema.get_schema,
schema_id=_schema_id,
)
# Edge case where the governance agent has changed its public did
# Then we need to retrieve the schema in a different way as constructing
# the schema ID the way above will not be correct due to different public did.
if _schema.var_schema is None:
self._logger.debug(
"Schema not found. Governance agent may have changed public DID. "
"Fetching schemas created by governance with requested name and version"
)
schemas_created_ids = await handle_acapy_call(
logger=self._logger,
acapy_call=self._controller.schema.get_created_schemas,
schema_name=schema.schema_name,
schema_version=schema.schema_version,
)
self._logger.debug("Getting schemas associated with fetched ids")
schemas: List[SchemaGetResult] = [
await handle_acapy_call(
logger=self._logger,
acapy_call=self._controller.schema.get_schema,
schema_id=schema_id,
)
for schema_id in schemas_created_ids.schema_ids
if schema_id
]
if not schemas:
raise CloudApiException("Could not publish schema.", 500)
if len(schemas) > 1:
error_message = (
f"Multiple schemas with name {schema.schema_name} "
f"and version {schema.schema_version} exist."
f"These are: `{str(schemas_created_ids.schema_ids)}`."
)
raise CloudApiException(error_message, 409)
self._logger.debug("Using updated schema id with new DID")
_schema: SchemaGetResult = schemas[0]
# Schema exists with different attributes
if set(_schema.var_schema.attr_names) != set(schema.attributes):
error_message = (
"Error creating schema: Schema already exists with different attribute "
f"names. Given: `{str(set(schema.attributes))}`. "
f"Found: `{str(set(_schema.var_schema.attr_names))}`."
)
raise CloudApiException(error_message, 409)
result = credential_schema_from_acapy(_schema.var_schema)
self._logger.info(
"Schema already exists on ledger. Returning schema definition: `{}`.",
result,
)
return result