Skip to content

Commit

Permalink
Set our own stream position from the current sequence value on startup (
Browse files Browse the repository at this point in the history
  • Loading branch information
sandhose authored Jun 17, 2024
1 parent 12d7303 commit f983a77
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 178 deletions.
1 change: 1 addition & 0 deletions changelog.d/17309.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
When rolling back to a previous Synapse version and then forwards again to this release, don't require server operators to manually run SQL.
23 changes: 20 additions & 3 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,6 @@ def __init__(
# no active writes in progress.
self._max_position_of_local_instance = self._max_seen_allocated_stream_id

# This goes and fills out the above state from the database.
self._load_current_ids(db_conn, tables)

self._sequence_gen = build_sequence_generator(
db_conn=db_conn,
database_engine=db.engine,
Expand All @@ -303,6 +300,13 @@ def __init__(
positive=positive,
)

# This goes and fills out the above state from the database.
# This may read on the PostgreSQL sequence, and
# SequenceGenerator.check_consistency might have fixed up the sequence, which
# means the SequenceGenerator needs to be setup before we read the value from
# the sequence.
self._load_current_ids(db_conn, tables, sequence_name)

self._max_seen_allocated_stream_id = max(
self._current_positions.values(), default=1
)
Expand All @@ -327,6 +331,7 @@ def _load_current_ids(
self,
db_conn: LoggingDatabaseConnection,
tables: List[Tuple[str, str, str]],
sequence_name: str,
) -> None:
cur = db_conn.cursor(txn_name="_load_current_ids")

Expand Down Expand Up @@ -360,6 +365,18 @@ def _load_current_ids(
if instance in self._writers
}

# If we're a writer, we can assume we're at the end of the stream
# Usually, we would get that from the stream_positions, but in some cases,
# like if we rolled back Synapse, the stream_positions table might not be up to
# date. If we're using Postgres for the sequences, we can just use the current
# sequence value as our own position.
if self._instance_name in self._writers:
if isinstance(self._db.engine, PostgresEngine):
cur.execute(f"SELECT last_value FROM {sequence_name}")
row = cur.fetchone()
assert row is not None
self._current_positions[self._instance_name] = row[0]

# We set the `_persisted_upto_position` to be the minimum of all current
# positions. If empty we use the max stream ID from the DB table.
min_stream_id = min(self._current_positions.values(), default=None)
Expand Down
Loading

0 comments on commit f983a77

Please sign in to comment.