From 01fbffb72fbbd75e1ddbe20e76b5da24cebe7a17 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Sun, 20 Nov 2022 02:29:23 +0000 Subject: [PATCH 01/11] Add new function based on existing logic. --- synapse/config/workers.py | 45 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 913b83e1745b..e201876399d7 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -425,6 +425,51 @@ def _should_this_worker_perform_duty( # (By this point, these are either the same value or only one is not None.) return bool(new_option_should_run_here or legacy_option_should_run_here) + def _worker_name_performing_this_duty( + self, + config: Dict[str, Any], + legacy_option_name: str, + legacy_app_name: str, + modern_instance_map_name: str, + ) -> List[str]: + """ + retrieves the name of the worker handling a given duty, by either legacy option or instance_map + config: settings read from yaml. + legacy_option_name: the old way of enabling options. e.g. 'start_pushers' + legacy_app_name: The historical app name. e.g. 'synapse.app.pusher' + modern_instance_map_name: the string name of the new instance_map. e.g. 'pusher_instances' + """ + + legacy_option = config.get(legacy_option_name, True) + + worker_instance_map = config.get(modern_instance_map_name) + if worker_instance_map is None: + # Default to an empty list, which means "another, unknown, worker is + # responsible for it". + worker_instance_map = [] + + # If no worker instances are set we check if + # `legacy_option_name` is set, which means use master + if legacy_option: + worker_instance_map = ["master"] + + if self.worker_app == legacy_app_name: + if legacy_option: + # If we're using `legacy_app_name`, and not using + # `modern_instance_map_name`, then we should have + # explicitly set `legacy_option_name` to false. + raise ConfigError( + f"The '{legacy_option_name}' config option must be disabled in " + "the main synapse process before they can be run in a separate " + "worker.\n" + f"Please add `{legacy_option_name}: false` to the main config.\n", + ) + + worker_instance_map = [self.worker_name] + + return worker_instance_map + + def read_arguments(self, args: argparse.Namespace) -> None: # We support a bunch of command line arguments that override options in # the config. A lot of these options have a worker_* prefix when running From 4505263a4b741a2aa43ee61b2a7d2a559725f5f1 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Sun, 20 Nov 2022 02:39:10 +0000 Subject: [PATCH 02/11] Migrate pusher handling to new function and delete associated error message. --- synapse/config/workers.py | 34 ++++++---------------------------- 1 file changed, 6 insertions(+), 28 deletions(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index e201876399d7..37ff5795ca53 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -36,13 +36,6 @@ Please add ``send_federation: false`` to the main config """ -_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """ -The start_pushers config option must be disabled in the main -synapse process before they can be run in a separate worker. - -Please add ``start_pushers: false`` to the main config -""" - _DEPRECATED_WORKER_DUTY_OPTION_USED = """ The '%s' configuration option is deprecated and will be removed in a future Synapse version. Please use ``%s: name_of_worker`` instead. @@ -282,27 +275,12 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: ) # Handle sharded push - start_pushers = config.get("start_pushers", True) - pusher_instances = config.get("pusher_instances") - if pusher_instances is None: - # Default to an empty list, which means "another, unknown, worker is - # responsible for it". - pusher_instances = [] - - # If no pushers instances are set we check if `start_pushers` is - # set, which means use master - if start_pushers: - pusher_instances = ["master"] - - if self.worker_app == "synapse.app.pusher": - if start_pushers: - # If we're running pushers, and not using - # `pusher_instances`, then we should have explicitly set - # `start_pushers` to false. - raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR) - - pusher_instances = [self.instance_name] - + pusher_instances = self._worker_name_performing_this_duty( + config, + "start_pushers", + "synapse.app.pusher", + "pusher_instances", + ) self.start_pushers = self.instance_name in pusher_instances self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances) From e9e224f50fbd88b734684eaef5948a2a24c5e28e Mon Sep 17 00:00:00 2001 From: Jason Little Date: Sun, 20 Nov 2022 02:50:05 +0000 Subject: [PATCH 03/11] Migrate federation sender handling to new function and delete associated error message. Group next to pusher config so it's not orphaned. --- synapse/config/workers.py | 67 ++++++++++++--------------------------- 1 file changed, 21 insertions(+), 46 deletions(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 37ff5795ca53..8d844c2b5683 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -29,13 +29,6 @@ ) from .server import DIRECT_TCP_ERROR, ListenerConfig, parse_listener_def -_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """ -The send_federation config option must be disabled in the main -synapse process before they can be run in a separate worker. - -Please add ``send_federation: false`` to the main config -""" - _DEPRECATED_WORKER_DUTY_OPTION_USED = """ The '%s' configuration option is deprecated and will be removed in a future Synapse version. Please use ``%s: name_of_worker`` instead. @@ -175,45 +168,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: ) ) - # Handle federation sender configuration. - # - # There are two ways of configuring which instances handle federation - # sending: - # 1. The old way where "send_federation" is set to false and running a - # `synapse.app.federation_sender` worker app. - # 2. Specifying the workers sending federation in - # `federation_sender_instances`. - # - - send_federation = config.get("send_federation", True) - - federation_sender_instances = config.get("federation_sender_instances") - if federation_sender_instances is None: - # Default to an empty list, which means "another, unknown, worker is - # responsible for it". - federation_sender_instances = [] - - # If no federation sender instances are set we check if - # `send_federation` is set, which means use master - if send_federation: - federation_sender_instances = ["master"] - - if self.worker_app == "synapse.app.federation_sender": - if send_federation: - # If we're running federation senders, and not using - # `federation_sender_instances`, then we should have - # explicitly set `send_federation` to false. - raise ConfigError( - _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR - ) - - federation_sender_instances = [self.worker_name] - - self.send_federation = self.instance_name in federation_sender_instances - self.federation_shard_config = ShardedWorkerHandlingConfig( - federation_sender_instances - ) - # A map from instance name to host/port of their HTTP replication endpoint. instance_map = config.get("instance_map") or {} self.instance_map = { @@ -274,6 +228,27 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: self.writers.events ) + # Handle federation sender configuration. + # + # There are two ways of configuring which instances handle federation + # sending: + # 1. The old way where "send_federation" is set to false and running a + # `synapse.app.federation_sender` worker app. + # 2. Specifying the workers sending federation in + # `federation_sender_instances`. + # + + federation_sender_instances = self._worker_name_performing_this_duty( + config, + "send_federation", + "synapse.app.federation_sender", + "federation_sender_instances", + ) + self.send_federation = self.instance_name in federation_sender_instances + self.federation_shard_config = ShardedWorkerHandlingConfig( + federation_sender_instances + ) + # Handle sharded push pusher_instances = self._worker_name_performing_this_duty( config, From 1f156e9b19e1a2cddb657f1ecb5bba27efc7c019 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Sun, 20 Nov 2022 02:53:58 +0000 Subject: [PATCH 04/11] Sad to see this comment go. It was my direction for several months of investigation. --- synapse/config/workers.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 8d844c2b5683..4fc0d8a85360 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -228,16 +228,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: self.writers.events ) - # Handle federation sender configuration. - # - # There are two ways of configuring which instances handle federation - # sending: - # 1. The old way where "send_federation" is set to false and running a - # `synapse.app.federation_sender` worker app. - # 2. Specifying the workers sending federation in - # `federation_sender_instances`. - # - federation_sender_instances = self._worker_name_performing_this_duty( config, "send_federation", From bfd665fe1b276a803e8360def8dbcfbffbe15b0f Mon Sep 17 00:00:00 2001 From: Jason Little Date: Sun, 20 Nov 2022 03:03:29 +0000 Subject: [PATCH 05/11] Linting --- synapse/config/workers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 4fc0d8a85360..f15ae9a7464e 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -412,7 +412,6 @@ def _worker_name_performing_this_duty( return worker_instance_map - def read_arguments(self, args: argparse.Namespace) -> None: # We support a bunch of command line arguments that override options in # the config. A lot of these options have a worker_* prefix when running From bf91a5da230d280c6e6e0f77a37c969f3b694ea9 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Sun, 20 Nov 2022 04:09:34 +0000 Subject: [PATCH 06/11] Changelog --- changelog.d/14496.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14496.misc diff --git a/changelog.d/14496.misc b/changelog.d/14496.misc new file mode 100644 index 000000000000..57fc6cf452e8 --- /dev/null +++ b/changelog.d/14496.misc @@ -0,0 +1 @@ +Refactor `federation_sender` and `pusher` configuration loading. From 73eea75fb171afe226fe7b3ce52a478e1a1cc3bd Mon Sep 17 00:00:00 2001 From: Jason Little Date: Sun, 20 Nov 2022 04:14:42 +0000 Subject: [PATCH 07/11] Small nits about plurality. --- synapse/config/workers.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index f15ae9a7464e..3b5e07a1d1b3 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -228,7 +228,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: self.writers.events ) - federation_sender_instances = self._worker_name_performing_this_duty( + federation_sender_instances = self._worker_names_performing_this_duty( config, "send_federation", "synapse.app.federation_sender", @@ -240,7 +240,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: ) # Handle sharded push - pusher_instances = self._worker_name_performing_this_duty( + pusher_instances = self._worker_names_performing_this_duty( config, "start_pushers", "synapse.app.pusher", @@ -368,7 +368,7 @@ def _should_this_worker_perform_duty( # (By this point, these are either the same value or only one is not None.) return bool(new_option_should_run_here or legacy_option_should_run_here) - def _worker_name_performing_this_duty( + def _worker_names_performing_this_duty( self, config: Dict[str, Any], legacy_option_name: str, @@ -376,7 +376,7 @@ def _worker_name_performing_this_duty( modern_instance_map_name: str, ) -> List[str]: """ - retrieves the name of the worker handling a given duty, by either legacy option or instance_map + retrieves the names of the workers handling a given duty, by either legacy option or instance_map config: settings read from yaml. legacy_option_name: the old way of enabling options. e.g. 'start_pushers' legacy_app_name: The historical app name. e.g. 'synapse.app.pusher' From 2aa954e4e828acf3e2d320e38879555514a3ec17 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Mon, 21 Nov 2022 23:31:37 +0000 Subject: [PATCH 08/11] Per review, put federation_sender logic back up where it was. --- synapse/config/workers.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 3b5e07a1d1b3..26908a5ae06a 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -168,6 +168,17 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: ) ) + federation_sender_instances = self._worker_names_performing_this_duty( + config, + "send_federation", + "synapse.app.federation_sender", + "federation_sender_instances", + ) + self.send_federation = self.instance_name in federation_sender_instances + self.federation_shard_config = ShardedWorkerHandlingConfig( + federation_sender_instances + ) + # A map from instance name to host/port of their HTTP replication endpoint. instance_map = config.get("instance_map") or {} self.instance_map = { @@ -228,17 +239,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: self.writers.events ) - federation_sender_instances = self._worker_names_performing_this_duty( - config, - "send_federation", - "synapse.app.federation_sender", - "federation_sender_instances", - ) - self.send_federation = self.instance_name in federation_sender_instances - self.federation_shard_config = ShardedWorkerHandlingConfig( - federation_sender_instances - ) - # Handle sharded push pusher_instances = self._worker_names_performing_this_duty( config, From e50e444437c96451782d014b5009791e7fa888f1 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Mon, 21 Nov 2022 23:36:53 +0000 Subject: [PATCH 09/11] Per review, fix formatting of docstring. Drive by- change 'instance_map' to 'instance list'. --- synapse/config/workers.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 26908a5ae06a..7ca56dfefac7 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -376,11 +376,13 @@ def _worker_names_performing_this_duty( modern_instance_map_name: str, ) -> List[str]: """ - retrieves the names of the workers handling a given duty, by either legacy option or instance_map - config: settings read from yaml. - legacy_option_name: the old way of enabling options. e.g. 'start_pushers' - legacy_app_name: The historical app name. e.g. 'synapse.app.pusher' - modern_instance_map_name: the string name of the new instance_map. e.g. 'pusher_instances' + Retrieves the names of the workers handling a given duty, by either legacy option or instance list + + Args: + config: settings read from yaml. + legacy_option_name: the old way of enabling options. e.g. 'start_pushers' + legacy_app_name: The historical app name. e.g. 'synapse.app.pusher' + modern_instance_map_name: the string name of the new instance_map. e.g. 'pusher_instances' """ legacy_option = config.get(legacy_option_name, True) From 47ceef6d1ae7080af9be84b9466dc9850fab93f2 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Tue, 22 Nov 2022 01:15:36 +0000 Subject: [PATCH 10/11] Per review, rename 'worker_instances_map' to 'worker_instances' This does make it clearer. While here, also rename other references to 'map' to 'list'. --- synapse/config/workers.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 7ca56dfefac7..ba05b6383072 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -373,35 +373,37 @@ def _worker_names_performing_this_duty( config: Dict[str, Any], legacy_option_name: str, legacy_app_name: str, - modern_instance_map_name: str, + modern_instance_list_name: str, ) -> List[str]: """ - Retrieves the names of the workers handling a given duty, by either legacy option or instance list + Retrieves the names of the workers handling a given duty, by either legacy + option or instance list Args: config: settings read from yaml. legacy_option_name: the old way of enabling options. e.g. 'start_pushers' legacy_app_name: The historical app name. e.g. 'synapse.app.pusher' - modern_instance_map_name: the string name of the new instance_map. e.g. 'pusher_instances' + modern_instance_list_name: the string name of the new instance_list. e.g. + 'pusher_instances' """ legacy_option = config.get(legacy_option_name, True) - worker_instance_map = config.get(modern_instance_map_name) - if worker_instance_map is None: + worker_instances = config.get(modern_instance_list_name) + if worker_instances is None: # Default to an empty list, which means "another, unknown, worker is # responsible for it". - worker_instance_map = [] + worker_instances = [] - # If no worker instances are set we check if - # `legacy_option_name` is set, which means use master + # If no worker instances are set we check if the legacy option + # is set, which means use the main process. if legacy_option: - worker_instance_map = ["master"] + worker_instances = ["master"] if self.worker_app == legacy_app_name: if legacy_option: # If we're using `legacy_app_name`, and not using - # `modern_instance_map_name`, then we should have + # `modern_instance_list_name`, then we should have # explicitly set `legacy_option_name` to false. raise ConfigError( f"The '{legacy_option_name}' config option must be disabled in " @@ -410,9 +412,9 @@ def _worker_names_performing_this_duty( f"Please add `{legacy_option_name}: false` to the main config.\n", ) - worker_instance_map = [self.worker_name] + worker_instances = [self.worker_name] - return worker_instance_map + return worker_instances def read_arguments(self, args: argparse.Namespace) -> None: # We support a bunch of command line arguments that override options in From c6915b2c52f934ebc5e1d07c5e6d2b03524c665e Mon Sep 17 00:00:00 2001 From: realtyem Date: Tue, 22 Nov 2022 14:55:43 -0600 Subject: [PATCH 11/11] Apply suggestions from code review I'm so glad this got preserved, awesome suggestion. Co-authored-by: Patrick Cloke --- synapse/config/workers.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index ba05b6383072..2580660b6c27 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -377,7 +377,14 @@ def _worker_names_performing_this_duty( ) -> List[str]: """ Retrieves the names of the workers handling a given duty, by either legacy - option or instance list + option or instance list. + + There are two ways of configuring which instances handle a given duty, e.g. + for configuring pushers: + + 1. The old way where "start_pushers" is set to false and running a + `synapse.app.pusher'` worker app. + 2. Specifying the workers sending federation in `pusher_instances`. Args: config: settings read from yaml. @@ -385,6 +392,9 @@ def _worker_names_performing_this_duty( legacy_app_name: The historical app name. e.g. 'synapse.app.pusher' modern_instance_list_name: the string name of the new instance_list. e.g. 'pusher_instances' + + Returns: + A list of worker instance names handling the given duty. """ legacy_option = config.get(legacy_option_name, True)