Skip to content

Commit

Permalink
Merge pull request #5396 from khushboobhatia01/deregister_on_shutdown
Browse files Browse the repository at this point in the history
Deregister services from service registry on shutdown
  • Loading branch information
arm4b authored Dec 2, 2021
2 parents 6bcecbb + 01c2b3e commit caacdfa
Show file tree
Hide file tree
Showing 15 changed files with 102 additions and 12 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ in development
Added
~~~~~

* Added service deregistration on shutdown of a service. #5396

Contributed by @khushboobhatia01

* Added possibility to add new values to the KV store via CLI without leaking them to the shell history. #5164

* ``st2.conf`` is now the only place to configure ports for ``st2api``, ``st2auth``, and ``st2stream``.
Expand Down
5 changes: 4 additions & 1 deletion st2actions/st2actions/cmd/actionrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
from st2common import log as logging
from st2common.service_setup import setup as common_setup
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service

__all__ = ["main"]

LOG = logging.getLogger(__name__)
ACTIONRUNNER = "actionrunner"


def _setup_sigterm_handler():
Expand All @@ -49,7 +51,7 @@ def sigterm_handler(signum=None, frame=None):
def _setup():
capabilities = {"name": "actionrunner", "type": "passive"}
common_setup(
service="actionrunner",
service=ACTIONRUNNER,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand All @@ -75,6 +77,7 @@ def _run_worker():
errors = False

try:
deregister_service(service=ACTIONRUNNER)
action_worker.shutdown()
except:
LOG.exception("Unable to shutdown worker.")
Expand Down
5 changes: 4 additions & 1 deletion st2actions/st2actions/cmd/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
from st2common import log as logging
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import setup as common_setup
from st2common.service_setup import deregister_service

__all__ = ["main"]

LOG = logging.getLogger(__name__)
SCHEDULER = "scheduler"


def _setup_sigterm_handler():
Expand All @@ -47,7 +49,7 @@ def sigterm_handler(signum=None, frame=None):
def _setup():
capabilities = {"name": "scheduler", "type": "passive"}
common_setup(
service="scheduler",
service=SCHEDULER,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand Down Expand Up @@ -101,6 +103,7 @@ def _run_scheduler():
errors = False

try:
deregister_service(service=SCHEDULER)
handler.shutdown()
entrypoint.shutdown()
except:
Expand Down
5 changes: 4 additions & 1 deletion st2actions/st2actions/cmd/st2notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,20 @@
from st2common import log as logging
from st2common.service_setup import setup as common_setup
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service
from st2actions.notifier import config
from st2actions.notifier import notifier

__all__ = ["main"]

LOG = logging.getLogger(__name__)
NOTIFIER = "notifier"


def _setup():
capabilities = {"name": "notifier", "type": "passive"}
common_setup(
service="notifier",
service=NOTIFIER,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand All @@ -53,6 +55,7 @@ def _run_worker():
actions_notifier.start(wait=True)
except (KeyboardInterrupt, SystemExit):
LOG.info("(PID=%s) Actions notifier stopped.", os.getpid())
deregister_service(service=NOTIFIER)
actions_notifier.shutdown()
return 0

Expand Down
5 changes: 4 additions & 1 deletion st2actions/st2actions/cmd/workflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
from st2common import log as logging
from st2common.service_setup import setup as common_setup
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service

__all__ = ["main"]

LOG = logging.getLogger(__name__)
WORKFLOW_ENGINE = "workflow_engine"


def setup_sigterm_handler():
Expand All @@ -51,7 +53,7 @@ def sigterm_handler(signum=None, frame=None):
def setup():
capabilities = {"name": "workflowengine", "type": "passive"}
common_setup(
service="workflow_engine",
service=WORKFLOW_ENGINE,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand All @@ -72,6 +74,7 @@ def run_server():
engine.start(wait=True)
except (KeyboardInterrupt, SystemExit):
LOG.info("(PID=%s) Workflow engine stopped.", os.getpid())
deregister_service(service=WORKFLOW_ENGINE)
engine.shutdown()
except:
LOG.exception("(PID=%s) Workflow engine unexpectedly stopped.", os.getpid())
Expand Down
5 changes: 4 additions & 1 deletion st2api/st2api/cmd/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from st2common import log as logging
from st2common.service_setup import setup as common_setup
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service
from st2api import config

config.register_opts(ignore_errors=True)
Expand All @@ -41,6 +42,7 @@
__all__ = ["main"]

LOG = logging.getLogger(__name__)
API = "api"

# How much time to give to the request in progress to finish in seconds before killing them
WSGI_SERVER_REQUEST_SHUTDOWN_TIME = 2
Expand All @@ -55,7 +57,7 @@ def _setup():
}

common_setup(
service="api",
service=API,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand Down Expand Up @@ -94,6 +96,7 @@ def main():
_setup()
return _run_server()
except SystemExit as exit_code:
deregister_service(API)
sys.exit(exit_code)
except Exception:
LOG.exception("(PID=%s) ST2 API quit due to exception.", os.getpid())
Expand Down
5 changes: 4 additions & 1 deletion st2auth/st2auth/cmd/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from st2common import log as logging
from st2common.service_setup import setup as common_setup
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service
from st2auth import config

config.register_opts(ignore_errors=True)
Expand All @@ -39,6 +40,7 @@


LOG = logging.getLogger(__name__)
AUTH = "auth"


def _setup():
Expand All @@ -50,7 +52,7 @@ def _setup():
"type": "active",
}
common_setup(
service="auth",
service=AUTH,
config=config,
setup_db=True,
register_mq_exchanges=False,
Expand Down Expand Up @@ -108,6 +110,7 @@ def main():
_setup()
return _run_server()
except SystemExit as exit_code:
deregister_service(AUTH)
sys.exit(exit_code)
except Exception:
LOG.exception("(PID=%s) ST2 Auth API quit due to exception.", os.getpid())
Expand Down
22 changes: 22 additions & 0 deletions st2common/st2common/service_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import eventlet.debug
from oslo_config import cfg
from tooz.coordination import GroupAlreadyExist
from tooz.coordination import GroupNotCreated
from tooz.coordination import MemberNotJoined

from st2common import log as logging
from st2common.constants.logging import DEFAULT_LOGGING_CONF_PATH
Expand Down Expand Up @@ -62,6 +64,7 @@
"db_setup",
"db_teardown",
"register_service_in_service_registry",
"deregister_service",
]

# Message which is logged if non utf-8 locale is detected on startup.
Expand Down Expand Up @@ -339,3 +342,22 @@ def register_service_in_service_registry(service, capabilities=None, start_heart
% (group_id, member_id, capabilities)
)
return coordinator.join_group(group_id, capabilities=capabilities).get()


def deregister_service(service, start_heart=True):
    """
    Remove the current process from the service registry group of ``service``.

    This is a best-effort operation: if the group was never created, or this
    member never joined it, there is nothing to leave and no error is raised.

    :param service: Service name, used as the registry group id. Text values
                    are encoded to UTF-8 bytes, binary values are used as-is.
    :type service: ``str`` or ``bytes``

    :param start_heart: Passed through to ``coordination.get_coordinator``.
    :type start_heart: ``bool``
    """
    if isinstance(service, six.binary_type):
        group_id = service
    else:
        group_id = service.encode("utf-8")

    coordinator = coordination.get_coordinator(start_heart=start_heart)
    member_id = coordination.get_member_id()

    # Pass the values as logger arguments so the message is only formatted
    # when debug logging is actually enabled.
    LOG.debug(
        'Leaving service registry group "%s" as member_id "%s"', group_id, member_id
    )
    try:
        coordinator.leave_group(group_id).get()
    except (GroupNotCreated, MemberNotJoined):
        # Nothing to leave. Shutdown must not fail because of this, but leave
        # a trace instead of swallowing the condition silently.
        LOG.debug(
            'Member_id "%s" is not part of service registry group "%s", '
            "nothing to deregister",
            member_id,
            group_id,
        )
10 changes: 9 additions & 1 deletion st2common/st2common/services/coordination.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from tooz import coordination
from tooz import locking
from tooz.coordination import GroupNotCreated
from tooz.coordination import MemberNotJoined

from st2common import log as logging
from st2common.util import system_info
Expand Down Expand Up @@ -124,8 +125,15 @@ def join_group(cls, group_id, capabilities=""):
@classmethod
def leave_group(cls, group_id):
    """
    Remove the current member (as returned by ``get_member_id``) from a group.

    :param group_id: Id of the group to leave.

    :raises GroupNotCreated: If no members mapping can be found for
                             ``group_id`` in ``cls.groups``.
    :raises MemberNotJoined: If the current member is not in the group.

    :return: A ``NoOpAsyncResult`` instance.
    """
    member_id = get_member_id()
    try:
        members = cls.groups[group_id]["members"]
    except KeyError:
        raise GroupNotCreated(group_id)

    # The member is removed exactly once, through the guarded delete below.
    # An additional unguarded delete before this point would make the guarded
    # one fail every time and report MemberNotJoined for a successful leave.
    try:
        del members[member_id]
    except KeyError:
        raise MemberNotJoined(group_id, member_id)
    return NoOpAsyncResult()

@classmethod
Expand Down
21 changes: 21 additions & 0 deletions st2common/tests/unit/test_service_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from oslo_config.cfg import ConfigFilesNotFoundError

from st2common import service_setup
from st2common.services import coordination
from st2common.transport.bootstrap_utils import register_exchanges
from st2common.transport.bootstrap_utils import QUEUES

Expand Down Expand Up @@ -216,3 +217,23 @@ def mock_get_logging_config_path():
run_migrations=False,
register_runners=False,
)

def test_deregister_service_when_service_registry_enabled(self):
    """Registering then deregistering a service leaves its group empty."""
    service_name = "api"
    group_id = service_name.encode("utf-8")

    service_setup.register_service_in_service_registry(
        service_name, capabilities={"hostname": "", "pid": ""}
    )

    coordinator = coordination.get_coordinator(start_heart=True)
    members_result = coordinator.get_members(group_id)

    # Exactly one member (this process) is expected after registration.
    self.assertEqual(len(list(members_result.get())), 1)

    service_setup.deregister_service(service_name)

    # The same async result object is read again after deregistration.
    self.assertEqual(len(list(members_result.get())), 0)

def test_deregister_service_when_service_registry_disables(self):
    """Deregistering without a prior registration in this test must not raise."""
    service = "api"

    # Call directly instead of wrapping in a bare ``except``: any exception
    # fails the test on its own and keeps the original traceback, and
    # SystemExit / KeyboardInterrupt are no longer intercepted.
    service_setup.deregister_service(service)
5 changes: 4 additions & 1 deletion st2reactor/st2reactor/cmd/garbagecollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from st2common.logging.misc import get_logger_name_for_module
from st2common.service_setup import setup as common_setup
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service
from st2common.constants.exit_codes import FAILURE_EXIT_CODE
from st2reactor.garbage_collector import config
from st2reactor.garbage_collector.base import GarbageCollectorService
Expand All @@ -37,12 +38,13 @@

LOGGER_NAME = get_logger_name_for_module(sys.modules[__name__])
LOG = logging.getLogger(LOGGER_NAME)
GARBAGE_COLLECTOR = "garbagecollector"


def _setup():
capabilities = {"name": "garbagecollector", "type": "passive"}
common_setup(
service="garbagecollector",
service=GARBAGE_COLLECTOR,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand All @@ -68,6 +70,7 @@ def main():
)
exit_code = garbage_collector.run()
except SystemExit as exit_code:
deregister_service(GARBAGE_COLLECTOR)
return exit_code
except:
LOG.exception("(PID:%s) GarbageCollector quit due to exception.", os.getpid())
Expand Down
5 changes: 4 additions & 1 deletion st2reactor/st2reactor/cmd/rulesengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,20 @@
from st2common.logging.misc import get_logger_name_for_module
from st2common.service_setup import setup as common_setup
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service
from st2reactor.rules import config
from st2reactor.rules import worker


LOGGER_NAME = get_logger_name_for_module(sys.modules[__name__])
LOG = logging.getLogger(LOGGER_NAME)
RULESENGINE = "rulesengine"


def _setup():
capabilities = {"name": "rulesengine", "type": "passive"}
common_setup(
service="rulesengine",
service=RULESENGINE,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand All @@ -63,6 +65,7 @@ def _run_worker():
return rules_engine_worker.wait()
except (KeyboardInterrupt, SystemExit):
LOG.info("(PID=%s) RulesEngine stopped.", os.getpid())
deregister_service(RULESENGINE)
rules_engine_worker.shutdown()
except:
LOG.exception("(PID:%s) RulesEngine quit due to exception.", os.getpid())
Expand Down
5 changes: 4 additions & 1 deletion st2reactor/st2reactor/cmd/sensormanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from st2common.logging.misc import get_logger_name_for_module
from st2common.service_setup import setup as common_setup
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service
from st2common.exceptions.sensors import SensorNotFoundException
from st2common.constants.exit_codes import FAILURE_EXIT_CODE
from st2reactor.sensor import config
Expand All @@ -39,12 +40,13 @@

LOGGER_NAME = get_logger_name_for_module(sys.modules[__name__])
LOG = logging.getLogger(LOGGER_NAME)
SENSOR_CONTAINER = "sensorcontainer"


def _setup():
capabilities = {"name": "sensorcontainer", "type": "passive"}
common_setup(
service="sensorcontainer",
service=SENSOR_CONTAINER,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand Down Expand Up @@ -80,6 +82,7 @@ def main():
)
return container_manager.run_sensors()
except SystemExit as exit_code:
deregister_service(SENSOR_CONTAINER)
return exit_code
except SensorNotFoundException as e:
LOG.exception(e)
Expand Down
Loading

0 comments on commit caacdfa

Please sign in to comment.