Skip to content

Commit

Permalink
[QOLDEV-905] ensure lock timeout is applied to dropping datastore indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
ThrawnCA committed Oct 25, 2024
1 parent b90e6b7 commit 513b0c7
Showing 1 changed file with 123 additions and 124 deletions.
247 changes: 123 additions & 124 deletions ckanext/datastore/backend/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@

_TIMEOUT = 60000 # milliseconds
_LOCK_TIMEOUT = 20000
_LOCK_TIMEOUT_SQL = u"SET LOCAL lock_timeout = {}".format(_LOCK_TIMEOUT)
_LOCK_TIMEOUT_SQL = u"SET lock_timeout = {}".format(_LOCK_TIMEOUT)

# See http://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
_PG_ERR_CODE = {
Expand Down Expand Up @@ -703,11 +703,15 @@ def _drop_indexes(context: Context, data_dict: dict[str, Any],
AND idx.indisprimary = false
AND t.relname = %s
""".format(unique='true' if unique else 'false')
indexes_to_drop = context['connection'].execute(
sql_get_index_string, data_dict['resource_id']).fetchall()
for index in indexes_to_drop:
context['connection'].execute(
sql_drop_index.format(index[0]).replace('%', '%%'))
with context['connection'].begin():
indexes_to_drop = context['connection'].execute(
sql_get_index_string, data_dict['resource_id']).fetchall()

with context['connection'].begin():
context['connection'].execute(_LOCK_TIMEOUT_SQL)
for index in indexes_to_drop:
context['connection'].execute(
sql_drop_index.format(index[0]).replace('%', '%%'))


def _get_index_names(connection: Any, resource_id: str):
Expand Down Expand Up @@ -2114,128 +2118,123 @@ def resource_fields(self, id: str) -> dict[str, Any]:

try:
engine = self._get_read_engine()

# resource id for dereferencing aliases
info['meta']['id'] = id

# count of rows in table
meta_sql = sa.text(
u'SELECT count(_id) FROM "{0}"'.format(id))
with engine.connect() as conn:
conn.execute(_LOCK_TIMEOUT_SQL)
meta_results = conn.execute(meta_sql)
info['meta']['count'] = meta_results.fetchone()[0]

# table_type - BASE TABLE, VIEW, FOREIGN TABLE, MATVIEW
tabletype_sql = sa.text(u'''
SELECT table_type FROM INFORMATION_SCHEMA.TABLES
WHERE table_name = '{0}'
'''.format(id))
with engine.connect() as conn:
conn.execute(_LOCK_TIMEOUT_SQL)
tabletype_results = conn.execute(tabletype_sql)
info['meta']['table_type'] = \
tabletype_results.fetchone()[0]
# MATERIALIZED VIEWS show as BASE TABLE, so
# we check pg_matviews
matview_sql = sa.text(u'''
SELECT count(*) FROM pg_matviews
WHERE matviewname = '{0}'
# resource id for dereferencing aliases
info['meta']['id'] = id

# count of rows in table
meta_sql = sa.text(
u'SELECT count(_id) FROM "{0}"'.format(id))
with conn.begin():
meta_results = conn.execute(meta_sql)
info['meta']['count'] = meta_results.fetchone()[0]

# table_type - BASE TABLE, VIEW, FOREIGN TABLE, MATVIEW
tabletype_sql = sa.text(u'''
SELECT table_type FROM INFORMATION_SCHEMA.TABLES
WHERE table_name = '{0}'
'''.format(id))
with conn.begin():
tabletype_results = conn.execute(tabletype_sql)
info['meta']['table_type'] = \
tabletype_results.fetchone()[0]
# MATERIALIZED VIEWS show as BASE TABLE, so
# we check pg_matviews
matview_sql = sa.text(u'''
SELECT count(*) FROM pg_matviews
WHERE matviewname = '{0}'
'''.format(id))
with conn.begin():
matview_results = conn.execute(matview_sql)
if matview_results.fetchone()[0]:
info['meta']['table_type'] = 'MATERIALIZED VIEW'

# SIZE - size of table in bytes
size_sql = sa.text(
u"SELECT pg_relation_size('{0}')".format(id))
with conn.begin():
size_results = conn.execute(size_sql)
info['meta']['size'] = size_results.fetchone()[0]

# DB_SIZE - size of database in bytes
dbsize_sql = sa.text(
u"SELECT pg_database_size(current_database())")
with conn.begin():
dbsize_results = conn.execute(dbsize_sql)
info['meta']['db_size'] = \
dbsize_results.fetchone()[0]

# IDXSIZE - size of all indices for table in bytes
idxsize_sql = sa.text(
u"SELECT pg_indexes_size('{0}')".format(id))
with conn.begin():
idxsize_results = conn.execute(idxsize_sql)
info['meta']['idx_size'] = \
idxsize_results.fetchone()[0]

# all the aliases for this resource
alias_sql = sa.text(u'''
SELECT name FROM "_table_metadata" WHERE alias_of = '{0}'
'''.format(id))
with engine.connect() as conn:
conn.execute(_LOCK_TIMEOUT_SQL)
matview_results = conn.execute(matview_sql)
if matview_results.fetchone()[0]:
info['meta']['table_type'] = 'MATERIALIZED VIEW'

# SIZE - size of table in bytes
size_sql = sa.text(
u"SELECT pg_relation_size('{0}')".format(id))
with engine.connect() as conn:
conn.execute(_LOCK_TIMEOUT_SQL)
size_results = conn.execute(size_sql)
info['meta']['size'] = size_results.fetchone()[0]

# DB_SIZE - size of database in bytes
dbsize_sql = sa.text(
u"SELECT pg_database_size(current_database())")
with engine.connect() as conn:
conn.execute(_LOCK_TIMEOUT_SQL)
dbsize_results = conn.execute(dbsize_sql)
info['meta']['db_size'] = \
dbsize_results.fetchone()[0]

# IDXSIZE - size of all indices for table in bytes
idxsize_sql = sa.text(
u"SELECT pg_indexes_size('{0}')".format(id))
with engine.connect() as conn:
conn.execute(_LOCK_TIMEOUT_SQL)
idxsize_results = conn.execute(idxsize_sql)
info['meta']['idx_size'] = \
idxsize_results.fetchone()[0]

# all the aliases for this resource
alias_sql = sa.text(u'''
SELECT name FROM "_table_metadata" WHERE alias_of = '{0}'
'''.format(id))
with engine.connect() as conn:
conn.execute(_LOCK_TIMEOUT_SQL)
alias_results = conn.execute(alias_sql)
aliases = []
for alias in alias_results.fetchall():
aliases.append(alias[0])
info['meta']['aliases'] = aliases

# get the data dictionary for the resource
data_dictionary = datastore_helpers.datastore_dictionary(id)

schema_sql = sa.text(u'''
SELECT
f.attname AS column_name,
pg_catalog.format_type(f.atttypid,f.atttypmod) AS native_type,
f.attnotnull AS notnull,
i.relname as index_name,
CASE
WHEN i.oid<>0 THEN True
ELSE False
END AS is_index,
CASE
WHEN p.contype = 'u' THEN True
WHEN p.contype = 'p' THEN True
ELSE False
END AS uniquekey
FROM pg_attribute f
JOIN pg_class c ON c.oid = f.attrelid
JOIN pg_type t ON t.oid = f.atttypid
LEFT JOIN pg_constraint p ON p.conrelid = c.oid
AND f.attnum = ANY (p.conkey)
LEFT JOIN pg_index AS ix ON f.attnum = ANY(ix.indkey)
AND c.oid = f.attrelid AND c.oid = ix.indrelid
LEFT JOIN pg_class AS i ON ix.indexrelid = i.oid
WHERE c.relkind = 'r'::char
AND c.relname = '{0}'
AND f.attnum > 0
ORDER BY c.relname,f.attnum;
'''.format(id))
with engine.connect() as conn:
conn.execute(_LOCK_TIMEOUT_SQL)
schema_results = conn.execute(schema_sql)
schemainfo = {}
for row in schema_results.fetchall():
row: Any # Row has incomplete type definition
colname: str = row.column_name
if colname.startswith('_'): # Skip internal rows
continue
colinfo: dict[str, Any] = {'native_type': row.native_type,
'notnull': row.notnull,
'index_name': row.index_name,
'is_index': row.is_index,
'uniquekey': row.uniquekey}
schemainfo[colname] = colinfo

for field in data_dictionary:
field.update({'schema': schemainfo[field['id']]})
info['fields'].append(field)
with conn.begin():
alias_results = conn.execute(alias_sql)
aliases = []
for alias in alias_results.fetchall():
aliases.append(alias[0])
info['meta']['aliases'] = aliases

# get the data dictionary for the resource
data_dictionary = datastore_helpers.datastore_dictionary(id)

schema_sql = sa.text(f'''
SELECT
f.attname AS column_name,
pg_catalog.format_type(f.atttypid,f.atttypmod)
AS native_type,
f.attnotnull AS notnull,
i.relname as index_name,
CASE
WHEN i.oid<>0 THEN True
ELSE False
END AS is_index,
CASE
WHEN p.contype = 'u' THEN True
WHEN p.contype = 'p' THEN True
ELSE False
END AS uniquekey
FROM pg_attribute f
JOIN pg_class c ON c.oid = f.attrelid
JOIN pg_type t ON t.oid = f.atttypid
LEFT JOIN pg_constraint p ON p.conrelid = c.oid
AND f.attnum = ANY (p.conkey)
LEFT JOIN pg_index AS ix ON f.attnum = ANY(ix.indkey)
AND c.oid = f.attrelid AND c.oid = ix.indrelid
LEFT JOIN pg_class AS i ON ix.indexrelid = i.oid
WHERE c.relkind = 'r'::char
AND c.relname = {literal_string(id)}
AND f.attnum > 0
ORDER BY c.relname,f.attnum;
''')
with conn.begin():
schema_results = conn.execute(schema_sql)
schemainfo = {}
for row in schema_results.fetchall():
row: Any # Row has incomplete type definition
colname: str = row.column_name
if colname.startswith('_'): # Skip internal rows
continue
colinfo: dict[str, Any] = {'native_type': row.native_type,
'notnull': row.notnull,
'index_name': row.index_name,
'is_index': row.is_index,
'uniquekey': row.uniquekey}
schemainfo[colname] = colinfo

for field in data_dictionary:
field.update({'schema': schemainfo[field['id']]})
info['fields'].append(field)

except Exception:
pass
Expand Down

0 comments on commit 513b0c7

Please sign in to comment.