From 5aabb83f5a6319a4df92587ac0e33db6ab31b519 Mon Sep 17 00:00:00 2001 From: Jyong <76649700+JohnJyong@users.noreply.github.com> Date: Thu, 17 Oct 2024 10:40:22 +0800 Subject: [PATCH] update dataset clean rule (#9426) --- api/configs/feature/__init__.py | 9 ++- api/schedule/clean_unused_datasets_task.py | 23 +++--- api/schedule/clean_unused_messages_task.py | 92 ---------------------- 3 files changed, 19 insertions(+), 105 deletions(-) delete mode 100644 api/schedule/clean_unused_messages_task.py diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index a3334d16345e9..c44a51ee4bcb9 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -506,11 +506,16 @@ class DataSetConfig(BaseSettings): Configuration for dataset management """ - CLEAN_DAY_SETTING: PositiveInt = Field( - description="Interval in days for dataset cleanup operations", + PLAN_SANDBOX_CLEAN_DAY_SETTING: PositiveInt = Field( + description="Interval in days for dataset cleanup operations - plan: sandbox", default=30, ) + PLAN_PRO_CLEAN_DAY_SETTING: PositiveInt = Field( + description="Interval in days for dataset cleanup operations - plan: pro and team", + default=7, + ) + DATASET_OPERATOR_ENABLED: bool = Field( description="Enable or disable dataset operator functionality", default=False, diff --git a/api/schedule/clean_unused_datasets_task.py b/api/schedule/clean_unused_datasets_task.py index 826842e52bf82..100fd8dfab67e 100644 --- a/api/schedule/clean_unused_datasets_task.py +++ b/api/schedule/clean_unused_datasets_task.py @@ -17,10 +17,11 @@ @app.celery.task(queue="dataset") def clean_unused_datasets_task(): click.echo(click.style("Start clean unused datasets indexes.", fg="green")) - clean_days = dify_config.CLEAN_DAY_SETTING + plan_sandbox_clean_day_setting = dify_config.PLAN_SANDBOX_CLEAN_DAY_SETTING + plan_pro_clean_day_setting = dify_config.PLAN_PRO_CLEAN_DAY_SETTING start_at = time.perf_counter() - thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days) - seven_days_ago = datetime.datetime.now() - datetime.timedelta(days=7) + plan_sandbox_clean_day = datetime.datetime.now() - datetime.timedelta(days=plan_sandbox_clean_day_setting) + plan_pro_clean_day = datetime.datetime.now() - datetime.timedelta(days=plan_pro_clean_day_setting) page = 1 while True: try: @@ -31,7 +32,7 @@ def clean_unused_datasets_task(): Document.indexing_status == "completed", Document.enabled == True, Document.archived == False, - Document.updated_at > thirty_days_ago, + Document.updated_at > plan_sandbox_clean_day, ) .group_by(Document.dataset_id) .subquery() @@ -44,7 +45,7 @@ def clean_unused_datasets_task(): Document.indexing_status == "completed", Document.enabled == True, Document.archived == False, - Document.updated_at < thirty_days_ago, + Document.updated_at < plan_sandbox_clean_day, ) .group_by(Document.dataset_id) .subquery() @@ -56,7 +57,7 @@ def clean_unused_datasets_task(): .outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id) .outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id) .filter( - Dataset.created_at < thirty_days_ago, + Dataset.created_at < plan_sandbox_clean_day, func.coalesce(document_subquery_new.c.document_count, 0) == 0, func.coalesce(document_subquery_old.c.document_count, 0) > 0, ) @@ -72,7 +73,7 @@ def clean_unused_datasets_task(): for dataset in datasets: dataset_query = ( db.session.query(DatasetQuery) - .filter(DatasetQuery.created_at > thirty_days_ago, DatasetQuery.dataset_id == dataset.id) + .filter(DatasetQuery.created_at > plan_sandbox_clean_day, DatasetQuery.dataset_id == dataset.id) .all() ) if not dataset_query or len(dataset_query) == 0: @@ -101,7 +102,7 @@ def clean_unused_datasets_task(): Document.indexing_status == "completed", Document.enabled == True, Document.archived == False, - Document.updated_at > seven_days_ago, + Document.updated_at > plan_pro_clean_day, ) .group_by(Document.dataset_id) .subquery() @@ -114,7 +115,7 @@ def clean_unused_datasets_task(): Document.indexing_status == "completed", Document.enabled == True, Document.archived == False, - Document.updated_at < seven_days_ago, + Document.updated_at < plan_pro_clean_day, ) .group_by(Document.dataset_id) .subquery() @@ -126,7 +127,7 @@ def clean_unused_datasets_task(): .outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id) .outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id) .filter( - Dataset.created_at < seven_days_ago, + Dataset.created_at < plan_pro_clean_day, func.coalesce(document_subquery_new.c.document_count, 0) == 0, func.coalesce(document_subquery_old.c.document_count, 0) > 0, ) @@ -142,7 +143,7 @@ def clean_unused_datasets_task(): for dataset in datasets: dataset_query = ( db.session.query(DatasetQuery) - .filter(DatasetQuery.created_at > seven_days_ago, DatasetQuery.dataset_id == dataset.id) + .filter(DatasetQuery.created_at > plan_pro_clean_day, DatasetQuery.dataset_id == dataset.id) .all() ) if not dataset_query or len(dataset_query) == 0: diff --git a/api/schedule/clean_unused_messages_task.py b/api/schedule/clean_unused_messages_task.py deleted file mode 100644 index 85e6a58a0e90c..0000000000000 --- a/api/schedule/clean_unused_messages_task.py +++ /dev/null @@ -1,92 +0,0 @@ -import datetime -import time - -import click -from sqlalchemy import func -from werkzeug.exceptions import NotFound - -import app -from configs import dify_config -from core.rag.index_processor.index_processor_factory import IndexProcessorFactory -from extensions.ext_database import db -from models.dataset import Dataset, DatasetQuery, Document - - -@app.celery.task(queue="dataset") -def clean_unused_message_task(): - click.echo(click.style("Start clean unused messages .", fg="green")) - clean_days = int(dify_config.CLEAN_DAY_SETTING) - start_at = time.perf_counter() - thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days) - page = 1 - while True: - try: - # Subquery for counting new documents - document_subquery_new = ( - db.session.query(Document.dataset_id, func.count(Document.id).label("document_count")) - .filter( - Document.indexing_status == "completed", - Document.enabled == True, - Document.archived == False, - Document.updated_at > thirty_days_ago, - ) - .group_by(Document.dataset_id) - .subquery() - ) - - # Subquery for counting old documents - document_subquery_old = ( - db.session.query(Document.dataset_id, func.count(Document.id).label("document_count")) - .filter( - Document.indexing_status == "completed", - Document.enabled == True, - Document.archived == False, - Document.updated_at < thirty_days_ago, - ) - .group_by(Document.dataset_id) - .subquery() - ) - - # Main query with join and filter - datasets = ( - db.session.query(Dataset) - .outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id) - .outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id) - .filter( - Dataset.created_at < thirty_days_ago, - func.coalesce(document_subquery_new.c.document_count, 0) == 0, - func.coalesce(document_subquery_old.c.document_count, 0) > 0, - ) - .order_by(Dataset.created_at.desc()) - .paginate(page=page, per_page=50) - ) - - except NotFound: - break - if datasets.items is None or len(datasets.items) == 0: - break - page += 1 - for dataset in datasets: - dataset_query = ( - db.session.query(DatasetQuery) - .filter(DatasetQuery.created_at > thirty_days_ago, DatasetQuery.dataset_id == dataset.id) - .all() - ) - if not dataset_query or len(dataset_query) == 0: - try: - # remove index - index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor() - index_processor.clean(dataset, None) - - # update document - update_params = {Document.enabled: False} - - Document.query.filter_by(dataset_id=dataset.id).update(update_params) - db.session.commit() - click.echo(click.style("Cleaned unused dataset {} from db success!".format(dataset.id), fg="green")) - except Exception as e: - click.echo( - click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red") - ) - end_at = time.perf_counter() - click.echo(click.style("Cleaned unused dataset from db success latency: {}".format(end_at - start_at), fg="green"))