Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixups to new push stream #17038

Merged
merged 6 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion changelog.d/17037.feature
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Add support for moving `/push_rules` off of main process.
Add support for moving `/pushrules` off of main process.
1 change: 1 addition & 0 deletions changelog.d/17038.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for moving `/pushrules` off of main process.
8 changes: 8 additions & 0 deletions docker/configure_workers_and_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,13 @@
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"push_rules": {
sandhose marked this conversation as resolved.
Show resolved Hide resolved
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
}

# Templates for sections that may be inserted multiple times in config files
Expand Down Expand Up @@ -401,6 +408,7 @@ def add_worker_roles_to_shared_config(
"receipts",
"to_device",
"typing",
"push_rules",
]

# Worker-type specific sharding config. Now a single worker can fulfill multiple
Expand Down
4 changes: 2 additions & 2 deletions docs/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -532,12 +532,12 @@ the stream writer for the `presence` stream:

^/_matrix/client/(api/v1|r0|v3|unstable)/presence/

##### The `push` stream
##### The `push_rules` stream

The following endpoints should be routed directly to the worker configured as
the stream writer for the `push` stream:

^/_matrix/client/(api/v1|r0|v3|unstable)/push_rules/
^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/

#### Restrict outbound federation traffic to a specific set of workers

Expand Down
8 changes: 4 additions & 4 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class WriterLocations:
can only be a single instance.
presence: The instances that write to the presence stream. Currently
can only be a single instance.
push: The instances that write to the push stream. Currently
push_rules: The instances that write to the push stream. Currently
can only be a single instance.
"""

Expand Down Expand Up @@ -184,7 +184,7 @@ class WriterLocations:
default=["master"],
converter=_instance_to_list_converter,
)
push: List[str] = attr.ib(
push_rules: List[str] = attr.ib(
default=["master"],
converter=_instance_to_list_converter,
)
Expand Down Expand Up @@ -347,7 +347,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
"account_data",
"receipts",
"presence",
"push",
"push_rules",
):
instances = _instance_to_list_converter(getattr(self.writers, stream))
for instance in instances:
Expand Down Expand Up @@ -385,7 +385,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
"Must only specify one instance to handle `presence` messages."
)

if len(self.writers.push) != 1:
if len(self.writers.push_rules) != 1:
raise ConfigError(
"Must only specify one instance to handle `push` messages."
)
Expand Down
6 changes: 4 additions & 2 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,10 @@ def __init__(self, hs: "HomeServer"):
hs.config.server.forgotten_room_retention_period
)

self._is_push_writer = hs.get_instance_name() in hs.config.worker.writers.push
self._push_writer = hs.config.worker.writers.push[0]
self._is_push_writer = (
hs.get_instance_name() in hs.config.worker.writers.push_rules
)
self._push_writer = hs.config.worker.writers.push_rules[0]
self._copy_push_client = ReplicationCopyPusherRestServlet.make_client(hs)

def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def __init__(self, hs: "HomeServer"):
continue

if isinstance(stream, PushRulesStream):
if hs.get_instance_name() in hs.config.worker.writers.push:
if hs.get_instance_name() in hs.config.worker.writers.push_rules:
self._streams_to_replicate.append(stream)

continue
Expand Down
4 changes: 3 additions & 1 deletion synapse/rest/client/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.notifier = hs.get_notifier()
self._is_push_worker = hs.get_instance_name() in hs.config.worker.writers.push
self._is_push_worker = (
hs.get_instance_name() in hs.config.worker.writers.push_rules
)
self._push_rules_handler = hs.get_push_rules_handler()
self._push_rule_linearizer = Linearizer(name="push_rules")

Expand Down
4 changes: 3 additions & 1 deletion synapse/storage/databases/main/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ def __init__(
):
super().__init__(database, db_conn, hs)

self._is_push_writer = hs.get_instance_name() in hs.config.worker.writers.push
self._is_push_writer = (
hs.get_instance_name() in hs.config.worker.writers.push_rules
)

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
Expand Down
Loading