diff --git a/src/sentry/db/deletion.py b/src/sentry/db/deletion.py index 2d332fab8579b9..38e82a96457cfd 100644 --- a/src/sentry/db/deletion.py +++ b/src/sentry/db/deletion.py @@ -1,5 +1,8 @@ from __future__ import absolute_import +import itertools +from uuid import uuid4 + from datetime import timedelta from django.db import connections, router from django.utils import timezone @@ -76,30 +79,11 @@ def _continuous_query(self, query): results = cursor.rowcount > 0 def execute_generic(self, chunk_size=100): - qs = self.model.objects.all() - - if self.days: - cutoff = timezone.now() - timedelta(days=self.days) - qs = qs.filter(**{'{}__lte'.format(self.dtfield): cutoff}) - if self.project_id: - if 'project' in self.model._meta.get_all_field_names(): - qs = qs.filter(project=self.project_id) - else: - qs = qs.filter(project_id=self.project_id) - + qs = self.get_generic_queryset() return self._continuous_generic_query(qs, chunk_size) - def execute_sharded(self, total_shards, shard_id, chunk_size=100): - assert total_shards > 1 - assert shard_id < total_shards - qs = self.model.objects.all().extra( - where=[ - 'id %% {total_shards} = {shard_id}'.format( - total_shards=total_shards, - shard_id=shard_id, - ) - ] - ) + def get_generic_queryset(self): + qs = self.model.objects.all() if self.days: cutoff = timezone.now() - timedelta(days=self.days) @@ -110,7 +94,7 @@ def execute_sharded(self, total_shards, shard_id, chunk_size=100): else: qs = qs.filter(project_id=self.project_id) - return self._continuous_generic_query(qs, chunk_size) + return qs def _continuous_generic_query(self, query, chunk_size): # XXX: we step through because the deletion collector will pull all @@ -127,3 +111,108 @@ def execute(self, chunk_size=10000): self.execute_postgres(chunk_size) else: self.execute_generic(chunk_size) + + def iterator(self, chunk_size=100): + if db.is_postgres(): + g = self.iterator_postgres(chunk_size) + else: + g = self.iterator_generic(chunk_size) + + for chunk in g: + yield chunk + + def iterator_postgres(self, chunk_size, batch_size=1000000): + assert self.days is not None + assert self.dtfield is not None and self.dtfield == self.order_by + + dbc = connections[self.using] + quote_name = dbc.ops.quote_name + + position = None + cutoff = timezone.now() - timedelta(days=self.days) + + with dbc.get_new_connection(dbc.get_connection_params()) as conn: + chunk = [] + + completed = False + while not completed: + # We explicitly use a named cursor here so that we can read a + # large quantity of rows from postgres incrementally, without + # having to pull all rows into memory at once. + with conn.cursor(uuid4().hex) as cursor: + where = [( + "{} < %s".format(quote_name(self.dtfield)), + [cutoff], + )] + + if self.project_id: + where.append(( + "project_id = %s", + [self.project_id], + )) + + if self.order_by[0] == '-': + direction = 'desc' + order_field = self.order_by[1:] + if position is not None: + where.append(( + '{} <= %s'.format(quote_name(order_field)), + [position], + )) + else: + direction = 'asc' + order_field = self.order_by + if position is not None: + where.append(( + '{} >= %s'.format(quote_name(order_field)), + [position], + )) + + conditions, parameters = zip(*where) + parameters = list(itertools.chain.from_iterable(parameters)) + + query = """ + select id, {order_field} + from {table} + where {conditions} + order by {order_field} {direction} + limit {batch_size} + """.format( + table=self.model._meta.db_table, + conditions=' and '.join(conditions), + order_field=quote_name(order_field), + direction=direction, + batch_size=batch_size, + ) + + cursor.execute(query, parameters) + + i = 0 + for i, row in enumerate(cursor, 1): + key, position = row + chunk.append(key) + if len(chunk) == chunk_size: + yield tuple(chunk) + chunk = [] + + # If we retrieved less rows than the batch size, there are + # no more rows remaining to delete and we can exit the + # loop. + if i < batch_size: + completed = True + + if chunk: + yield tuple(chunk) + + def iterator_generic(self, chunk_size): + from sentry.utils.query import RangeQuerySetWrapper + qs = self.get_generic_queryset() + + chunk = [] + for item in RangeQuerySetWrapper(qs): + chunk.append(item.id) + if len(chunk) == chunk_size: + yield tuple(chunk) + chunk = [] + if chunk: + yield tuple(chunk) diff --git a/src/sentry/runner/__init__.py b/src/sentry/runner/__init__.py index 0eab3cfc92abe9..55c4cc3f25f865 100644 --- a/src/sentry/runner/__init__.py +++ b/src/sentry/runner/__init__.py @@ -56,7 +56,6 @@ def cli(ctx, config): lambda cmd: cli.add_command(import_string(cmd)), ( 'sentry.runner.commands.backup.export', 'sentry.runner.commands.backup.import_', 'sentry.runner.commands.cleanup.cleanup', 'sentry.runner.commands.config.config', - 'sentry.runner.commands.cleanup.cleanup_chunk', 'sentry.runner.commands.config.config', 'sentry.runner.commands.createuser.createuser', 'sentry.runner.commands.devserver.devserver', 'sentry.runner.commands.django.django', 'sentry.runner.commands.exec.exec_', 'sentry.runner.commands.files.files', diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index e87b8b29059b2b..add45a6299d834 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -7,14 +7,13 @@ """ from __future__ import absolute_import, print_function -import six from datetime import timedelta from uuid import uuid4 import click from django.utils import timezone -from sentry.runner.decorators import configuration, log_options +from sentry.runner.decorators import log_options from six.moves import xrange @@ -40,92 +39,70 @@ def get_project(value): return None -def chunker(seq, size): - return (seq[pos:pos + size] for pos in xrange(0, len(seq), size)) +# We need a unique value to indicate when to stop multiprocessing queue +# an identity on an object() isn't guaranteed to work between parent +# and child proc +_STOP_WORKER = '91650ec271ae4b3e8a67cdc909d80f8c' -@click.command() -@click.option('--days', type=click.INT, required=True) -@click.option('--project_id', type=click.INT, required=False) -@click.option('--model', required=True) -@click.option('--dtfield', required=True) -@click.option('--order_by', required=True) -@click.option('--num_shards', required=True) -@click.option('--shard_ids', required=True) -@configuration -def cleanup_chunk(days, project_id, model, dtfield, order_by, num_shards, shard_ids): - import pickle - from threading import Thread - - model = pickle.loads(model) - shard_ids = [int(s) for s in shard_ids.split(",")] - - task = create_deletion_task( - days, project_id, model, dtfield, order_by) - - click.echo("days: %s, project_id: %s, model: %s, dtfield: %s, order_by: %s, shard_ids:%s" % - (days, project_id, model, dtfield, order_by, shard_ids)) - - threads = [] - for shard_id in shard_ids: - t = Thread( - target=( - lambda shard_id=shard_id: _chunk_until_complete( - task, num_shards=num_shards, shard_id=shard_id) - ) - ) - t.start() - threads.append(t) +def multiprocess_worker(task_queue): + # Configure within each Process + import logging + from sentry.utils.imports import import_string - for t in threads: - t.join() + logger = logging.getLogger('sentry.cleanup') + configured = False -def create_deletion_task(days, project_id, model, dtfield, order_by): - from sentry import models - from sentry import deletions - from sentry import similarity + while True: + j = task_queue.get() + if j == _STOP_WORKER: + task_queue.task_done() + return - query = { - '{}__lte'.format(dtfield): (timezone.now() - timedelta(days=days)), - } + # On first task, configure Sentry environment + if not configured: + from sentry.runner import configure + configure() - if project_id: - if 'project' in model._meta.get_all_field_names(): - query['project'] = project_id - else: - query['project_id'] = project_id - - skip_models = [ - # Handled by other parts of cleanup - models.Event, - models.EventMapping, - models.EventAttachment, - models.UserReport, - models.Group, - models.GroupEmailThread, - models.GroupRuleStatus, - models.GroupHashTombstone, - # Handled by TTL - similarity.features, - ] + [b[0] for b in EXTRA_BULK_QUERY_DELETES] - - task = deletions.get( - model=model, - query=query, - order_by=order_by, - skip_models=skip_models, - transaction_id=uuid4().hex, - ) + from sentry import models + from sentry import deletions + from sentry import similarity + + skip_models = [ + # Handled by other parts of cleanup + models.Event, + models.EventMapping, + models.EventAttachment, + models.UserReport, + models.Group, + models.GroupEmailThread, + models.GroupRuleStatus, + models.GroupHashTombstone, + # Handled by TTL + similarity.features, + ] + [b[0] for b in EXTRA_BULK_QUERY_DELETES] + + configured = True - return task + model, chunk = j + model = import_string(model) + try: + task = deletions.get( + model=model, + query={'id__in': chunk}, + skip_models=skip_models, + transaction_id=uuid4().hex, + ) -def _chunk_until_complete(task, num_shards=None, shard_id=None): - has_more = True - while has_more: - has_more = task.chunk( - num_shards=num_shards, shard_id=shard_id) + while True: + if not task.chunk(): + break + except Exception as e: + logger.exception(e) + finally: + task_queue.task_done() @click.command() @@ -136,14 +113,7 @@ def _chunk_until_complete(task, num_shards=None, shard_id=None): type=int, default=1, show_default=True, - help='The total number of concurrent threads to run across processes.' -) -@click.option( - '--max_procs', - type=int, - default=8, - show_default=True, - help='The maximum number of processes to fork off for concurrency.' + help='The total number of concurrent worker processes to run.' ) @click.option( '--silent', '-q', default=False, is_flag=True, help='Run quietly. No output on success.' @@ -158,8 +128,7 @@ def _chunk_until_complete(task, num_shards=None, shard_id=None): help='Send the duration of this command to internal metrics.' ) @log_options() -@configuration -def cleanup(days, project, concurrency, max_procs, silent, model, router, timed): +def cleanup(days, project, concurrency, silent, model, router, timed): """Delete a portion of trailing data based on creation date. All data that is older than `--days` will be deleted. The default for @@ -172,11 +141,21 @@ def cleanup(days, project, concurrency, max_procs, silent, model, router, timed) click.echo('Error: Minimum concurrency is 1', err=True) raise click.Abort() - import math - import multiprocessing - import pickle - import subprocess - import sys + # Make sure we fork off multiprocessing pool + # before we import or configure the app + from multiprocessing import Process, JoinableQueue as Queue + + pool = [] + task_queue = Queue(1000) + for _ in xrange(concurrency): + p = Process(target=multiprocess_worker, args=(task_queue,)) + p.daemon = True + p.start() + pool.append(p) + + from sentry.runner import configure + configure() + from django.db import router as db_router from sentry.app import nodestore from sentry.db.deletion import BulkDeleteQuery @@ -296,43 +275,20 @@ def is_filtered(model): if not silent: click.echo('>> Skipping %s' % model.__name__) else: - if concurrency > 1: - shard_ids = range(concurrency) - num_procs = min(multiprocessing.cpu_count(), max_procs) - threads_per_proc = int(math.ceil( - concurrency / float(num_procs))) - - pids = [] - for shard_id_chunk in chunker(shard_ids, threads_per_proc): - pid = subprocess.Popen([ - sys.argv[0], - 'cleanup_chunk', - '--days', six.binary_type(days), - ] + (['--project_id', six.binary_type(project_id)] if project_id else []) + [ - '--model', pickle.dumps(model), - '--dtfield', dtfield, - '--order_by', order_by, - '--num_shards', six.binary_type(concurrency), - '--shard_ids', ",".join([six.binary_type(s) - for s in shard_id_chunk]), - ]) - pids.append(pid) - - total_pid_count = len(pids) - click.echo( - "%s concurrent processes forked, waiting on them to complete." % total_pid_count) - - complete = 0 - for pid in pids: - pid.wait() - complete += 1 - click.echo( - "%s/%s concurrent processes are finished." % (complete, total_pid_count)) - - else: - task = create_deletion_task( - days, project_id, model, dtfield, order_by) - _chunk_until_complete(task) + imp = '.'.join((model.__module__, model.__name__)) + + q = BulkDeleteQuery( + model=model, + dtfield=dtfield, + days=days, + project_id=project_id, + order_by=order_by, + ) + + for chunk in q.iterator(chunk_size=100): + task_queue.put((imp, chunk)) + + task_queue.join() # Clean up FileBlob instances which are no longer used and aren't super # recent (as there could be a race between blob creation and reference) @@ -344,6 +300,14 @@ def is_filtered(model): else: cleanup_unused_files(silent) + # Shut down our pool + for _ in pool: + task_queue.put(_STOP_WORKER) + + # And wait for it to drain + for p in pool: + p.join() + if timed: duration = int(time.time() - start_time) metrics.timing('cleanup.duration', duration, instance=router) diff --git a/tests/sentry/db/test_deletion.py b/tests/sentry/db/test_deletion.py index ac2a61b0b389cb..26ba218ab70d4b 100644 --- a/tests/sentry/db/test_deletion.py +++ b/tests/sentry/db/test_deletion.py @@ -5,7 +5,7 @@ from sentry.db.deletion import BulkDeleteQuery from sentry.models import Group, Project -from sentry.testutils import TestCase +from sentry.testutils import TestCase, TransactionTestCase class BulkDeleteQueryTest(TestCase): @@ -41,3 +41,27 @@ def test_datetime_restriction(self): assert not Group.objects.filter(id=group1_1.id).exists() assert not Group.objects.filter(id=group1_2.id).exists() assert Group.objects.filter(id=group1_3.id).exists() + + +class BulkDeleteQueryIteratorTestCase(TransactionTestCase): + def test_iteration(self): + target_project = self.project + expected_group_ids = set([self.create_group().id for i in xrange(2)]) + + other_project = self.create_project() + self.create_group(other_project) + self.create_group(other_project) + + iterator = BulkDeleteQuery( + model=Group, + project_id=target_project.id, + dtfield='last_seen', + order_by='last_seen', + days=0, + ).iterator(1) + + results = set() + for chunk in iterator: + results.update(chunk) + + assert results == expected_group_ids diff --git a/tests/sentry/runner/commands/test_cleanup.py b/tests/sentry/runner/commands/test_cleanup.py deleted file mode 100644 index 19638f93d25390..00000000000000 --- a/tests/sentry/runner/commands/test_cleanup.py +++ /dev/null @@ -1,66 +0,0 @@ -# -*- coding: utf-8 -*- - -from __future__ import absolute_import - -import pytest - -from django.conf import settings - -from sentry.models import Event, Group -from sentry.tagstore.models import GroupTagKey, GroupTagValue, TagValue -from sentry.runner.commands.cleanup import cleanup -from sentry.testutils import CliTestCase - -ALL_MODELS = (Event, Group, GroupTagKey, GroupTagValue, TagValue) - - -class SentryCleanupTest(CliTestCase): - fixtures = ['tests/fixtures/cleanup.json'] - - if settings.SENTRY_TAGSTORE.startswith('sentry.tagstore.legacy.LegacyTagStorage'): - fixtures += ['tests/fixtures/cleanup-tagstore-legacy.json'] - elif settings.SENTRY_TAGSTORE.startswith('sentry.tagstore.v2'): - fixtures += ['tests/fixtures/cleanup-tagstore-v2.json'] - elif settings.SENTRY_TAGSTORE.startswith('sentry.tagstore.multi'): - fixtures += ['tests/fixtures/cleanup-tagstore-legacy.json', - 'tests/fixtures/cleanup-tagstore-v2.json'] - elif settings.SENTRY_TAGSTORE.startswith('sentry.tagstore.snuba'): - pass - else: - raise NotImplementedError - - command = cleanup - - @pytest.mark.skipif( - settings.SENTRY_TAGSTORE == 'sentry.tagstore.v2.V2TagStorage', - reason='Cleanup is temporarily disabled for tagstore v2' - ) - def test_simple(self): - rv = self.invoke('--days=1') - assert rv.exit_code == 0, rv.output - - for model in ALL_MODELS: - assert model.objects.count() == 0 - - @pytest.mark.skipif( - settings.SENTRY_TAGSTORE == 'sentry.tagstore.v2.V2TagStorage', - reason='Cleanup is temporarily disabled for tagstore v2' - ) - def test_project(self): - orig_counts = {} - for model in ALL_MODELS: - count = model.objects.count() - assert count > 0 - orig_counts[model] = count - - rv = self.invoke('--days=1', '--project=2') - assert rv.exit_code == 0, rv.output - - for model in ALL_MODELS: - assert model.objects.count() == orig_counts[model] - - rv = self.invoke('--days=1', '--project=1') - assert rv.exit_code == 0, rv.output - - for model in ALL_MODELS: - assert model.objects.count() == 0