Skip to content

Commit

Permalink
Port storage postgresql file to format.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rémy HUBSCHER committed Feb 16, 2017
1 parent 7e492d7 commit 58110d9
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 63 deletions.
123 changes: 61 additions & 62 deletions kinto/core/storage/postgresql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,18 @@ def initialize_schema(self, dry_run=False):
if not version:
filepath = os.path.join(here, 'schema.sql')
logger.info("Create PostgreSQL storage schema at version "
"%s from %s" % (self.schema_version, filepath))
"{} from {}".format(self.schema_version, filepath))
# Create full schema.
self._check_database_encoding()
self._check_database_timezone()
# Create full schema.
if not dry_run:
self._execute_sql_file(filepath)
logger.info('Created PostgreSQL storage schema '
'(version %s).' % self.schema_version)
logger.info('Created PostgreSQL storage schema (version {}).'.format(
self.schema_version))
return

logger.info('Detected PostgreSQL storage schema version %s.' % version)
logger.info('Detected PostgreSQL storage schema version {}.'.format(version))
migrations = [(v, v + 1) for v in range(version, self.schema_version)]
if not migrations:
logger.info('PostgreSQL storage schema is up-to-date.')
Expand All @@ -112,16 +112,15 @@ def initialize_schema(self, dry_run=False):
# Check order of migrations.
expected = migration[0]
current = self._get_installed_version()
error_msg = "Expected version %s. Found version %s."
error_msg = "Expected version {}. Found version {}."
if not dry_run and expected != current:
raise AssertionError(error_msg % (expected, current))
raise AssertionError(error_msg.format(expected, current))

