Skip to content

Commit

Permalink
Add support for long running operations to the IPEX endpoints so mutl…
Browse files Browse the repository at this point in the history
…sig AIDs can track their completion. (#168)

Signed-off-by: pfeairheller <[email protected]>
  • Loading branch information
pfeairheller authored Jan 15, 2024
1 parent 781ea7a commit d8cb7fc
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 28 deletions.
2 changes: 1 addition & 1 deletion images/keria.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ ENV PATH=/keria/venv/bin:${PATH}
RUN pip install --upgrade pip

# Copy in Python dependency files
COPY requirements.txt setup.py .
COPY requirements.txt setup.py ./
# "src/" dir required for installation of dependencies with setup.py
RUN mkdir /keria/src
# Install Python dependencies
Expand Down
4 changes: 2 additions & 2 deletions src/keria/app/agenting.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,6 @@ def __init__(self, hby, rgy, agentHab, agency, caid, **opts):
registrar=self.registrar, verifier=self.verifier,
notifier=self.notifier)

self.monitor = longrunning.Monitor(hby=hby, swain=self.swain, counselor=self.counselor, temp=hby.temp,
registrar=self.registrar, credentialer=self.credentialer)
self.seeker = basing.Seeker(name=hby.name, db=hby.db, reger=self.rgy.reger, reopen=True, temp=self.hby.temp)
self.exnseeker = basing.ExnSeeker(name=hby.name, db=hby.db, reopen=True, temp=self.hby.temp)

Expand All @@ -337,6 +335,8 @@ def __init__(self, hby, rgy, agentHab, agency, caid, **opts):
self.exc = exchanging.Exchanger(hby=hby, handlers=handlers)
grouping.loadHandlers(exc=self.exc, mux=self.mux)
protocoling.loadHandlers(hby=self.hby, exc=self.exc, notifier=self.notifier)
self.monitor = longrunning.Monitor(hby=hby, swain=self.swain, counselor=self.counselor, temp=hby.temp,
registrar=self.registrar, credentialer=self.credentialer, exchanger=self.exc)

self.rvy = routing.Revery(db=hby.db, cues=self.cues)
self.kvy = eventing.Kevery(db=hby.db,
Expand Down
34 changes: 23 additions & 11 deletions src/keria/app/ipexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from keri.core import coring, eventing, serdering
from keri.peer import exchanging

from keria.core import httping
from keria.core import httping, longrunning


def loadEnds(app):
Expand Down Expand Up @@ -60,12 +60,14 @@ def on_post(req, rep, name):

match route:
case "/ipex/admit":
IpexAdmitCollectionEnd.sendAdmit(agent, hab, ked, sigs, rec)
op = IpexAdmitCollectionEnd.sendAdmit(agent, hab, ked, sigs, rec)
case "/multisig/exn":
IpexAdmitCollectionEnd.sendMultisigExn(agent, hab, ked, sigs, atc, rec)
op = IpexAdmitCollectionEnd.sendMultisigExn(agent, hab, ked, sigs, atc, rec)
case _:
raise falcon.HTTPBadRequest(description=f"invalid message route {route}")

rep.status = falcon.HTTP_202
rep.data = json.dumps(ked).encode("utf-8")
rep.status = falcon.HTTP_200
rep.data = op.to_json().encode("utf-8")

@staticmethod
def sendAdmit(agent, hab, ked, sigs, rec):
Expand All @@ -92,6 +94,8 @@ def sendAdmit(agent, hab, ked, sigs, rec):
agent.exchanges.append(dict(said=serder.said, pre=hab.pre, rec=rec, topic='credential'))
agent.admits.append(dict(said=ked['d'], pre=hab.pre))

return agent.monitor.submit(serder.pre, longrunning.OpTypes.exchange, metadata=dict(said=serder.said))

@staticmethod
def sendMultisigExn(agent, hab, ked, sigs, atc, rec):
if not isinstance(hab, habbing.SignifyGroupHab):
Expand Down Expand Up @@ -143,6 +147,8 @@ def sendMultisigExn(agent, hab, ked, sigs, atc, rec):
agent.exchanges.append(dict(said=serder.said, pre=hab.pre, rec=[issr], topic="credential"))
agent.admits.append(dict(said=admitked['d'], pre=hab.pre))

return agent.monitor.submit(serder.pre, longrunning.OpTypes.exchange, metadata=dict(said=serder.said))


class IpexGrantCollectionEnd:

Expand Down Expand Up @@ -181,12 +187,14 @@ def on_post(req, rep, name):

match route:
case "/ipex/grant":
IpexGrantCollectionEnd.sendGrant(agent, hab, ked, sigs, atc, rec)
op = IpexGrantCollectionEnd.sendGrant(agent, hab, ked, sigs, atc, rec)
case "/multisig/exn":
IpexGrantCollectionEnd.sendMultisigExn(agent, hab, ked, sigs, atc, rec)
op = IpexGrantCollectionEnd.sendMultisigExn(agent, hab, ked, sigs, atc, rec)
case _:
raise falcon.HTTPBadRequest(description=f"invalid route {route}")

rep.status = falcon.HTTP_202
rep.data = json.dumps(ked).encode("utf-8")
rep.status = falcon.HTTP_200
rep.data = op.to_json().encode("utf-8")

@staticmethod
def sendGrant(agent, hab, ked, sigs, atc, rec):
Expand Down Expand Up @@ -214,6 +222,8 @@ def sendGrant(agent, hab, ked, sigs, atc, rec):
agent.exchanges.append(dict(said=serder.said, pre=hab.pre, rec=rec, topic='credential'))
agent.grants.append(dict(said=ked['d'], pre=hab.pre, rec=rec))

return agent.monitor.submit(serder.pre, longrunning.OpTypes.exchange, metadata=dict(said=serder.said))

@staticmethod
def sendMultisigExn(agent, hab, ked, sigs, atc, rec):
if not isinstance(hab, habbing.SignifyGroupHab):
Expand Down Expand Up @@ -252,5 +262,7 @@ def sendMultisigExn(agent, hab, ked, sigs, atc, rec):
serder = serdering.SerderKERI(sad=grant)
ims = bytearray(serder.raw) + pathed['exn']
agent.hby.psr.parseOne(ims=ims)
agent.exchanges.append(dict(said=serder.said, pre=hab.pre, rec=holder, topic="credential"))
agent.grants.append(dict(said=grant['d'], pre=hab.pre, rec=holder))
agent.exchanges.append(dict(said=serder.said, pre=hab.pre, rec=[holder], topic="credential"))
agent.grants.append(dict(said=grant['d'], pre=hab.pre, rec=[holder]))

return agent.monitor.submit(serder.pre, longrunning.OpTypes.exchange, metadata=dict(said=serder.said))
22 changes: 19 additions & 3 deletions src/keria/core/longrunning.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
from keri.help import helping

# long running operationt types
Typeage = namedtuple("Tierage", 'oobi witness delegation group query registry credential endrole challenge done')
Typeage = namedtuple("Tierage", 'oobi witness delegation group query registry credential endrole challenge exchange '
'done')

OpTypes = Typeage(oobi="oobi", witness='witness', delegation='delegation', group='group', query='query',
registry='registry', credential='credential', endrole='endrole', challenge='challenge', done='done')
registry='registry', credential='credential', endrole='endrole', challenge='challenge',
exchange='exchange', done='done')


