Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
SeaRise committed Apr 7, 2023
2 parents c25ab08 + b8df139 commit 2eba7b2
Show file tree
Hide file tree
Showing 78 changed files with 1,242 additions and 430 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
url = https://github.com/guanzhi/GmSSL.git
[submodule "contrib/aws"]
path = contrib/aws
url = https://github.com/aws/aws-sdk-cpp.git
url = https://github.com/JaySon-Huang/aws-sdk-cpp.git
[submodule "contrib/aws-c-auth"]
path = contrib/aws-c-auth
url = https://github.com/awslabs/aws-c-auth.git
Expand Down
2 changes: 1 addition & 1 deletion contrib/tiflash-proxy
Submodule tiflash-proxy updated 155 files
1 change: 1 addition & 0 deletions dbms/src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
M(S3ListObjects) \
M(S3DeleteObject) \
M(S3CopyObject) \
M(S3GetObjectRetry) \
M(FileCacheHit) \
M(FileCacheMiss) \
M(FileCacheEvict)
Expand Down
79 changes: 79 additions & 0 deletions dbms/src/Common/UniThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/Exception.h>
#include <Poco/Event.h>
#include <boost_wrapper/priority_queue.h>

Expand All @@ -23,14 +24,18 @@
#include <cstdint>
#include <ext/scope_guard.h>
#include <functional>
#include <future>
#include <list>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>

