Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Rename get_tcp_replication to get_replication_command_handler. (#12192)
Browse files Browse the repository at this point in the history
Since the object it returns is a ReplicationCommandHandler.

This is clean-up from adding support to Redis where the command handler
was added as an additional layer of abstraction from the TCP protocol.
  • Loading branch information
clokep authored Mar 10, 2022
1 parent a4c1fdb commit 3e4af36
Show file tree
Hide file tree
Showing 15 changed files with 20 additions and 19 deletions.
1 change: 1 addition & 0 deletions changelog.d/12192.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Rename `HomeServer.get_tcp_replication` to `get_replication_command_handler`.
2 changes: 1 addition & 1 deletion synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def start_listening(self) -> None:
else:
logger.warning("Unsupported listener type: %s", listener.type)

self.get_tcp_replication().start_replication(self)
self.get_replication_command_handler().start_replication(self)


def start(config_options: List[str]) -> None:
Expand Down
2 changes: 1 addition & 1 deletion synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def start_listening(self) -> None:
# If redis is enabled we connect via the replication command handler
# in the same way as the workers (since we're effectively a client
# rather than a server).
self.get_tcp_replication().start_replication(self)
self.get_replication_command_handler().start_replication(self)

for listener in self.config.server.listeners:
if listener.type == "http":
Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/transport/server/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(self, hs: "HomeServer"):

self.replication_client = None
if hs.config.worker.worker_app:
self.replication_client = hs.get_tcp_replication()
self.replication_client = hs.get_replication_command_handler()

# A method just so we can pass 'self' as the authenticator to the Servlets
async def authenticate_request(
Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,13 +424,13 @@ def __init__(self, hs: "HomeServer"):

async def _on_shutdown(self) -> None:
if self._presence_enabled:
self.hs.get_tcp_replication().send_command(
self.hs.get_replication_command_handler().send_command(
ClearUserSyncsCommand(self.instance_id)
)

def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None:
if self._presence_enabled:
self.hs.get_tcp_replication().send_user_sync(
self.hs.get_replication_command_handler().send_user_sync(
self.instance_id, user_id, is_syncing, last_sync_ms
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,6 @@ async def insert_client_ip(

self.client_ip_last_seen.set(key, now)

self.hs.get_tcp_replication().send_user_ip(
self.hs.get_replication_command_handler().send_user_ip(
user_id, access_token, ip, user_agent, device_id, now
)
4 changes: 3 additions & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ async def _save_and_send_ack(self) -> None:

# We ACK this token over replication so that the master can drop
# its in memory queues
self._hs.get_tcp_replication().send_federation_ack(current_position)
self._hs.get_replication_command_handler().send_federation_ack(
current_position
)
except Exception:
logger.exception("Error updating federation stream position")
4 changes: 1 addition & 3 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,7 @@ async def _process_command(
raise Exception("Unrecognised command %s in stream queue", cmd.NAME)

def start_replication(self, hs: "HomeServer") -> None:
"""Helper method to start a replication connection to the remote server
using TCP.
"""
"""Helper method to start replication."""
if hs.config.redis.redis_enabled:
from synapse.replication.tcp.redis import (
RedisDirectTcpReplicationClientFactory,
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def __init__(
password=hs.config.redis.redis_password,
)

self.synapse_handler = hs.get_tcp_replication()
self.synapse_handler = hs.get_replication_command_handler()
self.synapse_stream_name = hs.hostname

self.synapse_outbound_redis_connection = outbound_redis_connection
Expand Down
4 changes: 2 additions & 2 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ReplicationStreamProtocolFactory(ServerFactory):
"""Factory for new replication connections."""

def __init__(self, hs: "HomeServer"):
self.command_handler = hs.get_tcp_replication()
self.command_handler = hs.get_replication_command_handler()
self.clock = hs.get_clock()
self.server_name = hs.config.server.server_name

Expand Down Expand Up @@ -85,7 +85,7 @@ def __init__(self, hs: "HomeServer"):
self.is_looping = False
self.pending_updates = False

self.command_handler = hs.get_tcp_replication()
self.command_handler = hs.get_replication_command_handler()

# Set of streams to replicate.
self.streams = self.command_handler.get_streams_to_replicate()
Expand Down
2 changes: 1 addition & 1 deletion synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ def get_read_marker_handler(self) -> ReadMarkerHandler:
return ReadMarkerHandler(self)

@cache_in_self
def get_tcp_replication(self) -> ReplicationCommandHandler:
def get_replication_command_handler(self) -> ReplicationCommandHandler:
return ReplicationCommandHandler(self)

@cache_in_self
Expand Down
4 changes: 2 additions & 2 deletions tests/replication/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def setUp(self):
self.connect_any_redis_attempts,
)

self.hs.get_tcp_replication().start_replication(self.hs)
self.hs.get_replication_command_handler().start_replication(self.hs)

# When we see a connection attempt to the master replication listener we
# automatically set up the connection. This is so that tests don't
Expand Down Expand Up @@ -375,7 +375,7 @@ def make_worker_hs(
)

if worker_hs.config.redis.redis_enabled:
worker_hs.get_tcp_replication().start_replication(worker_hs)
worker_hs.get_replication_command_handler().start_replication(worker_hs)

return worker_hs

Expand Down
2 changes: 1 addition & 1 deletion tests/replication/tcp/streams/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ def test_backwards_stream_id(self):

# Manually send an old RDATA command, which should get dropped. This
# re-uses the row from above, but with an earlier stream token.
self.hs.get_tcp_replication().send_command(
self.hs.get_replication_command_handler().send_command(
RdataCommand("events", "master", 1, row)
)

Expand Down
2 changes: 1 addition & 1 deletion tests/replication/tcp/streams/test_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def test_reset(self):

# Reset the typing handler
self.hs.get_replication_streams()["typing"].last_token = 0
self.hs.get_tcp_replication()._streams["typing"].last_token = 0
self.hs.get_replication_command_handler()._streams["typing"].last_token = 0
typing._latest_room_serial = 0
typing._typing_stream_change_cache = StreamChangeCache(
"TypingStreamChangeCache", typing._latest_room_serial
Expand Down
2 changes: 1 addition & 1 deletion tests/replication/test_federation_ack.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def test_federation_ack_sent(self):
transport, rather than assuming that the implementation has a
ReplicationCommandHandler.
"""
rch = self.hs.get_tcp_replication()
rch = self.hs.get_replication_command_handler()

# wire up the ReplicationCommandHandler to a mock connection, which needs
# to implement IReplicationConnection. (Note that Mock doesn't understand
Expand Down

0 comments on commit 3e4af36

Please sign in to comment.