Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deregister services from service registry on shutdown #5396

Merged
merged 14 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ in development
Added
~~~~~

* Added service degerestration on shutdown of a service. #5396

Contributed by @khushboobhatia01

* Added possibility to add new values to the KV store via CLI without leaking them to the shell history. #5164

* ``st2.conf`` is now the only place to configure ports for ``st2api``, ``st2auth``, and ``st2stream``.
Expand Down
5 changes: 4 additions & 1 deletion st2actions/st2actions/cmd/actionrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
from st2common import log as logging
from st2common.service_setup import setup as common_setup
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service

__all__ = ["main"]

LOG = logging.getLogger(__name__)
ACTIONRUNNER = "actionrunner"


def _setup_sigterm_handler():
Expand All @@ -49,7 +51,7 @@ def sigterm_handler(signum=None, frame=None):
def _setup():
capabilities = {"name": "actionrunner", "type": "passive"}
common_setup(
service="actionrunner",
service=ACTIONRUNNER,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand All @@ -75,6 +77,7 @@ def _run_worker():
errors = False

try:
deregister_service(service=ACTIONRUNNER)
action_worker.shutdown()
except:
LOG.exception("Unable to shutdown worker.")
Expand Down
5 changes: 4 additions & 1 deletion st2actions/st2actions/cmd/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
from st2common import log as logging
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import setup as common_setup
from st2common.service_setup import deregister_service

__all__ = ["main"]

LOG = logging.getLogger(__name__)
SCHEDULER = "scheduler"


def _setup_sigterm_handler():
Expand All @@ -47,7 +49,7 @@ def sigterm_handler(signum=None, frame=None):
def _setup():
capabilities = {"name": "scheduler", "type": "passive"}
common_setup(
service="scheduler",
service=SCHEDULER,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand Down Expand Up @@ -101,6 +103,7 @@ def _run_scheduler():
errors = False

try:
deregister_service(service=SCHEDULER)
handler.shutdown()
entrypoint.shutdown()
except:
Expand Down
5 changes: 4 additions & 1 deletion st2actions/st2actions/cmd/st2notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,20 @@
from st2common import log as logging
from st2common.service_setup import setup as common_setup
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service
from st2actions.notifier import config
from st2actions.notifier import notifier

__all__ = ["main"]

LOG = logging.getLogger(__name__)
NOTIFIER = "notifier"


def _setup():
capabilities = {"name": "notifier", "type": "passive"}
common_setup(
service="notifier",
service=NOTIFIER,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand All @@ -53,6 +55,7 @@ def _run_worker():
actions_notifier.start(wait=True)
except (KeyboardInterrupt, SystemExit):
LOG.info("(PID=%s) Actions notifier stopped.", os.getpid())
deregister_service(service=NOTIFIER)
actions_notifier.shutdown()
return 0

Expand Down
5 changes: 4 additions & 1 deletion st2actions/st2actions/cmd/workflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
from st2common import log as logging
from st2common.service_setup import setup as common_setup
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service

__all__ = ["main"]

LOG = logging.getLogger(__name__)
WORKFLOW_ENGINE = "workflow_engine"


def setup_sigterm_handler():
Expand All @@ -51,7 +53,7 @@ def sigterm_handler(signum=None, frame=None):
def setup():
capabilities = {"name": "workflowengine", "type": "passive"}
common_setup(
service="workflow_engine",
service=WORKFLOW_ENGINE,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand All @@ -72,6 +74,7 @@ def run_server():
engine.start(wait=True)
except (KeyboardInterrupt, SystemExit):
LOG.info("(PID=%s) Workflow engine stopped.", os.getpid())
deregister_service(service=WORKFLOW_ENGINE)
engine.shutdown()
except:
LOG.exception("(PID=%s) Workflow engine unexpectedly stopped.", os.getpid())
Expand Down
5 changes: 4 additions & 1 deletion st2api/st2api/cmd/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from st2common import log as logging
from st2common.service_setup import setup as common_setup
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service
from st2api import config

config.register_opts(ignore_errors=True)
Expand All @@ -41,6 +42,7 @@
__all__ = ["main"]

LOG = logging.getLogger(__name__)
API = "api"

# How much time to give to the request in progress to finish in seconds before killing them
WSGI_SERVER_REQUEST_SHUTDOWN_TIME = 2
Expand All @@ -55,7 +57,7 @@ def _setup():
}

common_setup(
service="api",
service=API,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand Down Expand Up @@ -94,6 +96,7 @@ def main():
_setup()
return _run_server()
except SystemExit as exit_code:
deregister_service(API)
sys.exit(exit_code)
except Exception:
LOG.exception("(PID=%s) ST2 API quit due to exception.", os.getpid())
Expand Down
5 changes: 4 additions & 1 deletion st2auth/st2auth/cmd/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from st2common import log as logging
from st2common.service_setup import setup as common_setup
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service
from st2auth import config

config.register_opts(ignore_errors=True)
Expand All @@ -39,6 +40,7 @@


LOG = logging.getLogger(__name__)
AUTH = "auth"


def _setup():
Expand All @@ -50,7 +52,7 @@ def _setup():
"type": "active",
}
common_setup(
service="auth",
service=AUTH,
config=config,
setup_db=True,
register_mq_exchanges=False,
Expand Down Expand Up @@ -108,6 +110,7 @@ def main():
_setup()
return _run_server()
except SystemExit as exit_code:
deregister_service(AUTH)
sys.exit(exit_code)
except Exception:
LOG.exception("(PID=%s) ST2 Auth API quit due to exception.", os.getpid())
Expand Down
22 changes: 22 additions & 0 deletions st2common/st2common/service_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import eventlet.debug
from oslo_config import cfg
from tooz.coordination import GroupAlreadyExist
from tooz.coordination import GroupNotCreated
from tooz.coordination import MemberNotJoined

from st2common import log as logging
from st2common.constants.logging import DEFAULT_LOGGING_CONF_PATH
Expand Down Expand Up @@ -62,6 +64,7 @@
"db_setup",
"db_teardown",
"register_service_in_service_registry",
"deregister_service",
]

# Message which is logged if non utf-8 locale is detected on startup.
Expand Down Expand Up @@ -339,3 +342,22 @@ def register_service_in_service_registry(service, capabilities=None, start_heart
% (group_id, member_id, capabilities)
)
return coordinator.join_group(group_id, capabilities=capabilities).get()


def deregister_service(service, start_heart=True):

if not isinstance(service, six.binary_type):
group_id = service.encode("utf-8")
else:
group_id = service

coordinator = coordination.get_coordinator(start_heart=start_heart)

member_id = coordination.get_member_id()
LOG.debug(
'Leaving service registry group "%s" as member_id "%s"' % (group_id, member_id)
)
try:
coordinator.leave_group(group_id).get()
except (GroupNotCreated, MemberNotJoined):
pass
10 changes: 9 additions & 1 deletion st2common/st2common/services/coordination.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from tooz import coordination
from tooz import locking
from tooz.coordination import GroupNotCreated
from tooz.coordination import MemberNotJoined

from st2common import log as logging
from st2common.util import system_info
Expand Down Expand Up @@ -124,8 +125,15 @@ def join_group(cls, group_id, capabilities=""):
@classmethod
def leave_group(cls, group_id):
member_id = get_member_id()
try:
members = cls.groups[group_id]["members"]
except KeyError:
raise GroupNotCreated(group_id)

del cls.groups[group_id]["members"][member_id]
try:
del members[member_id]
except KeyError:
raise MemberNotJoined(group_id, member_id)
return NoOpAsyncResult()

@classmethod
Expand Down
21 changes: 21 additions & 0 deletions st2common/tests/unit/test_service_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from oslo_config.cfg import ConfigFilesNotFoundError

from st2common import service_setup
from st2common.services import coordination
from st2common.transport.bootstrap_utils import register_exchanges
from st2common.transport.bootstrap_utils import QUEUES

Expand Down Expand Up @@ -216,3 +217,23 @@ def mock_get_logging_config_path():
run_migrations=False,
register_runners=False,
)

def test_deregister_service_when_service_registry_enabled(self):
service = "api"
service_setup.register_service_in_service_registry(
service, capabilities={"hostname": "", "pid": ""}
)
coordinator = coordination.get_coordinator(start_heart=True)
members = coordinator.get_members(service.encode("utf-8"))
self.assertEqual(len(list(members.get())), 1)
service_setup.deregister_service(service)
self.assertEqual(len(list(members.get())), 0)

def test_deregister_service_when_service_registry_disables(self):
service = "api"
try:
service_setup.deregister_service(service)
except:
assert False, "service_setup.deregister_service raised exception"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@khushboobhatia01 What happens if this error is raised on shutdown? I think it is ok to make this silent if there is nothing to be deregister.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@m4dcoder We don't expect deregister() to raise an exception when service_registry is disabled or there's nothing to deregister. This test case would fail only when some unintended change is causing deregister to fail. All edge cases are handled in deregister(). Therefore it might be good to know if some new change is causing it to fail.


assert True
5 changes: 4 additions & 1 deletion st2reactor/st2reactor/cmd/garbagecollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from st2common.logging.misc import get_logger_name_for_module
from st2common.service_setup import setup as common_setup
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service
from st2common.constants.exit_codes import FAILURE_EXIT_CODE
from st2reactor.garbage_collector import config
from st2reactor.garbage_collector.base import GarbageCollectorService
Expand All @@ -37,12 +38,13 @@

LOGGER_NAME = get_logger_name_for_module(sys.modules[__name__])
LOG = logging.getLogger(LOGGER_NAME)
GARBAGE_COLLECTOR = "garbagecollector"


def _setup():
capabilities = {"name": "garbagecollector", "type": "passive"}
common_setup(
service="garbagecollector",
service=GARBAGE_COLLECTOR,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand All @@ -68,6 +70,7 @@ def main():
)
exit_code = garbage_collector.run()
except SystemExit as exit_code:
deregister_service(GARBAGE_COLLECTOR)
return exit_code
except:
LOG.exception("(PID:%s) GarbageCollector quit due to exception.", os.getpid())
Expand Down
5 changes: 4 additions & 1 deletion st2reactor/st2reactor/cmd/rulesengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,20 @@
from st2common.logging.misc import get_logger_name_for_module
from st2common.service_setup import setup as common_setup
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service
from st2reactor.rules import config
from st2reactor.rules import worker


LOGGER_NAME = get_logger_name_for_module(sys.modules[__name__])
LOG = logging.getLogger(LOGGER_NAME)
RULESENGINE = "rulesengine"


def _setup():
capabilities = {"name": "rulesengine", "type": "passive"}
common_setup(
service="rulesengine",
service=RULESENGINE,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand All @@ -63,6 +65,7 @@ def _run_worker():
return rules_engine_worker.wait()
except (KeyboardInterrupt, SystemExit):
LOG.info("(PID=%s) RulesEngine stopped.", os.getpid())
deregister_service(RULESENGINE)
rules_engine_worker.shutdown()
except:
LOG.exception("(PID:%s) RulesEngine quit due to exception.", os.getpid())
Expand Down
5 changes: 4 additions & 1 deletion st2reactor/st2reactor/cmd/sensormanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from st2common.logging.misc import get_logger_name_for_module
from st2common.service_setup import setup as common_setup
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service
from st2common.exceptions.sensors import SensorNotFoundException
from st2common.constants.exit_codes import FAILURE_EXIT_CODE
from st2reactor.sensor import config
Expand All @@ -39,12 +40,13 @@

LOGGER_NAME = get_logger_name_for_module(sys.modules[__name__])
LOG = logging.getLogger(LOGGER_NAME)
SENSOR_CONTAINER = "sensorcontainer"


def _setup():
capabilities = {"name": "sensorcontainer", "type": "passive"}
common_setup(
service="sensorcontainer",
service=SENSOR_CONTAINER,
config=config,
setup_db=True,
register_mq_exchanges=True,
Expand Down Expand Up @@ -80,6 +82,7 @@ def main():
)
return container_manager.run_sensors()
except SystemExit as exit_code:
deregister_service(SENSOR_CONTAINER)
return exit_code
except SensorNotFoundException as e:
LOG.exception(e)
Expand Down
Loading