diff --git a/.github/workflows/python-app-ci.yml b/.github/workflows/python-app-ci.yml index 34b39b8c..a53b5351 100644 --- a/.github/workflows/python-app-ci.yml +++ b/.github/workflows/python-app-ci.yml @@ -28,7 +28,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install flake8 pytest hio + pip install flake8 pytest hio requests if [ -f requirements.txt ]; then pip install -r requirements.txt; fi - name: Lint changes run: | @@ -51,7 +51,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install pytest pytest-cov hio + pip install pytest pytest-cov hio requests if [ -f requirements.txt ]; then pip install -r requirements.txt; fi - name: Run core KERIA tests run: | diff --git a/scripts/keri/cf/demo-witness-oobis.json b/scripts/keri/cf/demo-witness-oobis.json index 10d91fd0..c003eab5 100755 --- a/scripts/keri/cf/demo-witness-oobis.json +++ b/scripts/keri/cf/demo-witness-oobis.json @@ -7,6 +7,9 @@ "iurls": [ "http://127.0.0.1:5642/oobi/BBilc4-L3tFUnfM_wJr4S4OJanAv_VmF_dJNN6vkf2Ha/controller?name=Wan&tag=witness", "http://127.0.0.1:5643/oobi/BLskRTInXnMxWaGqcpSyMgo0nYbalW99cGZESrz3zapM/controller?name=Wes&tag=witness", - "http://127.0.0.1:5644/oobi/BIKKuvBwpmDVA4Ds-EpL5bt9OqPzWPja2LigFYZN2YfX/controller?name=Wil&tag=witness" + "http://127.0.0.1:5644/oobi/BIKKuvBwpmDVA4Ds-EpL5bt9OqPzWPja2LigFYZN2YfX/controller?name=Wil&tag=witness", + "http://127.0.0.1:5645/oobi/BM35JN8XeJSEfpxopjn5jr7tAHCE5749f0OobhMLCorE/controller?name=Wit&tag=witness", + "http://127.0.0.1:5646/oobi/BIj15u5V11bkbtAxMA7gcNJZcax-7TgaBMLsQnMHpYHP/controller?name=Wub&tag=witness", + "http://127.0.0.1:5647/oobi/BF2rZTW79z4IXocYRQnjjsOuvFUQv-ptCf8Yltd7PfsM/controller?name=Wyz&tag=witness" ] } \ No newline at end of file diff --git a/setup.py b/setup.py index 3ffc026d..c116f55c 100644 --- a/setup.py +++ b/setup.py @@ -92,6 +92,7 @@ tests_require=[ 'coverage>=5.5', 'pytest>=6.2.4', + 'requests==2.32.3' ], setup_requires=[ ], diff --git a/src/keria/app/agenting.py b/src/keria/app/agenting.py index 1fc8a210..127798da 100644 --- a/src/keria/app/agenting.py +++ b/src/keria/app/agenting.py @@ -4,22 +4,24 @@ keria.app.agenting module """ +import logging +import os from base64 import b64decode import json -import os import datetime -from dataclasses import asdict +from dataclasses import asdict, dataclass, field +from typing import List from urllib.parse import urlparse, urljoin from types import MappingProxyType import falcon +import lmdb from falcon import media -from hio.base import doing +from hio.base import doing, Doer from hio.core import http, tcp from hio.help import decking -from keri import kering -from keri import core +from keri import core, kering, help from keri.app.notifying import Notifier from keri.app.storing import Mailboxer @@ -43,6 +45,7 @@ from . import aiding, notifying, indirecting, credentialing, ipexing, delegating from . import grouping as keriagrouping +from .serving import GracefulShutdownDoer from ..peer import exchanging as keriaexchanging from .specing import AgentSpecResource from ..core import authing, longrunning, httping @@ -52,23 +55,135 @@ logger = ogler.getLogger() - -def setup(name, bran, adminPort, bootPort, base='', httpPort=None, configFile=None, configDir=None, - keypath=None, certpath=None, cafilepath=None, cors=False, releaseTimeout=None, curls=None, - iurls=None, durls=None, bootUsername=None, bootPassword=None): - """ Set up an ahab in Signify mode """ - - agency = Agency(name=name, base=base, bran=bran, configFile=configFile, configDir=configDir, releaseTimeout=releaseTimeout, curls=curls, iurls=iurls, durls=durls) +@dataclass +class KERIAServerConfig: + """ + Provides a dataclass to define server config so it is easy to test with multiprocess. + Dataclasses are Pickleable and can be passed to a new process. + """ + # HTTP ports to use. + # Admin port number the admin HTTP server listens on. + # Default is 3901. KERIA_ADMIN_PORT also sets this + adminPort: int = 3901 + # Local port number the HTTP server listens on. + # Default is 3902. KERIA_HTTP_PORT also sets this + httpPort: int | None = 3092 + # Boot port number the Boot HTTP server listens on. + # WARNING: This port needs to be secured. + # Default is 3903. KERIA_BOOT_PORT also sets this. + bootPort: int = 3903 + + # Agency master controller information and configuration + # Name of controller. Default is 'keria'. + name: str = "keria" + # additional optional prefix to file location of KERI keystore + base: str = "" + # 21 character encryption passcode for keystore (is not saved) + bran: str = None + # configuration filename + configFile: str = "keria" + # directory override for configuration data + configDir: str = None + + # TLS key material + # TLS server private key file + keyPath: str = None + # TLS server signed certificate (public key) file + certPath: str = None + # TLS server CA certificate chain + caFilePath: str = None + + # Logging configuration + # Set log level to DEBUG | INFO | WARNING | ERROR | CRITICAL. + # Default is CRITICAL + logLevel: str = "CRITICAL" + # path of the log file. If not defined, logs will not be written to the file. + logFile: str = None + + # Agency configuration + # Use CORS headers in the HTTP responses. Default is False + cors: bool = True + # Timeout for releasing agents. Default is 86400 seconds (24 hours) + releaseTimeout: int = 86400 + # Controller Service Endpoint Location OOBI URLs to resolve at startup of each Agent. Makes a 'controller' EndRole and LocScheme in the database for each URL + curls: List[str] = field(default_factory=list) + # General Introduction OOBI URLs to resolve at startup of each Agent. For things like witnesses, watchers, mailboxes, and TEL observers. + iurls: List[str] = field(default_factory=list) + # Data OOBI URLs resolved at startup of each Agent. For things like ACDC schemas, ACDCs (credentials), or other CESR streams. + durls: List[str] = field(default_factory=list) + + # Experimental configuration + # Experimental password for boot endpoint. Enables HTTP Basic Authentication for the boot endpoint. Only meant to be used for testing purposes. + bootPassword: str = None + # Experimental username for boot endpoint. Enables HTTP Basic Authentication for the boot endpoint. Only meant to be used for testing purposes. + bootUsername: str = None + +def runAgency(config: KERIAServerConfig): + """Runs a KERIA Agency with the given Doers by calling Doist.do(). Useful for testing.""" + help.ogler.level = logging.getLevelName(config.logLevel) + logger.setLevel(help.ogler.level) + if config.logFile is not None: + help.ogler.headDirPath = config.logFile + help.ogler.reopen(name=config.name, temp=False, clear=True) + + logger.info("Starting Agent for %s listening: admin/%s, http/%s, boot/%s", + config.name, config.adminPort, config.httpPort, config.bootPort) + logger.info("PID: %s", os.getpid()) + + doist = agencyDoist(setupDoers(config)) + doist.do() + +def agencyDoist(doers: List[Doer]): + """Creates a Doist for the Agency doers and adds a graceful shutdown handler. Useful for testing.""" + tock = 0.03125 + doist = doing.Doist(limit=0.0, tock=tock, real=True) + doers.append(GracefulShutdownDoer(doist=doist, agency=getAgency(doers))) + doist.doers = doers + return doist + +def getAgency(doers): + """Get the agency from a list of Doers. Used to get the Agency for the graceful agent shutdown.""" + for doer in doers: + if isinstance(doer, Agency): + return doer + return None + +def setupDoers(config: KERIAServerConfig): + """ + Sets up the HIO coroutines the KERIA agent server is composed of including three HTTP servers for a KERIA agent server: + 1. Boot server for bootstrapping agents. Signify calls this with a signed inception event. + 2. Admin server for administrative tasks like creating agents. + 3. HTTP server for all other agent operations. + """ + agency = Agency( + name=config.name, + base=config.base, + bran=config.bran, + configFile=config.configFile, + configDir=config.configDir, + releaseTimeout=config.releaseTimeout, + curls=config.curls, + iurls=config.iurls, + durls=config.durls + ) + allowed_cors_headers = [ + 'cesr-attachment', + 'cesr-date', + 'content-type', + 'signature', + 'signature-input', + 'signify-resource', + 'signify-timestamp' + ] bootApp = falcon.App(middleware=falcon.CORSMiddleware( allow_origins='*', allow_credentials='*', - expose_headers=['cesr-attachment', 'cesr-date', 'content-type', 'signature', 'signature-input', - 'signify-resource', 'signify-timestamp'])) + expose_headers=allowed_cors_headers)) - bootServer = createHttpServer(bootPort, bootApp, keypath, certpath, cafilepath) + bootServer = createHttpServer(config.bootPort, bootApp, config.keyPath, config.certPath, config.caFilePath) if not bootServer.reopen(): - raise RuntimeError(f"cannot create boot http server on port {bootPort}") + raise RuntimeError(f"Cannot create boot HTTP server on port {config.bootPort}") bootServerDoer = http.ServerDoer(server=bootServer) - bootEnd = BootEnd(agency, username=bootUsername, password=bootPassword) + bootEnd = BootEnd(agency, username=config.bootUsername, password=config.bootPassword) bootApp.add_route("/boot", bootEnd) bootApp.add_route("/health", HealthEnd()) @@ -77,17 +192,16 @@ def setup(name, bran, adminPort, bootPort, base='', httpPort=None, configFile=No app = falcon.App(middleware=falcon.CORSMiddleware( allow_origins='*', allow_credentials='*', - expose_headers=['cesr-attachment', 'cesr-date', 'content-type', 'signature', 'signature-input', - 'signify-resource', 'signify-timestamp'])) - if cors: + expose_headers=allowed_cors_headers)) + if config.cors: app.add_middleware(middleware=httping.HandleCORS()) app.add_middleware(authing.SignatureValidationComponent(agency=agency, authn=authn, allowed=["/agent"])) app.req_options.media_handlers.update(media.Handlers()) app.resp_options.media_handlers.update(media.Handlers()) - adminServer = createHttpServer(adminPort, app, keypath, certpath, cafilepath) + adminServer = createHttpServer(config.adminPort, app, config.keyPath, config.certPath, config.caFilePath) if not adminServer.reopen(): - raise RuntimeError(f"cannot create admin http server on port {adminPort}") + raise RuntimeError(f"cannot create admin HTTP server on port {config.adminPort}") adminServerDoer = http.ServerDoer(server=adminServer) doers = [agency, bootServerDoer, adminServerDoer] @@ -100,20 +214,19 @@ def setup(name, bran, adminPort, bootPort, base='', httpPort=None, configFile=No keriaexchanging.loadEnds(app=app) ipexing.loadEnds(app=app) - if httpPort: + if config.httpPort: happ = falcon.App(middleware=falcon.CORSMiddleware( allow_origins='*', allow_credentials='*', - expose_headers=['cesr-attachment', 'cesr-date', 'content-type', 'signature', 'signature-input', - 'signify-resource', 'signify-timestamp'])) + expose_headers=allowed_cors_headers)) happ.req_options.media_handlers.update(media.Handlers()) happ.resp_options.media_handlers.update(media.Handlers()) ending.loadEnds(agency=agency, app=happ) indirecting.loadEnds(agency=agency, app=happ) - server = createHttpServer(httpPort, happ, keypath, certpath, cafilepath) + server = createHttpServer(config.httpPort, happ, config.keyPath, config.certPath, config.caFilePath) if not server.reopen(): - raise RuntimeError(f"cannot create local http server on port {httpPort}") + raise RuntimeError(f"cannot create local http server on port {config.httpPort}") httpServerDoer = http.ServerDoer(server=server) doers.append(httpServerDoer) @@ -124,7 +237,7 @@ def setup(name, bran, adminPort, bootPort, base='', httpPort=None, configFile=No specEnd.addRoutes(happ) happ.add_route("/spec.yaml", specEnd) - print("The Agency is loaded and waiting for requests...") + logger.info("The Agency is loaded and waiting for requests...") return doers @@ -249,19 +362,22 @@ def delete(self, agent): del self.agents[agent.caid] def shut(self, agent): - logger.info(f"closing idle agent {agent.caid}") + logger.info(f"Shutting down agent {agent.caid}") agent.remove(agent.doers) self.remove([agent]) del self.agents[agent.caid] - agent.hby.ks.close(clear=False) - agent.seeker.close(clear=False) - agent.exnseeker.close(clear=False) - agent.monitor.opr.close(clear=False) - agent.notifier.noter.close(clear=False) - agent.rep.mbx.close(clear=False) - agent.registrar.rgy.close() - agent.mgr.rb.close(clear=False) - agent.hby.close(clear=False) + try: + agent.hby.ks.close(clear=False) + agent.seeker.close(clear=False) + agent.exnseeker.close(clear=False) + agent.monitor.opr.close(clear=False) + agent.notifier.noter.close(clear=False) + agent.rep.mbx.close(clear=False) + agent.registrar.rgy.close() + agent.mgr.rb.close(clear=False) + agent.hby.close(clear=False) + except lmdb.Error as ex: # Sometimes LMDB will throw an error if the DB is already closed + logger.error(f"Error closing databases for agent {agent.caid}: {ex}") def get(self, caid): if caid in self.agents: diff --git a/src/keria/app/cli/commands/sig-fix.py b/src/keria/app/cli/commands/sig-fix.py index a2082115..84069968 100644 --- a/src/keria/app/cli/commands/sig-fix.py +++ b/src/keria/app/cli/commands/sig-fix.py @@ -6,9 +6,8 @@ """ import argparse -from hio import help from hio.base import doing -from keri import kering +from keri import help, kering from keri.app import habbing from keri.app.cli.common import existing from keri.core import serdering, coring diff --git a/src/keria/app/cli/commands/start.py b/src/keria/app/cli/commands/start.py index c7f21661..99336d53 100644 --- a/src/keria/app/cli/commands/start.py +++ b/src/keria/app/cli/commands/start.py @@ -1,17 +1,15 @@ # -*- encoding: utf-8 -*- """ KERIA -keria.cli.keria.commands module +keria.cli.keria.commands.start module -Witness command line interface +KERIA Agent server start command line interface (CLI) command """ import argparse -import logging import os from keri import __version__ from keri import help -from keri.app import directing from keria.app import agenting @@ -77,42 +75,34 @@ dest="bootUsername", default=os.getenv("KERIA_EXPERIMENTAL_BOOT_USERNAME")) -def getListVariable(name): - value = os.getenv(name) - return value.split(";") if value else None - -def launch(args): - help.ogler.level = logging.getLevelName(args.loglevel) - if(args.logfile != None): - help.ogler.headDirPath = args.logfile - help.ogler.reopen(name=args.name, temp=False, clear=True) - - logger = help.ogler.getLogger() - logger.info("******* Starting Agent for %s listening: admin/%s, http/%s " - ".******", args.name, args.admin, args.http) - - agency = agenting.setup(name=args.name or "ahab", - base=args.base or "", - bran=args.bran, - adminPort=args.admin, - httpPort=args.http, - bootPort=args.boot, - configFile=args.configFile, - configDir=args.configDir, - keypath=args.keypath, - certpath=args.certpath, - cafilepath=args.cafilepath, - cors=os.getenv("KERI_AGENT_CORS", "false").lower() in ("true", "1"), - releaseTimeout=int(os.getenv("KERIA_RELEASER_TIMEOUT", "86400")), - curls=getListVariable("KERIA_CURLS"), - iurls=getListVariable("KERIA_IURLS"), - durls=getListVariable("KERIA_DURLS"), - bootPassword=args.bootPassword, - bootUsername=args.bootUsername) - - directing.runController(doers=agency, expire=0.0) +logger = help.ogler.getLogger() +def launch(args): + agenting.runAgency(agenting.KERIAServerConfig( + name=args.name or "ahab", + base=args.base or "", + bran=args.bran, + adminPort=args.admin, + httpPort=args.http, + bootPort=args.boot, + configFile=args.configFile, + configDir=args.configDir, + keyPath=args.keypath, + certPath=args.certpath, + caFilePath=args.cafilepath, + logLevel=args.loglevel, + logFile=args.logfile, + cors=os.getenv("KERI_AGENT_CORS", "false").lower() in ("true", "1"), + releaseTimeout=int(os.getenv("KERIA_RELEASER_TIMEOUT", "86400")), + curls=getListVariable("KERIA_CURLS"), + iurls=getListVariable("KERIA_IURLS"), + durls=getListVariable("KERIA_DURLS"), + bootPassword=args.bootPassword, + bootUsername=args.bootUsername + )) + logger.info("Agent %s gracefully stopped", args.name) - logger.info("******* Ended Agent for %s listening: admin/%s, http/%s" - ".******", args.name, args.admin, args.http) +def getListVariable(name): + value = os.getenv(name) + return value.split(";") if value else None \ No newline at end of file diff --git a/src/keria/app/cli/keria.py b/src/keria/app/cli/keria.py index a48e7e48..fd136e86 100644 --- a/src/keria/app/cli/keria.py +++ b/src/keria/app/cli/keria.py @@ -5,14 +5,9 @@ """ import multicommand -from keri import help -from keri.app import directing from keria.app.cli import commands -logger = help.ogler.getLogger() - - def main(): parser = multicommand.create_parser(commands) args = parser.parse_args() @@ -22,8 +17,7 @@ def main(): return try: - doers = args.handler(args) - directing.runController(doers=doers, expire=0.0) + args.handler(args) except Exception as ex: # print(f"ERR: {ex}") diff --git a/src/keria/app/serving.py b/src/keria/app/serving.py new file mode 100644 index 00000000..0b18f822 --- /dev/null +++ b/src/keria/app/serving.py @@ -0,0 +1,65 @@ +import signal + +from hio.base import doing, Doist +from keri import help + +logger = help.ogler.getLogger() + +class GracefulShutdownDoer(doing.Doer): + """ + Shuts all Agency agents down before exiting the Doist loop, performing a graceful shutdown. + Sets up signal handler in the Doer.enter lifecycle method and exits the Doist scheduler loop in Doer.exit + Checks for the shutdown flag being set in the Doer.recur lifecycle method. + """ + def __init__(self, doist, agency, **kwa): + """ + Parameters: + doist (Doist): The Doist running this Doer + agency (Agency): The Agency containing Agent instances to be gracefully shut down + kwa (dict): Additional keyword arguments for Doer initialization + """ + self.doist: Doist = doist + self.agency = agency + self.shutdown_received = False + + super().__init__(**kwa) + + def handle_sigterm(self, signum, frame): + """Handler function for SIGTERM""" + logger.info(f"Received SIGTERM, initiating graceful shutdown.") + self.shutdown_received = True + + def shutdown_agents(self, agents): + """Helper function to shut down the agents.""" + logger.info("Stopping %s agents", len(agents)) + for caid in agents: + self.agency.shut(self.agency.agents[caid]) + + def enter(self): + """ + Sets up signal handlers. + Lifecycle method called once when the Doist running this Doer enters the context for this Doer. + """ + # Register signal handler + signal.signal(signal.SIGTERM, self.handle_sigterm) + logger.info("Registered signal handlers for SIGTERM") + + def recur(self, tock=0.0): + """Generator coroutine checking once per tock for shutdown flag""" + # Checks once per tock if the shutdown flag has been set and if so initiates the shutdown process + while not self.shutdown_received: + yield tock # will iterate forever in here until shutdown flag set + + # Once shutdown_flag is set, exit the Doist loop + self.shutdown_agents(list(self.agency.agents.keys())) + + return True # Returns a "done" status + # Causes the Doist scheduler to call .exit() lifecycle method below, killing the doist loop + + def exit(self): + """ + Exits the Doist loop. + Lifecycle method called once when the Doist running this Doer exits the context for this Doer. + """ + logger.info(f"Shutting down main Doist loop") + self.doist.exit() diff --git a/tests/app/test_agenting.py b/tests/app/test_agenting.py index e0338f38..3a84a18f 100644 --- a/tests/app/test_agenting.py +++ b/tests/app/test_agenting.py @@ -3,30 +3,32 @@ KERIA keria.app.agenting module -Testing the Mark II Agent +Testing the Mark II Agent (KERIA) """ -from base64 import b64encode import json +import multiprocessing import os import shutil - -import pytest +import signal +import time +from base64 import b64encode import falcon import hio +import pytest +import requests from falcon import testing from hio.base import doing, tyming from hio.core import http, tcp from hio.help import decking +from keri import core from keri import kering from keri.app import habbing, configing, indirecting, oobiing, querying from keri.app.agenting import Receiptor, WitnessReceiptor -from keri import core -from keri.core import coring, serdering +from keri.core import serdering from keri.core.coring import MtrDex from keri.db import basing, dbing from keri.help import nowIso8601 -from keri.vc import proving from keri.vdr import credentialing from keria.app import agenting, aiding @@ -35,14 +37,103 @@ def test_setup_no_http(): - doers = agenting.setup(name="test", bran=None, adminPort=1234, bootPort=5678) + doers = agenting.setupDoers(agenting.KERIAServerConfig( + name="test", + adminPort=1234, + bootPort=5678, + httpPort=None, + )) assert len(doers) == 3 assert isinstance(doers[0], agenting.Agency) is True def test_setup(): - doers = agenting.setup("test", bran=None, adminPort=1234, bootPort=5678, httpPort=9999) + doers = agenting.setupDoers(agenting.KERIAServerConfig( + name="test", + adminPort=1234, + bootPort=5678, + httpPort=9999, + )) assert len(doers) == 4 +def wait_for_server(port, timeout=10): + """Poll server until it responds or until timeout""" + url=f"http://127.0.0.1:{port}/health" + start_time=time.time() + while time.time() - start_time < timeout: + try: + response = requests.get(url) + if response.status_code == 200: + return True # Server is up + except requests.ConnectionError: + pass # Server not ready yet + time.sleep(0.25) # Retry every 1/4 second + return False # Timeout + +def test_shutdown_signals(): + config = agenting.KERIAServerConfig( + adminPort=3333, + bootPort=4444, + httpPort=5555, + ) + + # Test SIGTERM + agency_process = multiprocessing.Process(target=agenting.runAgency, args=[config]) + agency_process.start() + assert wait_for_server(config.bootPort), "Agency did not start as expected." + + os.kill(agency_process.pid, signal.SIGTERM) # Send SigTerm to the process, signal 15 + agency_process.join(timeout=10) + assert not agency_process.is_alive(), "SIGTERM: Agency process did not shut down as expected." + assert agency_process.exitcode == 0, f"SIGTERM: Agency exited with non-zero exit code {agency_process.exitcode}" + + # Test SIGINT + agency_process = multiprocessing.Process(target=agenting.runAgency, args=[config]) + agency_process.start() + assert wait_for_server(config.bootPort), "Agency did not start as expected." + + os.kill(agency_process.pid, signal.SIGINT) # Sends SigInt to the process, signal 2 + agency_process.join(timeout=10) + assert not agency_process.is_alive(), "SIGINT: Agency process did not shut down as expected." + assert agency_process.exitcode == 0, f"SIGINT: Agency exited with non-zero exit code {agency_process.exitcode}" + +def test_graceful_shutdown_doer(): + salt = b'0123456789abcdef' + salter = core.Salter(raw=salt) + cf = configing.Configer(name="keria", headDirPath=SCRIPTS_DIR, temp=True, reopen=True, clear=False) + with habbing.openHby(name="keria", salt=salter.qb64, temp=True, cf=cf) as hby: + hab = hby.makeHab(name="test") + + agency = agenting.Agency(name="agency", base="", bran=None, temp=True, configFile="keria", + releaseTimeout=0, configDir=SCRIPTS_DIR) + + tock = 0.03125 + limit = 1.0 + doist = doing.Doist(limit=limit, tock=tock, real=True) + shutdownDoer = agenting.GracefulShutdownDoer(doist=doist, agency=agency) + doers = [agency, shutdownDoer] + doist.enter(doers=doers) + + caid = "ELI7pg979AdhmvrjDeam2eAO2SR5niCgnjAJXJHtJose" + agent = agency.create(caid, salt=salter.qb64) + assert agent.pre == "EIAEKYpTygdBtFHBrHKWeh0aYCdx0ZJqZtzQLFnaDB2b" + assert len(agency.agents) == 1, "Agent not created as expected." + + assert shutdownDoer.shutdown_received is False + shutdownDoer.enter() + sigterm_handler = signal.getsignal(signal.SIGTERM) + assert sigterm_handler == shutdownDoer.handle_sigterm, "SIGTERM handler not set as expected." + + # Call SIGTERM handler manually since can't send sigterm to self in test. + # See test_shutdown_signals test above for an example of sending signals. + shutdownDoer.handle_sigterm(signal.SIGTERM, None) + + doist.do(doers=doers) + assert shutdownDoer.shutdown_received is True + + # shutdownDoer.shutdown_agents(agency.agents) + assert len(agency.agents) == 0, "Agents not shut down as expected." + + def test_load_ends(helpers): with helpers.openKeria() as (agency, agent, app, client):