Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Rename database classes to make some sense #8033

Merged
merged 12 commits into from
Aug 5, 2020
  •  
  •  
  •  
1 change: 1 addition & 0 deletions changelog.d/8033.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Rename storage layer objects to be more sensible.
2 changes: 1 addition & 1 deletion docs/user_directory.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ who are present in a publicly viewable room present on the server.

The directory info is stored in various tables, which can (typically after
DB corruption) get stale or out of sync. If this happens, for now the
solution to fix it is to execute the SQL [here](../synapse/storage/data_stores/main/schema/delta/53/user_dir_populate.sql)
solution to fix it is to execute the SQL [here](../synapse/storage/databases/main/schema/delta/53/user_dir_populate.sql)
and then restart synapse. This should then start a background task to
flush the current tables and regenerate the directory.
4 changes: 2 additions & 2 deletions scripts-dev/update_database
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class MockHomeserver(HomeServer):
config.server_name, reactor=reactor, config=config, **kwargs
)

self.version_string = "Synapse/"+get_version_string(synapse)
self.version_string = "Synapse/" + get_version_string(synapse)


if __name__ == "__main__":
Expand Down Expand Up @@ -86,7 +86,7 @@ if __name__ == "__main__":
store = hs.get_datastore()

async def run_background_updates():
await store.db.updates.run_background_updates(sleep=False)
await store.db_pool.updates.run_background_updates(sleep=False)
# Stop the reactor to exit the script once every background update is run.
reactor.stop()

