Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Reduce max time we wait for stream positions (#14881)
Browse files Browse the repository at this point in the history
Now that we wait for stream positions whenever we do a HTTP replication
hit, we need to be less brutal in the case where we do timeout (as we
have bugs around this).
  • Loading branch information
erikjohnston authored Jan 20, 2023
1 parent 65d0386 commit 0ec12a3
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 12 deletions.
1 change: 1 addition & 0 deletions changelog.d/14881.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce max time we wait for stream positions.
2 changes: 0 additions & 2 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,6 @@ async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any:
instance_name=instance_name,
stream_name=stream_name,
position=position,
raise_on_timeout=False,
)

return result
Expand Down Expand Up @@ -414,7 +413,6 @@ async def _check_auth_and_handle(
instance_name=content[_STREAM_POSITION_KEY]["instance_name"],
stream_name=stream_name,
position=position,
raise_on_timeout=False,
)

if self.CACHE:
Expand Down
21 changes: 11 additions & 10 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
logger = logging.getLogger(__name__)

# How long we allow callers to wait for replication updates before timing out.
_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 30
_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 5


class DirectTcpReplicationClientFactory(ReconnectingClientFactory):
Expand Down Expand Up @@ -326,7 +326,6 @@ async def wait_for_stream_position(
instance_name: str,
stream_name: str,
position: int,
raise_on_timeout: bool = True,
) -> None:
"""Wait until this instance has received updates up to and including
the given stream position.
Expand All @@ -335,8 +334,6 @@ async def wait_for_stream_position(
instance_name
stream_name
position
raise_on_timeout: Whether to raise an exception if we time out
waiting for the updates, or if we log an error and return.
"""

if instance_name == self._instance_name:
Expand Down Expand Up @@ -365,19 +362,23 @@ async def wait_for_stream_position(

# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
logger.info("Waiting for repl stream %r to reach %s", stream_name, position)
logger.info(
"Waiting for repl stream %r to reach %s (%s)",
stream_name,
position,
instance_name,
)
try:
await make_deferred_yieldable(deferred)
except defer.TimeoutError:
logger.error("Timed out waiting for stream %s", stream_name)

if raise_on_timeout:
raise

return

logger.info(
"Finished waiting for repl stream %r to reach %s", stream_name, position
"Finished waiting for repl stream %r to reach %s (%s)",
stream_name,
position,
instance_name,
)

def stop_pusher(self, user_id: str, app_id: str, pushkey: str) -> None:
Expand Down

0 comments on commit 0ec12a3

Please sign in to comment.