Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interceptor #67

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added scripts/keri/cf/scripts.json
Empty file.
67 changes: 53 additions & 14 deletions src/keria/app/agenting.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from keri.vdr import verifying
from keri.vdr.credentialing import Regery
from keri.vdr.eventing import Tevery
from keri.app import httping as khttping

from . import aiding, notifying, indirecting, credentialing
from .specing import AgentSpecResource
Expand All @@ -45,10 +46,10 @@
logger = ogler.getLogger()


def setup(name, bran, adminPort, bootPort, base='', httpPort=None, configFile=None, configDir=None):
def setup(name, bran, adminPort, bootPort, base='', httpPort=None, configFile=None, configDir=None,interceptor_webhook=None, interceptor_headers=None):
""" Set up an ahab in Signify mode """

agency = Agency(name=name, base=base, bran=bran, configFile=configFile, configDir=configDir)
agency = Agency(name=name, base=base, bran=bran, configFile=configFile, configDir=configDir,interceptor_webhook=interceptor_webhook, interceptor_headers=interceptor_headers)
bootApp = falcon.App(middleware=falcon.CORSMiddleware(
allow_origins='*', allow_credentials='*',
expose_headers=['cesr-attachment', 'cesr-date', 'content-type', 'signature', 'signature-input',
Expand Down Expand Up @@ -107,14 +108,16 @@ def setup(name, bran, adminPort, bootPort, base='', httpPort=None, configFile=No


class Agency(doing.DoDoer):
def __init__(self, name, bran, base="", configFile=None, configDir=None, adb=None, temp=False):
def __init__(self, name, bran, base="", configFile=None, configDir=None, adb=None, temp=False,interceptor_webhook=None, interceptor_headers=None):
self.name = name
self.base = base
self.bran = bran
self.temp = temp
self.configFile = configFile
self.configDir = configDir
self.cf = None
self.metrics = decking.Deck()
self.interceptor = InterceptorDoer(interceptor_webhook, interceptor_headers, cues=self.metrics)
if self.configFile is not None: # Load config file if creating database
self.cf = configing.Configer(name=self.configFile,
base="",
Expand Down Expand Up @@ -252,6 +255,7 @@ def __init__(self, hby, rgy, agentHab, agency, caid, **opts):
self.anchors = decking.Deck()
self.witners = decking.Deck()
self.queries = decking.Deck()
self.agency.metrics

receiptor = agenting.Receiptor(hby=hby)
self.postman = forwarding.Poster(hby=hby)
Expand Down Expand Up @@ -308,15 +312,15 @@ def __init__(self, hby, rgy, agentHab, agency, caid, **opts):
vry=self.verifier)

doers.extend([
Initer(agentHab=agentHab, caid=caid),
Initer(agentHab=agentHab, caid=caid, metrics = self.agency.metrics),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the list of doers, you have to include self.interceptor in this list to get the recur method to be called as a background process.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I not put it when I Initialize the agency super(Agency, self).__init__(doers=[self.interceptor], always=True) on Line 132 instead of having a logger per agent, I was thinking that since we have a queue per agency instead of agent

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pfeairheller I also tried passing self.agency.interceptor but it still not running. Does Interceptor need to be Just a Doer Since Agent is a DoDoer?

Querier(hby=hby, agentHab=agentHab, kvy=self.kvy, queries=self.queries),
Escrower(kvy=self.kvy, rgy=self.rgy, rvy=self.rvy, tvy=self.tvy, exc=self.exc, vry=self.verifier,
registrar=self.registrar, credentialer=self.credentialer),
Messager(kvy=self.kvy, parser=self.parser),
Witnesser(receiptor=receiptor, witners=self.witners),
Delegator(agentHab=agentHab, swain=self.swain, anchors=self.anchors),
Witnesser(receiptor=receiptor, witners=self.witners, metrics=self.agency.metrics),
Delegator(agentHab=agentHab, swain=self.swain, anchors=self.anchors, metrics=self.agency.metrics),
GroupRequester(hby=hby, agentHab=agentHab, postman=self.postman, counselor=self.counselor,
groups=self.groups),
groups=self.groups, metrics=self.agency.metrics),
])

super(Agent, self).__init__(doers=doers, always=True, **opts)
Expand Down Expand Up @@ -349,6 +353,30 @@ def inceptExtern(self, pre, verfers, digers, **kwargs):

self.agency.incept(self.caid, pre)

class InterceptorDoer(doing.DoDoer):

def __init__(self, webhook, headers, cues=None):
self.webhook = webhook
self.headers = headers
self.cues = cues if cues is not None else decking.Deck()
self.clienter = khttping.Clienter()
super(InterceptorDoer, self).__init__(doers=[self.clienter], always=True)

def recur(self, tyme, deeds=None):
if self.cues:
msg = self.cues.popleft()
# TODO: Sent the message somewhere
client = self.clienter.request("POST", self.webhook, body=msg, headers=self.headers)
while not client.responses:
yield self.tock

rep = client.respond()

self.clienter.remove(client)
return rep.status == 200

return super(InterceptorDoer, self).recur(tyme, deeds)


class Messager(doing.Doer):

Expand All @@ -366,68 +394,79 @@ def recur(self, tyme=None):

class Witnesser(doing.Doer):

def __init__(self, receiptor, witners):
def __init__(self, receiptor, witners, metrics):
self.receiptor = receiptor
self.witners = witners
self.metrics = metrics
super(Witnesser, self).__init__()

def recur(self, tyme=None):
while True:
if self.witners:
msg = self.witners.popleft()
serder = msg["serder"]

if self.interceptor:
data = serder.pretty()
self.interceptor.push(data)
# If we are a rotation event, may need to catch new witnesses up to current key state
if serder.ked['t'] in (Ilks.rot, Ilks.drt):
adds = serder.ked["ba"]
for wit in adds:
yield from self.receiptor.catchup(serder.pre, wit)

yield from self.receiptor.receipt(serder.pre, serder.sn)
self.metrics.append(dict(evt="witnessing", data=dict(aid=serder.pre)))

yield self.tock


class Delegator(doing.Doer):

def __init__(self, agentHab, swain, anchors):
def __init__(self, agentHab, swain, anchors, metrics):
self.agentHab = agentHab
self.swain = swain
self.anchors = anchors
self.metrics = metrics
super(Delegator, self).__init__()

def recur(self, tyme=None):
if self.anchors:
msg = self.anchors.popleft()
if self.interceptor:
self.interceptor.push(msg)
sn = msg["sn"] if "sn" in msg else None
self.swain.delegation(pre=msg["pre"], sn=sn, proxy=self.agentHab)
self.metrics.append(dict(msg))

return False


class Initer(doing.Doer):
def __init__(self, agentHab, caid):
def __init__(self, agentHab, caid, metrics):
self.agentHab = agentHab
self.caid = caid
self.metrics = metrics
super(Initer, self).__init__()

def recur(self, tyme):
""" Prints Agent name and prefix """
if not self.agentHab.inited:
return False

self.metrics.append(dict(evt="init", data=dict(aid=self.agentHab.pre)))
print(" Agent:", self.agentHab.pre, " Controller:", self.caid)

return True


class GroupRequester(doing.Doer):

def __init__(self, hby, agentHab, postman, counselor, groups):
def __init__(self, hby, agentHab, postman, counselor, groups, metrics):
self.hby = hby
self.agentHab = agentHab
self.postman = postman
self.counselor = counselor
self.groups = groups
self.metrics = metrics

super(GroupRequester, self).__init__()

Expand All @@ -437,7 +476,7 @@ def recur(self, tyme):
msg = self.groups.popleft()
serder = msg["serder"]
sigers = msg["sigers"]

self.metrics.append(dict(evt="group", data=dict(msg)))
ghab = self.hby.habs[serder.pre]
if "smids" in msg:
smids = msg['smids']
Expand Down
21 changes: 18 additions & 3 deletions src/keria/app/cli/commands/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""
import argparse
import logging
import json

from keri import __version__
from keri import help
Expand Down Expand Up @@ -54,6 +55,16 @@
action="store",
default=None,
help="directory override for configuration data")
parser.add_argument("--interceptor-webhook",
dest="interceptor_webhook",
action="store",
default=None,
help="webhook to send intercepted messages")
parser.add_argument("--interceptor-headers",
dest="interceptor_headers",
action="store",
default=None,
help="headers to send with intercepted messages")


def launch(args):
Expand All @@ -72,14 +83,16 @@ def launch(args):
http=int(args.http),
boot=int(args.boot),
configFile=args.configFile,
configDir=args.configDir)
configDir=args.configDir,
interceptor_webhook=args.interceptor_webhook,
interceptor_headers=json.loads(args.interceptor_headers) if args.interceptor_headers else {'Content-Type': 'application/json'})

logger.info("******* Ended Agent for %s listening: admin/%s, http/%s"
".******", args.name, args.admin, args.http)


def runAgent(name="ahab", base="", bran="", admin=3901, http=3902, boot=3903, configFile=None,
configDir=None, expire=0.0):
configDir=None, expire=0.0, interceptor_webhook = None, interceptor_headers = None):
"""
Setup and run one witness
"""
Expand All @@ -90,6 +103,8 @@ def runAgent(name="ahab", base="", bran="", admin=3901, http=3902, boot=3903, co
httpPort=http,
bootPort=boot,
configFile=configFile,
configDir=configDir))
configDir=configDir,
interceptor_webhook=interceptor_webhook,
interceptor_headers=interceptor_headers))

directing.runController(doers=doers, expire=expire)
1 change: 0 additions & 1 deletion tests/app/test_agenting.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ def test_witnesser(helpers):
deeds = doist.enter(doers=[wr])
doist.recur(deeds)


def test_keystate_ends(helpers):
caid = "ELI7pg979AdhmvrjDeam2eAO2SR5niCgnjAJXJHtJose"
salt = b'0123456789abcdef'
Expand Down