logger.info('Migrate PostgreSQL storage schema from'
' version %s to %s.' % migration)
filename = 'migration_%03d_%03d.sql' % migration
' version {} to {}.'.format(*migration))
filename = 'migration_{0:03d}_{0:03d}.sql'.format(*migration)
filepath = os.path.join(here, 'migrations', filename)
logger.info("Execute PostgreSQL storage migration"
" from %s" % filepath)
logger.info("Execute PostgreSQL storage migration from {}".format(filepath))
if not dry_run:
self._execute_sql_file(filepath)
logger.info("PostgreSQL storage schema migration {}".format(
Expand All @@ -135,7 +134,7 @@ def _check_database_timezone(self):
record = result.fetchone()
timezone = record['timezone'].upper()
if timezone != 'UTC': # pragma: no cover
msg = 'Database timezone is not UTC (%s)' % timezone
msg = 'Database timezone is not UTC ({})'.format(timezone)
warnings.warn(msg)
logger.warning(msg)

Expand All @@ -151,7 +150,7 @@ def _check_database_encoding(self):
record = result.fetchone()
encoding = record['encoding'].lower()
if encoding != 'utf8': # pragma: no cover
raise AssertionError('Unexpected database encoding %s' % encoding)
raise AssertionError('Unexpected database encoding {}'.format(encoding))

def _get_installed_version(self):
"""Return current version of schema or None if not any found.
Expand Down Expand Up @@ -180,8 +179,8 @@ def _get_installed_version(self):
result = conn.execute(query)
was_flushed = int(result.fetchone()[0]) == 0
if was_flushed:
error_msg = 'Missing schema history: consider version %s.'
logger.warning(error_msg % self.schema_version)
error_msg = 'Missing schema history: consider version {}.'
logger.warning(error_msg.format(self.schema_version))
return self.schema_version

# In the first versions of Cliquet, there was no migration.
Expand Down Expand Up @@ -401,12 +400,12 @@ def delete_all(self, collection_id, parent_id, filters=None,
FROM records
WHERE id IN (SELECT id
FROM records
WHERE %(parent_id_filter)s
%(collection_id_filter)s
%(conditions_filter)s
%(pagination_rules)s
%(sorting)s
%(pagination_limit)s)
WHERE {parent_id_filter}
{collection_id_filter}
{conditions_filter}
{pagination_rules}
{sorting}
{pagination_limit})
RETURNING id, parent_id, collection_id
)
INSERT INTO deleted (id, parent_id, collection_id)
Expand All @@ -420,12 +419,12 @@ def delete_all(self, collection_id, parent_id, filters=None,
FROM records
WHERE id IN (SELECT id
FROM records
WHERE %(parent_id_filter)s
%(collection_id_filter)s
%(conditions_filter)s
%(pagination_rules)s
%(sorting)s
%(pagination_limit)s)
WHERE {parent_id_filter}
{collection_id_filter}
{conditions_filter}
{pagination_rules}
{sorting}
{pagination_limit})
RETURNING id, as_epoch(last_modified) AS last_modified;
"""

Expand All @@ -451,7 +450,7 @@ def delete_all(self, collection_id, parent_id, filters=None,
safe_sql, holders = self._format_conditions(filters,
id_field,
modified_field)
safeholders['conditions_filter'] = 'AND %s' % safe_sql
safeholders['conditions_filter'] = 'AND {}'.format(safe_sql)
placeholders.update(**holders)

if sorting:
Expand All @@ -463,15 +462,15 @@ def delete_all(self, collection_id, parent_id, filters=None,
if pagination_rules:
sql, holders = self._format_pagination(pagination_rules, id_field,
modified_field)
safeholders['pagination_rules'] = 'AND %s' % sql
safeholders['pagination_rules'] = 'AND {}'.format(sql)
placeholders.update(**holders)

if limit:
# We validate the limit value in the resource class as integer.
safeholders['pagination_limit'] = 'LIMIT %s' % limit
safeholders['pagination_limit'] = 'LIMIT {}'.format(limit)

with self.client.connect() as conn:
result = conn.execute(query % safeholders, placeholders)
result = conn.execute(query.format(**safeholders), placeholders)
deleted = result.fetchmany(self._max_fetch_size)

records = []
Expand All @@ -491,9 +490,9 @@ def purge_deleted(self, collection_id, parent_id, before=None,
query = """
DELETE
FROM deleted
WHERE %(parent_id_filter)s
%(collection_id_filter)s
%(conditions_filter)s;
WHERE {parent_id_filter}
{collection_id_filter}
{conditions_filter};
"""
id_field = id_field or self.id_field
modified_field = modified_field or self.modified_field
Expand All @@ -519,7 +518,7 @@ def purge_deleted(self, collection_id, parent_id, before=None,
placeholders['before'] = before

with self.client.connect() as conn:
result = conn.execute(query % safeholders, placeholders)
result = conn.execute(query.format(**safeholders), placeholders)

return result.rowcount

Expand All @@ -533,28 +532,28 @@ def get_all(self, collection_id, parent_id, filters=None, sorting=None,
WITH total_filtered AS (
SELECT COUNT(id) AS count
FROM records
WHERE %(parent_id_filter)s
WHERE {parent_id_filter}
AND collection_id = :collection_id
%(conditions_filter)s
{conditions_filter}
),
collection_filtered AS (
SELECT id, last_modified, data
FROM records
WHERE %(parent_id_filter)s
WHERE {parent_id_filter}
AND collection_id = :collection_id
%(conditions_filter)s
LIMIT %(max_fetch_size)s
{conditions_filter}
LIMIT {max_fetch_size}
),
fake_deleted AS (
SELECT (:deleted_field)::JSONB AS data
),
filtered_deleted AS (
SELECT id, last_modified, fake_deleted.data AS data
FROM deleted, fake_deleted
WHERE %(parent_id_filter)s
WHERE {parent_id_filter}
AND collection_id = :collection_id
%(conditions_filter)s
%(deleted_limit)s
{conditions_filter}
{deleted_limit}
),
all_records AS (
SELECT * FROM filtered_deleted
Expand All @@ -564,14 +563,14 @@ def get_all(self, collection_id, parent_id, filters=None, sorting=None,
paginated_records AS (
SELECT DISTINCT id
FROM all_records
%(pagination_rules)s
{pagination_rules}
)
SELECT total_filtered.count AS count_total,
a.id, as_epoch(a.last_modified) AS last_modified, a.data
FROM paginated_records AS p JOIN all_records AS a ON (a.id = p.id),
total_filtered
%(sorting)s
%(pagination_limit)s;
{sorting}
{pagination_limit};
"""
deleted_field = json.dumps(dict([(deleted_field, True)]))

Expand All @@ -595,7 +594,7 @@ def get_all(self, collection_id, parent_id, filters=None, sorting=None,
safe_sql, holders = self._format_conditions(filters,
id_field,
modified_field)
safeholders['conditions_filter'] = 'AND %s' % safe_sql
safeholders['conditions_filter'] = 'AND {}'.format(safe_sql)
placeholders.update(**holders)

if not include_deleted:
Expand All @@ -610,15 +609,15 @@ def get_all(self, collection_id, parent_id, filters=None, sorting=None,
if pagination_rules:
sql, holders = self._format_pagination(pagination_rules, id_field,
modified_field)
safeholders['pagination_rules'] = 'WHERE %s' % sql
safeholders['pagination_rules'] = 'WHERE {}'.format(sql)
placeholders.update(**holders)

if limit:
# We validate the limit value in the resource class as integer.
safeholders['pagination_limit'] = 'LIMIT %s' % limit
safeholders['pagination_limit'] = 'LIMIT {}'.format(limit)

with self.client.connect(readonly=True) as conn:
result = conn.execute(query % safeholders, placeholders)
result = conn.execute(query.format(**safeholders), placeholders)
retrieved = result.fetchmany(self._max_fetch_size)

if not len(retrieved):
Expand Down Expand Up @@ -675,18 +674,18 @@ def _format_conditions(self, filters, id_field, modified_field,
subfields = filtr.field.split('.')
for j, subfield in enumerate(subfields):
# Safely escape field name
field_holder = '%s_field_%s_%s' % (prefix, i, j)
field_holder = '{}_field_{}_{}'.format(prefix, i, j)
holders[field_holder] = subfield
# Use ->> to convert the last level to text.
column_name += "->>" if j == len(subfields) - 1 else "->"
column_name += ":%s" % field_holder
column_name += ":{}".format(field_holder)

# If field is missing, we default to ''.
sql_field = "coalesce(%s, '')" % column_name
sql_field = "coalesce({}, '')".format(column_name)
# Cast when comparing to number (eg. '4' < '12')
if isinstance(value, (int, float)) and \
value not in (True, False):
sql_field = "(%s)::numeric" % column_name
sql_field = "({})::numeric".format(column_name)

if filtr.operator not in (COMPARISON.IN, COMPARISON.EXCLUDE):
# For the IN operator, let psycopg escape the values list.
Expand All @@ -700,15 +699,15 @@ def _format_conditions(self, filters, id_field, modified_field,
value = (None,)

if filtr.operator == COMPARISON.LIKE:
value = '%{0}%'.format(value)
value = '%{}%'.format(value)

# Safely escape value
value_holder = '%s_value_%s' % (prefix, i)
value_holder = '{}_value_{}'.format(prefix, i)
holders[value_holder] = value

sql_operator = operators.setdefault(filtr.operator,
filtr.operator.value)
cond = "%s %s :%s" % (sql_field, sql_operator, value_holder)
cond = "{} {} :{}".format(sql_field, sql_operator, value_holder)
conditions.append(cond)

safe_sql = ' AND '.join(conditions)
Expand All @@ -734,15 +733,15 @@ def _format_pagination(self, pagination_rules, id_field, modified_field):
placeholders = {}

for i, rule in enumerate(pagination_rules):
prefix = 'rules_%s' % i
prefix = 'rules_{}'.format(i)
safe_sql, holders = self._format_conditions(rule,
id_field,
modified_field,
prefix=prefix)
rules.append(safe_sql)
placeholders.update(**holders)

safe_sql = ' OR '.join(['(%s)' % r for r in rules])
safe_sql = ' OR '.join(['({})'.format(r) for r in rules])
return safe_sql, placeholders

def _format_sorting(self, sorting, id_field, modified_field):
Expand Down Expand Up @@ -770,16 +769,16 @@ def _format_sorting(self, sorting, id_field, modified_field):
sql_field = 'data'
for j, subfield in enumerate(subfields):
# Safely escape field name
field_holder = 'sort_field_%s_%s' % (i, j)
field_holder = 'sort_field_{}_{}'.format(i, j)
holders[field_holder] = subfield
# Use ->> to convert the last level to text.
sql_field += '->(:%s)' % field_holder
sql_field += '->(:{})'.format(field_holder)

sql_direction = 'ASC' if sort.direction > 0 else 'DESC'
sql_sort = "%s %s" % (sql_field, sql_direction)
sql_sort = "{} {}".format(sql_field, sql_direction)
sorts.append(sql_sort)

safe_sql = 'ORDER BY %s' % (', '.join(sorts))
safe_sql = 'ORDER BY {}'.format(', '.join(sorts))
return safe_sql, holders


Expand Down
2 changes: 1 addition & 1 deletion tests/core/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def test_clean_expired_expires_items(self):
def test_add_over_quota_clean_oversized_items(self):
for x in range(100):
# Each entry is 70 bytes
self.cache.set('foo{}'.format(str(x).zfill(3)), 'toto')
self.cache.set('foo{0:03d}'.format(x), 'toto')
time.sleep(0.001)
assert self.cache.get('foo000') == 'toto'
# This should delete the 2 first entries
Expand Down

0 comments on commit 58110d9

Please sign in to comment.