ref(cleanup): Try to better utilize multiple processes (#6407)
mattrobenolt authored and tkaemming committed Sep 4, 2018
1 parent 6a56a08 commit 7af0991
Showing 5 changed files with 231 additions and 221 deletions.
135 changes: 112 additions & 23 deletions src/sentry/db/deletion.py
@@ -1,5 +1,8 @@
from __future__ import absolute_import

import itertools
from uuid import uuid4

from datetime import timedelta
from django.db import connections, router
from django.utils import timezone
@@ -76,30 +79,11 @@ def _continuous_query(self, query):
results = cursor.rowcount > 0

def execute_generic(self, chunk_size=100):
qs = self.model.objects.all()

if self.days:
cutoff = timezone.now() - timedelta(days=self.days)
qs = qs.filter(**{'{}__lte'.format(self.dtfield): cutoff})
if self.project_id:
if 'project' in self.model._meta.get_all_field_names():
qs = qs.filter(project=self.project_id)
else:
qs = qs.filter(project_id=self.project_id)

qs = self.get_generic_queryset()
return self._continuous_generic_query(qs, chunk_size)

def execute_sharded(self, total_shards, shard_id, chunk_size=100):
assert total_shards > 1
assert shard_id < total_shards
qs = self.model.objects.all().extra(
where=[
'id %% {total_shards} = {shard_id}'.format(
total_shards=total_shards,
shard_id=shard_id,
)
]
)
def get_generic_queryset(self):
qs = self.model.objects.all()

if self.days:
cutoff = timezone.now() - timedelta(days=self.days)
@@ -110,7 +94,7 @@ def execute_sharded(self, total_shards, shard_id, chunk_size=100):
else:
qs = qs.filter(project_id=self.project_id)

return self._continuous_generic_query(qs, chunk_size)
return qs

def _continuous_generic_query(self, query, chunk_size):
# XXX: we step through because the deletion collector will pull all
@@ -127,3 +111,108 @@ def execute(self, chunk_size=10000):
self.execute_postgres(chunk_size)
else:
self.execute_generic(chunk_size)

def iterator(self, chunk_size=100):
if db.is_postgres():
g = self.iterator_postgres(chunk_size)
else:
g = self.iterator_generic(chunk_size)

for chunk in g:
yield chunk

def iterator_postgres(self, chunk_size, batch_size=1000000):
assert self.days is not None
assert self.dtfield is not None and self.dtfield == self.order_by

dbc = connections[self.using]
quote_name = dbc.ops.quote_name

position = None
cutoff = timezone.now() - timedelta(days=self.days)

with dbc.get_new_connection(dbc.get_connection_params()) as conn:
chunk = []

completed = False
while not completed:
# We explicitly use a named cursor here so that we can read a
# large quantity of rows from postgres incrementally, without
# having to pull all rows into memory at once.
with conn.cursor(uuid4().hex) as cursor:
where = [(
"{} < %s".format(quote_name(self.dtfield)),
[cutoff],
)]

if self.project_id:
where.append((
"project_id = %s",
[self.project_id],
))

if self.order_by[0] == '-':
direction = 'desc'
order_field = self.order_by[1:]
if position is not None:
where.append((
'{} <= %s'.format(quote_name(order_field)),
[position],
))
else:
direction = 'asc'
order_field = self.order_by
if position is not None:
where.append((
'{} >= %s'.format(quote_name(order_field)),
[position],
))

conditions, parameters = zip(*where)
parameters = list(itertools.chain.from_iterable(parameters))

query = """
select id, {order_field}
from {table}
where {conditions}
order by {order_field} {direction}
limit {batch_size}
""".format(
table=self.model._meta.db_table,
conditions=' and '.join(conditions),
order_field=quote_name(order_field),
direction=direction,
batch_size=batch_size,
)

cursor.execute(query, parameters)

i = 0
for i, row in enumerate(cursor, 1):
key, position = row
chunk.append(key)
if len(chunk) == chunk_size:
yield tuple(chunk)
chunk = []

# If we retrieved less rows than the batch size, there are
# no more rows remaining to delete and we can exit the
# loop.
if i < batch_size:
completed = True

if chunk:
yield tuple(chunk)

def iterator_generic(self, chunk_size):
from sentry.utils.query import RangeQuerySetWrapper
qs = self.get_generic_queryset()

chunk = []
for item in RangeQuerySetWrapper(qs):
chunk.append(item.id)
if len(chunk) == chunk_size:
yield tuple(chunk)
chunk = []
if chunk:
yield tuple(chunk)
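
Note that the deletion.py changes stop short of doing the parallel work themselves: iterator() only yields tuples of primary keys in chunks, leaving the actual deletes to whichever caller consumes the generator, which is what lets the cleanup command spread the work over multiple processes. The sketch below shows one way such a caller could look; it is an illustration only, not code from this commit, and the BulkDeleteQuery class name, its constructor arguments, the Event model, and the delete_chunk helper are all assumptions.

    # Sketch only (not part of this commit's diff): fanning the chunks from
    # iterator() out to multiple worker processes.
    from multiprocessing import Pool

    from sentry.db.deletion import BulkDeleteQuery  # assumed class name
    from sentry.models import Event                 # assumed model for illustration


    def delete_chunk(id_chunk):
        # Hypothetical worker: each chunk is a tuple of primary keys. In
        # practice each forked worker should open its own database connection.
        Event.objects.filter(id__in=id_chunk).delete()


    def parallel_cleanup(days=30, concurrency=4, chunk_size=100):
        query = BulkDeleteQuery(
            model=Event,
            dtfield='datetime',
            order_by='datetime',  # iterator_postgres asserts dtfield == order_by
            days=days,
        )
        pool = Pool(processes=concurrency)
        try:
            # iterator() yields tuples of ids; each tuple is one unit of work.
            for _ in pool.imap_unordered(delete_chunk, query.iterator(chunk_size)):
                pass
        finally:
            pool.close()
            pool.join()
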
1 change: 0 additions & 1 deletion src/sentry/runner/__init__.py
@@ -56,7 +56,6 @@ def cli(ctx, config):
lambda cmd: cli.add_command(import_string(cmd)), (
'sentry.runner.commands.backup.export', 'sentry.runner.commands.backup.import_',
'sentry.runner.commands.cleanup.cleanup', 'sentry.runner.commands.config.config',
'sentry.runner.commands.cleanup.cleanup_chunk', 'sentry.runner.commands.config.config',
'sentry.runner.commands.createuser.createuser',
'sentry.runner.commands.devserver.devserver', 'sentry.runner.commands.django.django',
'sentry.runner.commands.exec.exec_', 'sentry.runner.commands.files.files',
