Skip to content

Commit

Permalink
messaging: Make exchange/queue name prefix configurable
Browse files Browse the repository at this point in the history
This, adds a new messaging.prefix option that gets modified
to include the test slot as needed.

Exchanges are defined as module level vars initialized on import.
But, oslo_config is not setup yet at import time, so delay applying the
new prefix setting until just before they get created in RabbitMQ.

Luckily, Exchange() objects are lightweight objects that just
hold the exchange name and type. They could do more, but we
don't bind them to a channel. Because the exchange objects
merely hold strings, and because they are basically singletons
(module-level vars), we can safely update the exchange name
just before declaring it in RabbitMQ. From that point on,
everything that uses a singleton exchange object will get
the updated name.
  • Loading branch information
cognifloyd committed Nov 9, 2024
1 parent 3c99f88 commit af407ca
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
5 changes: 5 additions & 0 deletions st2common/st2common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,11 @@ def register_opts(ignore_errors=False):
help="Compression algorithm to use for compressing the payloads which are sent over "
"the message bus. Defaults to no compression.",
),
cfg.StrOpt(
"prefix",
default = "st2",
help = "Prefix for all exchange and queue names.",
),
]

do_register_opts(messaging_opts, "messaging", ignore_errors)
Expand Down
9 changes: 8 additions & 1 deletion st2common/st2common/transport/bootstrap_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from st2common.transport.actionexecutionstate import ACTIONEXECUTIONSTATE_XCHG
from st2common.transport.announcement import ANNOUNCEMENT_XCHG
from st2common.transport.connection_retry_wrapper import ConnectionRetryWrapper
from st2common.transport.execution import EXECUTION_XCHG
from st2common.transport.execution import EXECUTION_XCHG, EXECUTION_OUTPUT_XCHG
from st2common.transport.liveaction import LIVEACTION_XCHG, LIVEACTION_STATUS_MGMT_XCHG
from st2common.transport.reactor import SENSOR_CUD_XCHG
from st2common.transport.reactor import TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG
Expand Down Expand Up @@ -67,6 +67,7 @@
ACTIONEXECUTIONSTATE_XCHG,
ANNOUNCEMENT_XCHG,
EXECUTION_XCHG,
EXECUTION_OUTPUT_XCHG,
LIVEACTION_XCHG,
LIVEACTION_STATUS_MGMT_XCHG,
TRIGGER_CUD_XCHG,
Expand Down Expand Up @@ -100,6 +101,9 @@


def _do_register_exchange(exchange, connection, channel, retry_wrapper):
prefix = cfg.CONF.messaging.prefix
if exchange.name and prefix != "st2":
exchange.name = exchange.name.replace("st2.", f"{prefix}.", 1)
try:
kwargs = {
"exchange": exchange.name,
Expand All @@ -123,6 +127,9 @@ def _do_register_exchange(exchange, connection, channel, retry_wrapper):


def _do_predeclare_queue(channel, queue):
prefix = cfg.CONF.messaging.prefix
if queue.name and prefix != "st2":
queue.name = queue.name.replace("st2.", f"{prefix}.", 1)
LOG.debug('Predeclaring queue for exchange "%s"' % (queue.exchange.name))

bound_queue = None
Expand Down
18 changes: 17 additions & 1 deletion st2tests/st2tests/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def _setup_config_opts(coordinator_noop=True):

def _override_config_opts(coordinator_noop=False):
_override_db_opts()
_override_mq_opts()
_override_common_opts()
_override_api_opts()
_override_keyvalue_opts()
Expand Down Expand Up @@ -107,8 +108,18 @@ def db_opts_as_env_vars() -> Dict[str, str]:
return env


def _override_mq_opts():
mq_prefix = CONF.messaging.prefix
mq_prefix = "st2test" if mq_prefix == "st2" else mq_prefix
mq_prefix = mq_prefix + os.environ.get("ST2TESTS_PARALLEL_SLOT", "")
CONF.set_override(name="prefix", override=mq_prefix, group="messaging")


def mq_opts_as_env_vars() -> Dict[str, str]:
return {"ST2_MESSAGING__URL": CONF.messaging.url}
return {
"ST2_MESSAGING__URL": CONF.messaging.url,
"ST2_MESSAGING__PREFIX": CONF.messaging.prefix,
}


def _override_common_opts():
Expand Down Expand Up @@ -269,6 +280,11 @@ def _register_api_opts():
default=None,
help="Login method to use (AMQPLAIN, PLAIN, EXTERNAL, etc.).",
),
cfg.StrOpt(
"prefix",
default="st2",
help="Prefix for all exchange and queue names.",
),
]

_register_opts(messaging_opts, group="messaging")
Expand Down

0 comments on commit af407ca

Please sign in to comment.