Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: shut down agents gracefully #1

Merged
merged 4 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion scripts/keri/cf/demo-witness-oobis.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
"iurls": [
"http://127.0.0.1:5642/oobi/BBilc4-L3tFUnfM_wJr4S4OJanAv_VmF_dJNN6vkf2Ha/controller?name=Wan&tag=witness",
"http://127.0.0.1:5643/oobi/BLskRTInXnMxWaGqcpSyMgo0nYbalW99cGZESrz3zapM/controller?name=Wes&tag=witness",
"http://127.0.0.1:5644/oobi/BIKKuvBwpmDVA4Ds-EpL5bt9OqPzWPja2LigFYZN2YfX/controller?name=Wil&tag=witness"
"http://127.0.0.1:5644/oobi/BIKKuvBwpmDVA4Ds-EpL5bt9OqPzWPja2LigFYZN2YfX/controller?name=Wil&tag=witness",
"http://127.0.0.1:5645/oobi/BM35JN8XeJSEfpxopjn5jr7tAHCE5749f0OobhMLCorE/controller?name=Wit&tag=witness",
"http://127.0.0.1:5646/oobi/BIj15u5V11bkbtAxMA7gcNJZcax-7TgaBMLsQnMHpYHP/controller?name=Wub&tag=witness",
"http://127.0.0.1:5647/oobi/BF2rZTW79z4IXocYRQnjjsOuvFUQv-ptCf8Yltd7PfsM/controller?name=Wyz&tag=witness"
]
}
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
tests_require=[
'coverage>=5.5',
'pytest>=6.2.4',
'requests==2.32.3'
],
setup_requires=[
],
Expand Down
192 changes: 154 additions & 38 deletions src/keria/app/agenting.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,24 @@
keria.app.agenting module