@dataclass_json
Expand Down Expand Up @@ -90,7 +92,8 @@ class Monitor:
"""

def __init__(self, hby, swain, counselor=None, registrar=None, credentialer=None, opr=None, temp=False):
def __init__(self, hby, swain, counselor=None, registrar=None, exchanger=None, credentialer=None, opr=None,
temp=False):
""" Create long running operation monitor
Parameters:
Expand All @@ -103,6 +106,7 @@ def __init__(self, hby, swain, counselor=None, registrar=None, credentialer=None
self.swain = swain
self.counselor = counselor
self.registrar = registrar
self.exchanger = exchanger
self.credentialer = credentialer
self.opr = opr if opr is not None else Operator(name=hby.name, temp=temp)

Expand Down Expand Up @@ -333,6 +337,18 @@ def status(self, op):
else:
operation.done = False

elif op.type in (OpTypes.exchange,):
if "said" not in op.metadata:
raise kering.ValidationError(
f"invalid long running {op.type} operation, metadata missing 'said' field")

said = op.metadata["said"]
if self.exchanger.complete(said):
operation.done = True
operation.response = dict(said=said)
else:
operation.done = False

elif op.type in (OpTypes.endrole, ):
if "cid" not in op.metadata or "role" not in op.metadata or "eid" not in op.metadata:
raise kering.ValidationError(
Expand Down
18 changes: 11 additions & 7 deletions tests/app/test_ipexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def test_ipex_admit(helpers, mockHelpingNowIso8601):
data = json.dumps(body).encode("utf-8")
res = client.simulate_post(path="/identifiers/test/ipex/admit", body=data)

assert res.status_code == 202
assert res.status_code == 200
assert len(agent.exchanges) == 1
assert len(agent.admits) == 1

Expand Down Expand Up @@ -297,8 +297,12 @@ def test_ipex_grant(helpers, mockHelpingNowIso8601, seeder):

data = json.dumps(body).encode("utf-8")
res = client.simulate_post(path="/identifiers/legal-entity/ipex/grant", body=data)
assert res.status_code == 202
assert res.json == exn.ked
assert res.status_code == 200
assert res.json == {'done': False,
'error': None,
'metadata': {'said': 'EHwjDEsub6XT19ISLft1m1xMNvVXnSfH0IsDGllox4Y8'},
'name': 'exchange.EFnYGvF_ENKJ_4PGsWsvfd_R6m5cN-3KYsz_0mAuNpCm',
'response': None}
assert len(agent.exchanges) == 1
assert len(agent.grants) == 1

Expand Down Expand Up @@ -731,7 +735,7 @@ def test_multisig_grant_admit(seeder, helpers):
data = json.dumps(body).encode("utf-8")
res = client0.simulate_post(path="/identifiers/issuer/ipex/grant", body=data)

assert res.status_code == 202
assert res.status_code == 200

# Package up the GRANT into a multisig/exn from participant 1 to send to participant 0
multiExnSerder, end = exchanging.exchange(route="/multisig/exn",
Expand All @@ -749,7 +753,7 @@ def test_multisig_grant_admit(seeder, helpers):

data = json.dumps(body).encode("utf-8")
res = client1.simulate_post(path="/identifiers/issuer/ipex/grant", body=data)
assert res.status_code == 202
assert res.status_code == 200

# Wait until the GRANT has been persisted by Agent0
while agent0.exc.complete(said=grantSerder.said) is not True:
Expand Down Expand Up @@ -803,7 +807,7 @@ def test_multisig_grant_admit(seeder, helpers):
data = json.dumps(body).encode("utf-8")
res = hclient0.simulate_post(path="/identifiers/holder/ipex/admit", body=data)

assert res.status_code == 202
assert res.status_code == 200

# Package up the ADMIT into a multisig/exn from participant 1 to send to participant 0
multiExnSerder, end = exchanging.exchange(route="/multisig/exn",
Expand All @@ -821,7 +825,7 @@ def test_multisig_grant_admit(seeder, helpers):

data = json.dumps(body).encode("utf-8")
res = hclient1.simulate_post(path="/identifiers/holder/ipex/admit", body=data)
assert res.status_code == 202
assert res.status_code == 200

# Wait until the ADMIT has been persisted by Hagent0
while hagent0.exc.complete(said=admitSerder.said) is not True:
Expand Down
20 changes: 16 additions & 4 deletions tests/core/test_longrunning.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_operations(helpers):

salt = b"C6X8UfJqYrOmJQHKqnI5a"
op = helpers.createAid(client, "user1", salt)
assert op["done"] == True
assert op["done"] is True
assert op["name"] == "done.EAF7geUfHm-M5lA-PI6Jv-4708a-KknnlMlA7U1_Wduv"
aid = op["response"]
recp = aid['i']
Expand Down Expand Up @@ -57,7 +57,7 @@ def test_operations(helpers):
res = client.simulate_post(
path=f"/identifiers/user1/endroles", json=body)
op = res.json
assert op["done"] == True
assert op["done"] is True
assert op["name"] == "endrole.EAF7geUfHm-M5lA-PI6Jv-4708a-KknnlMlA7U1_Wduv.agent.EI7AkI40M11MS7lkTCb10JC9-nDt-tXwQh44OHAFlv_9"

# operations has 2 elements
Expand All @@ -78,7 +78,7 @@ def test_operations(helpers):

salt = b"tRkaivxZkQPfqjlDY6j1K"
op = helpers.createAid(client, "user2", salt)
assert op["done"] == True
assert op["done"] is True
assert op["name"] == "done.EAyXphfc0qOLqEDAe0cCYCj-ovbSaEFgVgX6MrC_b5ZO"
aid = op["response"]
recp = aid['i']
Expand Down Expand Up @@ -106,7 +106,7 @@ def test_operations(helpers):
res = client.simulate_post(
path=f"/identifiers/user2/endroles", json=body)
op = res.json
assert op["done"] == True
assert op["done"] is True
assert op["name"] == "endrole.EAyXphfc0qOLqEDAe0cCYCj-ovbSaEFgVgX6MrC_b5ZO.agent.EI7AkI40M11MS7lkTCb10JC9-nDt-tXwQh44OHAFlv_9"

# operations has 4 elements
Expand Down Expand Up @@ -184,3 +184,15 @@ def test_error(helpers):

res = client.simulate_get(path=f"/operations/{op['name']}")
assert res.status_code == 500

# Test other error conditions
res = client.simulate_get(path=f"/operations/exchange.EBfdlu8R27Fbx-ehrqwImnK-8Cm79sqbAQ4MmvEAYqao")
assert res.status_code == 404
assert res.json == {
'title': "long running operation 'exchange.EBfdlu8R27Fbx-ehrqwImnK-8Cm79sqbAQ4MmvEAYqao' not found"
}

res = client.simulate_get(path=f"/operations/query.EBfdlu8R27Fbx-ehrqwImnK-8Cm79sqbAQ4MmvEAYqao")
assert res.status_code == 404
assert res.json == {'title': 'long running operation '
"'query.EBfdlu8R27Fbx-ehrqwImnK-8Cm79sqbAQ4MmvEAYqao' not found"}

0 comments on commit d8cb7fc

Please sign in to comment.