namespace DB
{
template <typename Thread>
class ThreadPoolWaitGroup;
/** Very simple thread pool similar to boost::threadpool.
* Advantages:
* - catches exceptions and rethrows on wait.
Expand Down Expand Up @@ -93,6 +98,11 @@ class ThreadPoolImpl
void setQueueSize(size_t value);
size_t getMaxThreads() const;

/// Create a ThreadPoolWaitGroup bound to this pool.
/// The returned group holds a reference to `*this`, so the pool must stay alive for as long as the group does.
std::unique_ptr<ThreadPoolWaitGroup<Thread>> waitGroup()
{
    using WaitGroup = ThreadPoolWaitGroup<Thread>;
    return std::make_unique<WaitGroup>(*this);
}

private:
mutable std::mutex mutex;
std::condition_variable job_finished;
Expand Down Expand Up @@ -284,6 +294,75 @@ class ThreadFromGlobalPoolImpl : boost::noncopyable
}
};

/// ThreadPoolWaitGroup schedules tasks on a thread pool and waits for all tasks launched through it to finish.
/// To guarantee the exception safety of ThreadPoolWaitGroup, create the object, call `schedule` and call `wait`
/// in the same scope. The destructor also waits for every scheduled task, so anything a task references must be
/// declared before the wait group (locals are destroyed in reverse declaration order).
/// The class does no internal locking: `schedule` and `wait` must not be called concurrently on the same object.
template <typename Thread>
class ThreadPoolWaitGroup
{
public:
    /// Bind the group to `thread_pool_`. Only a reference is stored, so the pool must outlive the group.
    explicit ThreadPoolWaitGroup(ThreadPoolImpl<Thread> & thread_pool_)
        : thread_pool(thread_pool_)
    {}
    ThreadPoolWaitGroup(const ThreadPoolWaitGroup &) = delete;
    ThreadPoolWaitGroup & operator=(const ThreadPoolWaitGroup &) = delete;

    /// Waits for all tasks that have not been waited for yet. A destructor must not throw,
    /// so any exception coming out of `wait` is logged and swallowed here.
    ~ThreadPoolWaitGroup()
    {
        try
        {
            wait();
        }
        catch (...)
        {
            tryLogCurrentException(Logger::get(), "Error in destructor function of ThreadPoolWaitGroup");
        }
    }

    /// Schedule `func` on the pool and remember its future so that `wait` can block on it.
    /// If the pool refuses the task, the exception from `scheduleOrThrowOnError` is propagated
    /// and nothing is recorded for that task.
    void schedule(std::function<void()> func)
    {
        // packaged_task captures any exception thrown by `func` into its shared state,
        // so the job handed to the pool never throws by itself.
        auto task = std::make_shared<std::packaged_task<void()>>(std::move(func));

        // Obtain and store the future BEFORE handing the task to the pool:
        // - get_future() can never run concurrently with the task's invocation;
        // - a failing emplace_back cannot leave an already running task untracked.
        futures.emplace_back(task->get_future());
        try
        {
            thread_pool.scheduleOrThrowOnError([task] { (*task)(); });
        }
        catch (...)
        {
            // The task was never queued; drop its future so that `wait` does not
            // report a broken promise for a task that never ran.
            futures.pop_back();
            throw;
        }
    }

    /// Block until every task scheduled since the previous `wait` has finished, then rethrow the
    /// first exception raised by any of them (if any). Calling `wait` with nothing pending is a no-op,
    /// and tasks scheduled after a `wait` are picked up by the next `wait` or by the destructor.
    void wait()
    {
        // Take ownership of the pending futures so that each one is consumed exactly once.
        std::vector<std::future<void>> pending;
        pending.swap(futures);

        std::exception_ptr first_exception;
        for (auto & future : pending)
        {
            // ensure all futures finished, even if an earlier one failed
            try
            {
                future.get();
            }
            catch (...)
            {
                if (!first_exception)
                    first_exception = std::current_exception();
            }
        }

        if (first_exception)
        {
            try
            {
                std::rethrow_exception(first_exception);
            }
            catch (Exception & exc)
            {
                // Attach the worker-side stack trace to the message before rethrowing.
                exc.addMessage(exc.getStackTrace().toString());
                exc.rethrow();
            }
            // Any exception type other than `Exception` is not caught above and propagates unchanged.
        }
    }

private:
    /// Futures of the tasks scheduled and not yet waited for.
    std::vector<std::future<void>> futures;
    ThreadPoolImpl<Thread> & thread_pool;
};
/// Schedule jobs/tasks on global thread pool without implicit passing tracing context on current thread to underlying worker as parent tracing context.
///
/// If you implement your own job/task scheduling upon global thread pool or schedules a long time running job in a infinite loop way,
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,18 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
}
catch (...)
{
std::unique_lock lock(exception_mutex);
exception_from_workers.push_back(std::current_exception());
{
std::unique_lock lock(exception_mutex);
exception_from_workers.push_back(std::current_exception());
}
auto error_message = getCurrentExceptionMessage(false, true);
if (subquery.join)
subquery.join->meetError(error_message);
LOG_ERROR(log, "{} throw exception: {} In {} sec. ", gen_log_msg(), error_message, watch.elapsedSeconds());
/// createOne is concurrently running in multiple threads, call cancel here to stop other threads
/// need to use cancel(true) here because the other threads may be blocked in `ExchangeReceiver::nextResult`,
/// cancel(true) will wake up these threads
cancel(true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ Block ParallelAggregatingBlockInputStream::readImpl()
*/

aggregator.finishSpill();
LOG_INFO(log, "Begin restore data from disk for aggregation.");
BlockInputStreams input_streams = aggregator.restoreSpilledData();
impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(
input_streams,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Databases/DatabaseMemory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ DatabaseMemory::DatabaseMemory(String name_)

void DatabaseMemory::loadTables(
Context & /*context*/,
legacy::ThreadPool * /*thread_pool*/,
ThreadPool * /*thread_pool*/,
bool /*has_force_restore_data_flag*/)
{
/// Nothing to load.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Databases/DatabaseMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class DatabaseMemory : public DatabaseWithOwnTablesBase

void loadTables(
Context & context,
legacy::ThreadPool * thread_pool,
ThreadPool * thread_pool,
bool has_force_restore_data_flag) override;

void createTable(
Expand Down
51 changes: 40 additions & 11 deletions dbms/src/Databases/DatabaseOrdinary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/FailPoint.h>
#include <Common/Stopwatch.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/UniThreadPool.h>
#include <Common/escapeForFileName.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseOrdinary.h>
Expand All @@ -29,11 +30,9 @@
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Poco/DirectoryIterator.h>
#include <common/ThreadPool.h>
#include <common/logger_useful.h>
#include <fmt/core.h>


namespace DB
{
namespace ErrorCodes
Expand All @@ -46,6 +45,7 @@ extern const int FILE_DOESNT_EXIST;
extern const int LOGICAL_ERROR;
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
extern const int SYNTAX_ERROR;
extern const int TIDB_TABLE_ALREADY_EXISTS;
} // namespace ErrorCodes

namespace FailPoints
Expand Down Expand Up @@ -83,7 +83,7 @@ DatabaseOrdinary::DatabaseOrdinary(String name_, const String & metadata_path_,
}


void DatabaseOrdinary::loadTables(Context & context, legacy::ThreadPool * thread_pool, bool has_force_restore_data_flag)
void DatabaseOrdinary::loadTables(Context & context, ThreadPool * thread_pool, bool has_force_restore_data_flag)
{
using FileNames = std::vector<std::string>;
FileNames file_names = DatabaseLoading::listSQLFilenames(metadata_path, log);
Expand All @@ -102,10 +102,15 @@ void DatabaseOrdinary::loadTables(Context & context, legacy::ThreadPool * thread
AtomicStopwatch watch;
std::atomic<size_t> tables_processed{0};

auto wait_group = thread_pool ? thread_pool->waitGroup() : nullptr;

std::mutex failed_tables_mutex;
Tables tables_failed_to_startup;

auto task_function = [&](FileNames::const_iterator begin, FileNames::const_iterator end) {
for (auto it = begin; it != end; ++it)
{
const String & table = *it;
const String & table_file = *it;

/// Messages, so that it's not boring to wait for the server to load for a long time.
if ((++tables_processed) % PRINT_MESSAGE_EACH_N_TABLES == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
Expand All @@ -114,7 +119,32 @@ void DatabaseOrdinary::loadTables(Context & context, legacy::ThreadPool * thread
watch.restart();
}

DatabaseLoading::loadTable(context, *this, metadata_path, name, data_path, getEngineName(), table, has_force_restore_data_flag);
auto [table_name, table] = DatabaseLoading::loadTable(context, *this, metadata_path, name, data_path, getEngineName(), table_file, has_force_restore_data_flag);

/// After table was basically initialized, startup it.
if (table)
{
try
{
table->startup();
}
catch (DB::Exception & e)
{
if (e.code() == ErrorCodes::TIDB_TABLE_ALREADY_EXISTS)
{
// If IStorage::startup throws an Exception with TIDB_TABLE_ALREADY_EXISTS, it
// means that we may have crashed in the middle of renaming tables. We clean up the
// meta files for those storages via `cleanupTables`.
// - If the storage is the outdated one after renaming, removing it is correct.
// - If the storage should be the target table, removing it means we "roll back" the
//   rename action, and the table will be renamed by TiDBSchemaSyncer later.
std::lock_guard lock(failed_tables_mutex);
tables_failed_to_startup.emplace(table_name, table);
}
else
throw;
}
}
}
};

Expand All @@ -126,21 +156,20 @@ void DatabaseOrdinary::loadTables(Context & context, legacy::ThreadPool * thread
auto begin = file_names.begin() + i * bunch_size;
auto end = (i + 1 == num_bunches) ? file_names.end() : (file_names.begin() + (i + 1) * bunch_size);

auto task = [task_function, begin, end] {
return task_function(begin, end);
auto task = [&task_function, begin, end] {
task_function(begin, end);
};

if (thread_pool)
thread_pool->schedule(task);
wait_group->schedule(task);
else
task();
}

if (thread_pool)
thread_pool->wait();
wait_group->wait();

/// After all tables was basically initialized, startup them.
DatabaseLoading::startupTables(*this, name, tables, thread_pool, log);
DatabaseLoading::cleanupTables(*this, name, tables_failed_to_startup, log);
}

void DatabaseOrdinary::createTable(const Context & context, const String & table_name, const StoragePtr & table, const ASTPtr & query)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Databases/DatabaseOrdinary.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class DatabaseOrdinary : public DatabaseWithOwnTablesBase

void loadTables(
Context & context,
legacy::ThreadPool * thread_pool,
ThreadPool * thread_pool,
bool has_force_restore_data_flag) override;

void createTable(
Expand Down
Loading

0 comments on commit 2eba7b2

Please sign in to comment.