"""
import logging
import os
from base64 import b64decode
import json
import os
import datetime
from dataclasses import asdict
from dataclasses import asdict, dataclass, field
from typing import List
from urllib.parse import urlparse, urljoin
from types import MappingProxyType

import falcon
import lmdb
from falcon import media
from hio.base import doing
from hio.base import doing, Doer
from hio.core import http, tcp
from hio.help import decking

from keri import kering
from keri import core
from keri import core, kering, help
from keri.app.notifying import Notifier
from keri.app.storing import Mailboxer

Expand All @@ -43,6 +45,7 @@

from . import aiding, notifying, indirecting, credentialing, ipexing, delegating
from . import grouping as keriagrouping
from .serving import GracefulShutdownDoer
from ..peer import exchanging as keriaexchanging
from .specing import AgentSpecResource
from ..core import authing, longrunning, httping
Expand All @@ -52,23 +55,135 @@

logger = ogler.getLogger()


def setup(name, bran, adminPort, bootPort, base='', httpPort=None, configFile=None, configDir=None,
keypath=None, certpath=None, cafilepath=None, cors=False, releaseTimeout=None, curls=None,
iurls=None, durls=None, bootUsername=None, bootPassword=None):
""" Set up an ahab in Signify mode """

agency = Agency(name=name, base=base, bran=bran, configFile=configFile, configDir=configDir, releaseTimeout=releaseTimeout, curls=curls, iurls=iurls, durls=durls)
@dataclass
class KERIAServerConfig:
"""
Provides a dataclass to define server config so it is easy to test with multiprocess.
Dataclasses are Pickleable and can be passed to a new process.
"""
# HTTP ports to use.
# Admin port number the admin HTTP server listens on.
# Default is 3901. KERIA_ADMIN_PORT also sets this
adminPort: int = 3901
# Local port number the HTTP server listens on.
# Default is 3902. KERIA_HTTP_PORT also sets this
httpPort: int | None = 3092
# Boot port number the Boot HTTP server listens on.
# WARNING: This port needs to be secured.
# Default is 3903. KERIA_BOOT_PORT also sets this.
bootPort: int = 3903

# Agency master controller information and configuration
# Name of controller. Default is 'keria'.
name: str = "keria"
# additional optional prefix to file location of KERI keystore
base: str = ""
# 21 character encryption passcode for keystore (is not saved)
bran: str = None
# configuration filename
configFile: str = "keria"
# directory override for configuration data
configDir: str = None

# TLS key material
# TLS server private key file
keyPath: str = None
# TLS server signed certificate (public key) file
certPath: str = None
# TLS server CA certificate chain
caFilePath: str = None

# Logging configuration
# Set log level to DEBUG | INFO | WARNING | ERROR | CRITICAL.
# Default is CRITICAL
logLevel: str = "CRITICAL"
# path of the log file. If not defined, logs will not be written to the file.
logFile: str = None

# Agency configuration
# Use CORS headers in the HTTP responses. Default is False
cors: bool = True
# Timeout for releasing agents. Default is 86400 seconds (24 hours)
releaseTimeout: int = 86400
# Controller Service Endpoint Location OOBI URLs to resolve at startup of each Agent. Makes a 'controller' EndRole and LocScheme in the database for each URL
curls: List[str] = field(default_factory=list)
# General Introduction OOBI URLs to resolve at startup of each Agent. For things like witnesses, watchers, mailboxes, and TEL observers.
iurls: List[str] = field(default_factory=list)
# Data OOBI URLs resolved at startup of each Agent. For things like ACDC schemas, ACDCs (credentials), or other CESR streams.
durls: List[str] = field(default_factory=list)

# Experimental configuration
# Experimental password for boot endpoint. Enables HTTP Basic Authentication for the boot endpoint. Only meant to be used for testing purposes.
bootPassword: str = None
# Experimental username for boot endpoint. Enables HTTP Basic Authentication for the boot endpoint. Only meant to be used for testing purposes.
bootUsername: str = None

def runAgency(config: KERIAServerConfig):
    """
    Configure logging from ``config``, build the Agency doers and run them.

    Blocks inside ``Doist.do()`` for as long as the Doist runs. Useful for
    testing.

    Parameters:
        config (KERIAServerConfig): server configuration; ``logLevel``,
            ``logFile``, ``name`` and the three port fields are read here,
            the rest is consumed by ``setupDoers``.
    """
    # Apply the requested level to the shared ogler first, then mirror
    # whatever the ogler now reports onto this module's logger.
    help.ogler.level = logging.getLevelName(config.logLevel)
    logger.setLevel(help.ogler.level)

    logFile = config.logFile
    if logFile is not None:
        # Only redirect and reopen the log when a log file was configured.
        help.ogler.headDirPath = logFile
        help.ogler.reopen(name=config.name, temp=False, clear=True)

    logger.info("Starting Agent for %s listening: admin/%s, http/%s, boot/%s",
                config.name, config.adminPort, config.httpPort, config.bootPort)
    logger.info("PID: %s", os.getpid())

    agencyDoers = setupDoers(config)
    agencyDoist(agencyDoers).do()

def agencyDoist(doers: List[Doer]):
    """
    Build the Doist that drives the Agency doers. Useful for testing.

    A ``GracefulShutdownDoer`` bound to the new Doist and to the Agency found
    in ``doers`` is appended to ``doers`` (the caller's list is mutated)
    before the list is assigned to the Doist.

    Parameters:
        doers (List[Doer]): doers to run; extended in place with the
            shutdown doer.

    Returns:
        the configured Doist (not yet started).
    """
    doist = doing.Doist(limit=0.0, tock=0.03125, real=True)

    # Look up the Agency before appending so the search only sees the
    # caller-supplied doers.
    shutdownDoer = GracefulShutdownDoer(doist=doist, agency=getAgency(doers))
    doers.append(shutdownDoer)

    doist.doers = doers
    return doist

def getAgency(doers):
    """
    Return the first ``Agency`` instance found in ``doers``.

    Used to get the Agency for the graceful agent shutdown.

    Parameters:
        doers (iterable): doers to search, in order.

    Returns:
        the first element that is an ``Agency``, or None when there is none.
    """
    return next((candidate for candidate in doers if isinstance(candidate, Agency)), None)

def setupDoers(config: KERIAServerConfig):
"""
Sets up the HIO coroutines the KERIA agent server is composed of including three HTTP servers for a KERIA agent server:
1. Boot server for bootstrapping agents. Signify calls this with a signed inception event.
2. Admin server for administrative tasks like creating agents.
3. HTTP server for all other agent operations.
"""
agency = Agency(
name=config.name,
base=config.base,
bran=config.bran,
configFile=config.configFile,
configDir=config.configDir,
releaseTimeout=config.releaseTimeout,
curls=config.curls,
iurls=config.iurls,
durls=config.durls
)
allowed_cors_headers = [
'cesr-attachment',
'cesr-date',
'content-type',
'signature',
'signature-input',
'signify-resource',
'signify-timestamp'
]
bootApp = falcon.App(middleware=falcon.CORSMiddleware(
allow_origins='*', allow_credentials='*',
expose_headers=['cesr-attachment', 'cesr-date', 'content-type', 'signature', 'signature-input',
'signify-resource', 'signify-timestamp']))
expose_headers=allowed_cors_headers))

bootServer = createHttpServer(bootPort, bootApp, keypath, certpath, cafilepath)
bootServer = createHttpServer(config.bootPort, bootApp, config.keyPath, config.certPath, config.caFilePath)
if not bootServer.reopen():
raise RuntimeError(f"cannot create boot http server on port {bootPort}")
raise RuntimeError(f"Cannot create boot HTTP server on port {config.bootPort}")
bootServerDoer = http.ServerDoer(server=bootServer)
bootEnd = BootEnd(agency, username=bootUsername, password=bootPassword)
bootEnd = BootEnd(agency, username=config.bootUsername, password=config.bootPassword)
bootApp.add_route("/boot", bootEnd)
bootApp.add_route("/health", HealthEnd())

Expand All @@ -77,17 +192,16 @@ def setup(name, bran, adminPort, bootPort, base='', httpPort=None, configFile=No

app = falcon.App(middleware=falcon.CORSMiddleware(
allow_origins='*', allow_credentials='*',
expose_headers=['cesr-attachment', 'cesr-date', 'content-type', 'signature', 'signature-input',
'signify-resource', 'signify-timestamp']))
if cors:
expose_headers=allowed_cors_headers))
if config.cors:
app.add_middleware(middleware=httping.HandleCORS())
app.add_middleware(authing.SignatureValidationComponent(agency=agency, authn=authn, allowed=["/agent"]))
app.req_options.media_handlers.update(media.Handlers())
app.resp_options.media_handlers.update(media.Handlers())

adminServer = createHttpServer(adminPort, app, keypath, certpath, cafilepath)
adminServer = createHttpServer(config.adminPort, app, config.keyPath, config.certPath, config.caFilePath)
if not adminServer.reopen():
raise RuntimeError(f"cannot create admin http server on port {adminPort}")
raise RuntimeError(f"cannot create admin HTTP server on port {config.adminPort}")
adminServerDoer = http.ServerDoer(server=adminServer)

doers = [agency, bootServerDoer, adminServerDoer]
Expand All @@ -100,20 +214,19 @@ def setup(name, bran, adminPort, bootPort, base='', httpPort=None, configFile=No
keriaexchanging.loadEnds(app=app)
ipexing.loadEnds(app=app)

if httpPort:
if config.httpPort:
happ = falcon.App(middleware=falcon.CORSMiddleware(
allow_origins='*', allow_credentials='*',
expose_headers=['cesr-attachment', 'cesr-date', 'content-type', 'signature', 'signature-input',
'signify-resource', 'signify-timestamp']))
expose_headers=allowed_cors_headers))
happ.req_options.media_handlers.update(media.Handlers())
happ.resp_options.media_handlers.update(media.Handlers())

ending.loadEnds(agency=agency, app=happ)
indirecting.loadEnds(agency=agency, app=happ)

server = createHttpServer(httpPort, happ, keypath, certpath, cafilepath)
server = createHttpServer(config.httpPort, happ, config.keyPath, config.certPath, config.caFilePath)
if not server.reopen():
raise RuntimeError(f"cannot create local http server on port {httpPort}")
raise RuntimeError(f"cannot create local http server on port {config.httpPort}")
httpServerDoer = http.ServerDoer(server=server)
doers.append(httpServerDoer)

Expand All @@ -124,7 +237,7 @@ def setup(name, bran, adminPort, bootPort, base='', httpPort=None, configFile=No
specEnd.addRoutes(happ)
happ.add_route("/spec.yaml", specEnd)

print("The Agency is loaded and waiting for requests...")
logger.info("The Agency is loaded and waiting for requests...")
return doers


Expand Down Expand Up @@ -249,19 +362,22 @@ def delete(self, agent):
del self.agents[agent.caid]

def shut(self, agent):
    """
    Shut down ``agent`` and close the databases it holds.

    The agent's doers are removed from the agent, the agent is removed from
    this object via ``self.remove`` and dropped from ``self.agents``, then
    each database handle is closed.

    Every close is attempted independently: an ``lmdb.Error`` from one
    handle is logged and the remaining handles are still closed, so a single
    already-closed database cannot leave the others open.

    Parameters:
        agent: the agent to shut down; must be registered in
            ``self.agents`` under ``agent.caid``.

    Raises:
        KeyError: if ``agent.caid`` is not in ``self.agents``.
    """
    logger.info(f"Shutting down agent {agent.caid}")
    agent.remove(agent.doers)
    self.remove([agent])
    del self.agents[agent.caid]

    # Lambdas defer the attribute lookups, keeping the original close order.
    closers = (
        lambda: agent.hby.ks.close(clear=False),
        lambda: agent.seeker.close(clear=False),
        lambda: agent.exnseeker.close(clear=False),
        lambda: agent.monitor.opr.close(clear=False),
        lambda: agent.notifier.noter.close(clear=False),
        lambda: agent.rep.mbx.close(clear=False),
        lambda: agent.registrar.rgy.close(),
        lambda: agent.mgr.rb.close(clear=False),
        lambda: agent.hby.close(clear=False),
    )
    for close in closers:
        try:
            close()
        except lmdb.Error as ex:  # Sometimes LMDB will throw an error if the DB is already closed
            logger.error(f"Error closing databases for agent {agent.caid}: {ex}")

def get(self, caid):
if caid in self.agents:
Expand Down
3 changes: 1 addition & 2 deletions src/keria/app/cli/commands/sig-fix.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
"""
import argparse

from hio import help
from hio.base import doing
from keri import kering
from keri import help, kering
from keri.app import habbing
from keri.app.cli.common import existing
from keri.core import serdering, coring
Expand Down
Loading