Expand Down
78 changes: 39 additions & 39 deletions scripts/synapse_port_db
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,29 @@ from synapse.logging.context import (
make_deferred_yieldable,
run_in_background,
)
from synapse.storage.data_stores.main.client_ips import ClientIpBackgroundUpdateStore
from synapse.storage.data_stores.main.deviceinbox import (
DeviceInboxBackgroundUpdateStore,
)
from synapse.storage.data_stores.main.devices import DeviceBackgroundUpdateStore
from synapse.storage.data_stores.main.events_bg_updates import (
from synapse.storage.database import DatabasePool, make_conn
from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore
from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore
from synapse.storage.databases.main.devices import DeviceBackgroundUpdateStore
from synapse.storage.databases.main.events_bg_updates import (
EventsBackgroundUpdatesStore,
)
from synapse.storage.data_stores.main.media_repository import (
from synapse.storage.databases.main.media_repository import (
MediaRepositoryBackgroundUpdateStore,
)
from synapse.storage.data_stores.main.registration import (
from synapse.storage.databases.main.registration import (
RegistrationBackgroundUpdateStore,
find_max_generated_user_id_localpart,
)
from synapse.storage.data_stores.main.room import RoomBackgroundUpdateStore
from synapse.storage.data_stores.main.roommember import RoomMemberBackgroundUpdateStore
from synapse.storage.data_stores.main.search import SearchBackgroundUpdateStore
from synapse.storage.data_stores.main.state import MainStateBackgroundUpdateStore
from synapse.storage.data_stores.main.stats import StatsStore
from synapse.storage.data_stores.main.user_directory import (
from synapse.storage.databases.main.room import RoomBackgroundUpdateStore
from synapse.storage.databases.main.roommember import RoomMemberBackgroundUpdateStore
from synapse.storage.databases.main.search import SearchBackgroundUpdateStore
from synapse.storage.databases.main.state import MainStateBackgroundUpdateStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.user_directory import (
UserDirectoryBackgroundUpdateStore,
)
from synapse.storage.data_stores.state.bg_updates import StateBackgroundUpdateStore
from synapse.storage.database import Database, make_conn
from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.util import Clock
Expand Down Expand Up @@ -175,14 +173,14 @@ class Store(
StatsStore,
):
def execute(self, f, *args, **kwargs):
return self.db.runInteraction(f.__name__, f, *args, **kwargs)
return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)

def execute_sql(self, sql, *args):
def r(txn):
txn.execute(sql, args)
return txn.fetchall()

return self.db.runInteraction("execute_sql", r)
return self.db_pool.runInteraction("execute_sql", r)

def insert_many_txn(self, txn, table, headers, rows):
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
Expand Down Expand Up @@ -227,7 +225,7 @@ class Porter(object):
async def setup_table(self, table):
if table in APPEND_ONLY_TABLES:
# It's safe to just carry on inserting.
row = await self.postgres_store.db.simple_select_one(
row = await self.postgres_store.db_pool.simple_select_one(
table="port_from_sqlite3",
keyvalues={"table_name": table},
retcols=("forward_rowid", "backward_rowid"),
Expand All @@ -244,7 +242,7 @@ class Porter(object):
) = await self._setup_sent_transactions()
backward_chunk = 0
else:
await self.postgres_store.db.simple_insert(
await self.postgres_store.db_pool.simple_insert(
table="port_from_sqlite3",
values={
"table_name": table,
Expand Down Expand Up @@ -274,7 +272,7 @@ class Porter(object):

await self.postgres_store.execute(delete_all)

await self.postgres_store.db.simple_insert(
await self.postgres_store.db_pool.simple_insert(
table="port_from_sqlite3",
values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
)
Expand Down Expand Up @@ -318,7 +316,7 @@ class Porter(object):
if table == "user_directory_stream_pos":
# We need to make sure there is a single row, `(X, null), as that is
# what synapse expects to be there.
await self.postgres_store.db.simple_insert(
await self.postgres_store.db_pool.simple_insert(
table=table, values={"stream_id": None}
)
self.progress.update(table, table_size) # Mark table as done
Expand Down Expand Up @@ -359,7 +357,7 @@ class Porter(object):

return headers, forward_rows, backward_rows

headers, frows, brows = await self.sqlite_store.db.runInteraction(
headers, frows, brows = await self.sqlite_store.db_pool.runInteraction(
"select", r
)

Expand All @@ -375,7 +373,7 @@ class Porter(object):
def insert(txn):
self.postgres_store.insert_many_txn(txn, table, headers[1:], rows)

self.postgres_store.db.simple_update_one_txn(
self.postgres_store.db_pool.simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": table},
Expand Down Expand Up @@ -413,7 +411,7 @@ class Porter(object):

return headers, rows

headers, rows = await self.sqlite_store.db.runInteraction("select", r)
headers, rows = await self.sqlite_store.db_pool.runInteraction("select", r)

if rows:
forward_chunk = rows[-1][0] + 1
Expand Down Expand Up @@ -451,7 +449,7 @@ class Porter(object):
],
)

self.postgres_store.db.simple_update_one_txn(
self.postgres_store.db_pool.simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": "event_search"},
Expand Down Expand Up @@ -494,15 +492,15 @@ class Porter(object):
db_conn, allow_outdated_version=allow_outdated_version
)
prepare_database(db_conn, engine, config=self.hs_config)
store = Store(Database(hs, db_config, engine), db_conn, hs)
store = Store(DatabasePool(hs, db_config, engine), db_conn, hs)
db_conn.commit()

return store

async def run_background_updates_on_postgres(self):
# Manually apply all background updates on the PostgreSQL database.
postgres_ready = (
await self.postgres_store.db.updates.has_completed_background_updates()
await self.postgres_store.db_pool.updates.has_completed_background_updates()
)

if not postgres_ready:
Expand All @@ -511,9 +509,9 @@ class Porter(object):
self.progress.set_state("Running background updates on PostgreSQL")

while not postgres_ready:
await self.postgres_store.db.updates.do_next_background_update(100)
await self.postgres_store.db_pool.updates.do_next_background_update(100)
postgres_ready = await (
self.postgres_store.db.updates.has_completed_background_updates()
self.postgres_store.db_pool.updates.has_completed_background_updates()
)

async def run(self):
Expand All @@ -534,7 +532,7 @@ class Porter(object):

# Check if all background updates are done, abort if not.
updates_complete = (
await self.sqlite_store.db.updates.has_completed_background_updates()
await self.sqlite_store.db_pool.updates.has_completed_background_updates()
)
if not updates_complete:
end_error = (
Expand Down Expand Up @@ -576,22 +574,24 @@ class Porter(object):
)

try:
await self.postgres_store.db.runInteraction("alter_table", alter_table)
await self.postgres_store.db_pool.runInteraction(
"alter_table", alter_table
)
except Exception:
# On Error Resume Next
pass

await self.postgres_store.db.runInteraction(
await self.postgres_store.db_pool.runInteraction(
"create_port_table", create_port_table
)

# Step 2. Get tables.
self.progress.set_state("Fetching tables")
sqlite_tables = await self.sqlite_store.db.simple_select_onecol(
sqlite_tables = await self.sqlite_store.db_pool.simple_select_onecol(
table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
)

postgres_tables = await self.postgres_store.db.simple_select_onecol(
postgres_tables = await self.postgres_store.db_pool.simple_select_onecol(
table="information_schema.tables",
keyvalues={},
retcol="distinct table_name",
Expand Down Expand Up @@ -692,7 +692,7 @@ class Porter(object):

return headers, [r for r in rows if r[ts_ind] < yesterday]

headers, rows = await self.sqlite_store.db.runInteraction("select", r)
headers, rows = await self.sqlite_store.db_pool.runInteraction("select", r)

rows = self._convert_rows("sent_transactions", headers, rows)

Expand Down Expand Up @@ -725,7 +725,7 @@ class Porter(object):
next_chunk = await self.sqlite_store.execute(get_start_id)
next_chunk = max(max_inserted_rowid + 1, next_chunk)

await self.postgres_store.db.simple_insert(
await self.postgres_store.db_pool.simple_insert(
table="port_from_sqlite3",
values={
"table_name": "sent_transactions",
Expand Down Expand Up @@ -794,14 +794,14 @@ class Porter(object):
next_id = curr_id + 1
txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,))

return self.postgres_store.db.runInteraction("setup_state_group_id_seq", r)
return self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)

def _setup_user_id_seq(self):
def r(txn):
next_id = find_max_generated_user_id_localpart(txn) + 1
txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))

return self.postgres_store.db.runInteraction("setup_user_id_seq", r)
return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)


##############################################
Expand Down
2 changes: 1 addition & 1 deletion synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ def handle_sighup(*args, **kwargs):

# It is now safe to start your Synapse.
hs.start_listening(listeners)
hs.get_datastore().db.start_profiling()
hs.get_datastore().db_pool.start_profiling()
hs.get_pusherpool().start()

setup_sentry(hs)
Expand Down
14 changes: 7 additions & 7 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,15 @@
from synapse.rest.client.versions import VersionsRestServlet
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer
from synapse.storage.data_stores.main.censor_events import CensorEventsStore
from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore
from synapse.storage.data_stores.main.monthly_active_users import (
from synapse.storage.databases.main.censor_events import CensorEventsStore
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
from synapse.storage.databases.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.data_stores.main.presence import UserPresenceState
from synapse.storage.data_stores.main.search import SearchWorkerStore
from synapse.storage.data_stores.main.ui_auth import UIAuthWorkerStore
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
from synapse.storage.databases.main.presence import UserPresenceState
from synapse.storage.databases.main.search import SearchWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
from synapse.types import ReadReceipt
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
Expand Down
6 changes: 3 additions & 3 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ def start():

_base.start(hs, config.listeners)

hs.get_datastore().db.updates.start_doing_background_updates()
hs.get_datastore().db_pool.updates.start_doing_background_updates()
except Exception:
# Print the exception and bail out.
print("Error during startup:", file=sys.stderr)
Expand Down Expand Up @@ -551,8 +551,8 @@ async def phone_stats_home(hs, stats, stats_process=_stats_process):
#

# This only reports info about the *main* database.
stats["database_engine"] = hs.get_datastore().db.engine.module.__name__
stats["database_server_version"] = hs.get_datastore().db.engine.server_version
stats["database_engine"] = hs.get_datastore().db_pool.engine.module.__name__
stats["database_server_version"] = hs.get_datastore().db_pool.engine.server_version

logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats))
try:
Expand Down
5 changes: 4 additions & 1 deletion synapse/config/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ def __init__(self, name: str, db_config: dict):

self.name = name
self.config = db_config
self.data_stores = data_stores

# The `data_stores` config is actually talking about `databases` (we
# changed the name).
self.databases = data_stores


class DatabaseConfig(Config):
Expand Down
2 changes: 1 addition & 1 deletion synapse/events/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from synapse.types import StateMap

if TYPE_CHECKING:
from synapse.storage.data_stores.main import DataStore
from synapse.storage.databases.main import DataStore


@attr.s(slots=True)
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import JsonDict, StateMap, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.distributor import user_joined_room
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import (
Collection,
Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateHandler
from synapse.storage.data_stores.main import DataStore
from synapse.storage.databases.main import DataStore
from synapse.storage.presence import UserPresenceState
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
Expand Down Expand Up @@ -319,7 +319,7 @@ async def _on_shutdown(self):
is some spurious presence changes that will self-correct.
"""
# If the DB pool has already terminated, don't try updating
if not self.store.db.is_running():
if not self.store.db_pool.is_running():
return

logger.info(
Expand Down
2 changes: 1 addition & 1 deletion synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def run_db_interaction(self, desc, func, *args, **kwargs):
Returns:
Deferred[object]: result of func
"""
return self._store.db.runInteraction(desc, func, *args, **kwargs)
return self._store.db_pool.runInteraction(desc, func, *args, **kwargs)

def complete_sso_login(
self, registered_user_id: str, request: SynapseRequest, client_redirect_url: str
Expand Down
Loading