From cdf2c9d7a30ca8102c3d4ee4e3e633b3b5024b4d Mon Sep 17 00:00:00 2001 From: JaySon Date: Mon, 23 Mar 2020 07:47:01 -0500 Subject: [PATCH] [FLASH-1026] Synchronization between drop table and remove region (#538) --- dbms/src/Interpreters/InterpreterDropQuery.cpp | 4 ++-- dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp | 13 ++++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/dbms/src/Interpreters/InterpreterDropQuery.cpp b/dbms/src/Interpreters/InterpreterDropQuery.cpp index 0a9c8f1cd60..98e6443dbee 100644 --- a/dbms/src/Interpreters/InterpreterDropQuery.cpp +++ b/dbms/src/Interpreters/InterpreterDropQuery.cpp @@ -123,11 +123,11 @@ BlockIO InterpreterDropQuery::execute() ErrorCodes::TABLE_WAS_NOT_DROPPED); } - table.first->shutdown(); - /// If table was already dropped by anyone, an exception will be thrown auto table_lock = table.first->lockForAlter(__PRETTY_FUNCTION__); + table.first->shutdown(); + String current_table_name = table.first->getTableName(); if (drop.detach) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index c26ec91ea12..8d4cdc41330 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -216,13 +216,15 @@ void DeltaMergeStore::shutdown() { bool v = false; if (!shutdown_called.compare_exchange_strong(v, true)) - return ; + return; + LOG_DEBUG(log, "Shutdown DeltaMerge Store start [" << db_name << "." << table_name << "]"); background_pool.removeTask(gc_handle); gc_handle = nullptr; background_pool.removeTask(background_task_handle); background_task_handle = nullptr; + LOG_DEBUG(log, "Shutdown DeltaMerge Store start [" << db_name << "." << table_name << "]"); } DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB::Settings & db_settings) @@ -811,6 +813,9 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const auto try_add_background_task = [&](const BackgroundTask & task) { if (background_tasks.length() <= std::max(id_to_segment.size() * 2, background_pool.getNumberOfThreads() * 5)) { + if (shutdown_called.load(std::memory_order_relaxed)) + return; + // Prevent too many tasks. background_tasks.addTask(task, thread_type, log); background_task_handle->wake(); @@ -997,15 +1002,13 @@ bool DeltaMergeStore::handleBackgroundTask() left = segmentMergeDelta(*task.dm_context, task.segment, false); type = ThreadType::BG_MergeDelta; break; - case Compact: - { + case Compact: { task.segment->getDelta()->compact(*task.dm_context); left = task.segment; type = ThreadType::BG_Compact; break; } - case Flush: - { + case Flush: { task.segment->getDelta()->flush(*task.dm_context); left = task.segment; type = ThreadType::BG_Flush;