Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve error handling when reading request and Region meta change in concurrency (#1101) #1109

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ endif ()

find_package (Threads)

if (ENABLE_TESTS)
add_definitions(-DFIU_ENABLE)
endif()

include_directories (src)
add_subdirectory (src)

Expand Down Expand Up @@ -78,6 +82,7 @@ add_headers_only(dbms src/Server)

list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})
list (APPEND clickhouse_common_io_headers ${fiu_include_dirs})

list (APPEND dbms_sources src/Functions/IFunction.cpp src/Functions/FunctionFactory.cpp src/Functions/FunctionHelpers.cpp)
list (APPEND dbms_headers src/Functions/IFunction.h src/Functions/FunctionFactory.h src/Functions/FunctionHelpers.h)
Expand Down Expand Up @@ -157,6 +162,7 @@ target_link_libraries (clickhouse_common_io
${EXECINFO_LIBRARY}
${Boost_SYSTEM_LIBRARY}
${CMAKE_DL_LIBS}
fiu
prometheus-cpp::core
prometheus-cpp::push
prometheus-cpp::pull
Expand Down Expand Up @@ -254,9 +260,6 @@ if (NOT USE_INTERNAL_ZSTD_LIBRARY)
target_include_directories (dbms BEFORE PRIVATE ${ZSTD_INCLUDE_DIR})
endif ()

#use libfiu
target_include_directories(dbms PUBLIC ${fiu_include_dirs})

target_include_directories (dbms PUBLIC ${DBMS_INCLUDE_DIR})
target_include_directories (clickhouse_common_io PUBLIC ${DBMS_INCLUDE_DIR})
target_include_directories (clickhouse_common_io PUBLIC ${PCG_RANDOM_INCLUDE_DIR})
Expand All @@ -271,8 +274,6 @@ add_subdirectory (tests)
if (ENABLE_TESTS)
include (${ClickHouse_SOURCE_DIR}/cmake/find_gtest.cmake)

add_definitions(-DFIU_ENABLE)

if (USE_INTERNAL_GTEST_LIBRARY)
# Google Test from sources
add_subdirectory(${ClickHouse_SOURCE_DIR}/contrib/googletest/googletest ${CMAKE_CURRENT_BINARY_DIR}/googletest)
Expand Down
105 changes: 105 additions & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#include <Common/FailPoint.h>

#include <boost/core/noncopyable.hpp>
#include <condition_variable>
#include <mutex>

namespace DB
{
std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::fail_point_wait_channels;

#define APPLY_FOR_FAILPOINTS(M) \
M(exception_between_drop_meta_and_data) \
M(exception_between_alter_data_and_meta) \
M(exception_drop_table_during_remove_meta) \
M(exception_between_rename_table_data_and_metadata); \
M(exception_between_create_database_meta_and_directory); \
M(exception_before_rename_table_old_meta_removed); \
M(region_exception_after_read_from_storage_some_error) \
M(region_exception_after_read_from_storage_all_error)

#define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) M(pause_after_learner_read)

namespace FailPoints
{
#define M(NAME) extern const char NAME[] = #NAME "";
APPLY_FOR_FAILPOINTS(M)
APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M)
#undef M
} // namespace FailPoints

#ifdef FIU_ENABLE
class FailPointChannel : private boost::noncopyable
{
public:
// wake up all waiting threads when destroy
~FailPointChannel() { cv.notify_all(); }

void wait()
{
std::unique_lock lock(m);
cv.wait(lock);
}

private:
std::mutex m;
std::condition_variable cv;
};

void FailPointHelper::enableFailPoint(const String & fail_point_name)
{
#define M(NAME) \
if (fail_point_name == FailPoints::NAME) \
{ \
/* FIU_ONETIME -- Only fail once; the point of failure will be automatically disabled afterwards.*/ \
fiu_enable(FailPoints::NAME, 1, nullptr, FIU_ONETIME); \
return; \
}

APPLY_FOR_FAILPOINTS(M)
#undef M

#define M(NAME) \
if (fail_point_name == FailPoints::NAME) \
{ \
/* FIU_ONETIME -- Only fail once; the point of failure will be automatically disabled afterwards.*/ \
fiu_enable(FailPoints::NAME, 1, nullptr, FIU_ONETIME); \
fail_point_wait_channels.try_emplace(FailPoints::NAME, std::make_shared<FailPointChannel>()); \
return; \
}

APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M)
#undef M
throw Exception("Cannot find fail point " + fail_point_name, ErrorCodes::FAIL_POINT_ERROR);
}

void FailPointHelper::disableFailPoint(const String & fail_point_name)
{
if (auto iter = fail_point_wait_channels.find(fail_point_name); iter != fail_point_wait_channels.end())
fail_point_wait_channels.erase(iter);
fiu_disable(fail_point_name.c_str());
}

void FailPointHelper::wait(const String & fail_point_name)
{
if (auto iter = fail_point_wait_channels.find(fail_point_name); iter == fail_point_wait_channels.end())
throw Exception("Can not find channel for fail point" + fail_point_name);
else
{
auto ptr = iter->second;
ptr->wait();
}
}
#else
class FailPointChannel
{
};

void FailPointHelper::enableFailPoint(const String & fail_point_name) {}

void FailPointHelper::disableFailPoint(const String & fail_point_name) {}

void FailPointHelper::wait(const String & fail_point_name) {}
#endif

} // namespace DB
41 changes: 16 additions & 25 deletions dbms/src/Common/FailPoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <fiu-local.h>
#include <fiu.h>

#include <unordered_map>

namespace DB
{

Expand All @@ -13,36 +15,25 @@ namespace ErrorCodes
extern const int FAIL_POINT_ERROR;
};

#define FAIL_POINT_REGISTER(name) static constexpr char name[] = #name "";

#define FAIL_POINT_ENABLE(trigger, name) \
else if (trigger == name) { fiu_enable(name, 1, nullptr, FIU_ONETIME); }

FAIL_POINT_REGISTER(exception_between_drop_meta_and_data)
FAIL_POINT_REGISTER(exception_between_alter_data_and_meta)
FAIL_POINT_REGISTER(exception_drop_table_during_remove_meta)
FAIL_POINT_REGISTER(exception_between_rename_table_data_and_metadata);
FAIL_POINT_REGISTER(exception_between_create_database_meta_and_directory);
FAIL_POINT_REGISTER(exception_before_rename_table_old_meta_removed);

/// Macros to set failpoints.
// When `fail_point` is enabled, throw an exception
#define FAIL_POINT_TRIGGER_EXCEPTION(fail_point) \
fiu_do_on(fail_point, throw Exception("Fail point " #fail_point " is triggered.", ErrorCodes::FAIL_POINT_ERROR);)
// When `fail_point` is enabled, wait till it is disabled
#define FAIL_POINT_PAUSE(fail_point) fiu_do_on(fail_point, FailPointHelper::wait(fail_point);)


class FailPointChannel;
class FailPointHelper
{
public:
static void enableFailPoint(const String & fail_point_name)
{
if (false) {}
FAIL_POINT_ENABLE(fail_point_name, exception_between_alter_data_and_meta)
FAIL_POINT_ENABLE(fail_point_name, exception_between_drop_meta_and_data)
FAIL_POINT_ENABLE(fail_point_name, exception_drop_table_during_remove_meta)
FAIL_POINT_ENABLE(fail_point_name, exception_between_rename_table_data_and_metadata)
FAIL_POINT_ENABLE(fail_point_name, exception_between_create_database_meta_and_directory)
FAIL_POINT_ENABLE(fail_point_name, exception_before_rename_table_old_meta_removed)
else throw Exception("Cannot find fail point " + fail_point_name, ErrorCodes::FAIL_POINT_ERROR);
}

static void disableFailPoint(const String & fail_point_name) { fiu_disable(fail_point_name.c_str()); }
static void enableFailPoint(const String & fail_point_name);

static void disableFailPoint(const String & fail_point_name);

static void wait(const String & fail_point_name);

private:
static std::unordered_map<String, std::shared_ptr<FailPointChannel>> fail_point_wait_channels;
};
} // namespace DB
10 changes: 8 additions & 2 deletions dbms/src/Databases/DatabaseOrdinary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ namespace ErrorCodes
extern const int SYNTAX_ERROR;
}

namespace FailPoints
{
extern const char exception_drop_table_during_remove_meta[];
extern const char exception_between_rename_table_data_and_metadata[];
}

static constexpr size_t PRINT_MESSAGE_EACH_N_TABLES = 256;
static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5;
static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
Expand Down Expand Up @@ -205,7 +211,7 @@ void DatabaseOrdinary::removeTable(
// If tiflash crash before remove metadata, next time it restart, will
// full apply schema from TiDB. And the old table's metadata and data
// will be removed.
FAIL_POINT_TRIGGER_EXCEPTION(exception_drop_table_during_remove_meta);
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_drop_table_during_remove_meta);
Poco::File(table_metadata_path).remove();
}
catch (...)
Expand Down Expand Up @@ -249,7 +255,7 @@ void DatabaseOrdinary::renameTable(
}

// TODO: Atomic rename table is not fixed.
FAIL_POINT_TRIGGER_EXCEPTION(exception_between_rename_table_data_and_metadata);
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_between_rename_table_data_and_metadata);

ASTPtr ast = DatabaseLoading::getQueryFromMetadata(context, detail::getTableMetadataPath(metadata_path, table_name));
if (!ast)
Expand Down
46 changes: 27 additions & 19 deletions dbms/src/Databases/DatabaseTiFlash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ extern const int CANNOT_GET_CREATE_TABLE_QUERY;
extern const int SYNTAX_ERROR;
} // namespace ErrorCodes

namespace FailPoints
{
extern const char exception_drop_table_during_remove_meta[];
extern const char exception_before_rename_table_old_meta_removed[];
}

static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;

DatabaseTiFlash::DatabaseTiFlash(
Expand Down Expand Up @@ -178,8 +184,8 @@ void DatabaseTiFlash::createTable(const Context & context, const String & table_

/// If it was ATTACH query and file with table metadata already exist
/// (so, ATTACH is done after DETACH), then rename atomically replaces old file with new one.
context.getFileProvider()->renameFile(table_metadata_tmp_path, EncryptionPath(table_metadata_tmp_path, ""),
table_metadata_path, EncryptionPath(table_metadata_path, ""));
context.getFileProvider()->renameFile(table_metadata_tmp_path, EncryptionPath(table_metadata_tmp_path, ""), table_metadata_path,
EncryptionPath(table_metadata_path, ""));
}
catch (...)
{
Expand All @@ -198,7 +204,7 @@ void DatabaseTiFlash::removeTable(const Context & context, const String & table_
// full apply schema from TiDB. And the old table's metadata and data
// will be removed.
String table_metadata_path = getTableMetadataPath(table_name);
FAIL_POINT_TRIGGER_EXCEPTION(exception_drop_table_during_remove_meta);
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_drop_table_during_remove_meta);
context.getFileProvider()->deleteRegularFile(table_metadata_path, EncryptionPath(table_metadata_path, ""));
}
catch (...)
Expand Down Expand Up @@ -273,10 +279,11 @@ void DatabaseTiFlash::renameTable(const Context & context, const String & table_
// then we cannot rename the encryption info and the file in an atomic operation.
bool use_target_encrypt_info = context.getFileProvider()->isFileEncrypted(EncryptionPath(new_tbl_meta_file, ""));
{
EncryptionPath encryption_path = use_target_encrypt_info ? EncryptionPath(new_tbl_meta_file, "") : EncryptionPath(new_tbl_meta_file_tmp, "");
EncryptionPath encryption_path
= use_target_encrypt_info ? EncryptionPath(new_tbl_meta_file, "") : EncryptionPath(new_tbl_meta_file_tmp, "");
bool create_new_encryption_info = !use_target_encrypt_info && statement.size();
WriteBufferFromFileProvider out(context.getFileProvider(), new_tbl_meta_file_tmp, encryption_path,
create_new_encryption_info, O_WRONLY | O_CREAT | O_EXCL);
WriteBufferFromFileProvider out(
context.getFileProvider(), new_tbl_meta_file_tmp, encryption_path, create_new_encryption_info, O_WRONLY | O_CREAT | O_EXCL);
writeString(statement, out);
out.next();
if (context.getSettingsRef().fsync_metadata)
Expand All @@ -293,8 +300,8 @@ void DatabaseTiFlash::renameTable(const Context & context, const String & table_
}
else
{
context.getFileProvider()->renameFile(new_tbl_meta_file_tmp, EncryptionPath(new_tbl_meta_file_tmp, ""),
new_tbl_meta_file, EncryptionPath(new_tbl_meta_file, ""));
context.getFileProvider()->renameFile(new_tbl_meta_file_tmp, EncryptionPath(new_tbl_meta_file_tmp, ""), new_tbl_meta_file,
EncryptionPath(new_tbl_meta_file, ""));
}
}
catch (...)
Expand All @@ -306,15 +313,16 @@ void DatabaseTiFlash::renameTable(const Context & context, const String & table_
throw;
}

FAIL_POINT_TRIGGER_EXCEPTION(exception_before_rename_table_old_meta_removed);
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_rename_table_old_meta_removed);

// If only display name updated, don't remove `old_tbl_meta_file`.
if (!isSamePath(old_tbl_meta_file, new_tbl_meta_file))
{
// If process crash before removing old table meta file, we will continue or rollback this
// rename command next time `loadTables` is called. See `loadTables` and
// `DatabaseLoading::startupTables` for more details.
context.getFileProvider()->deleteRegularFile(old_tbl_meta_file, EncryptionPath(old_tbl_meta_file, ""));// Then remove old meta file
context.getFileProvider()->deleteRegularFile(
old_tbl_meta_file, EncryptionPath(old_tbl_meta_file, "")); // Then remove old meta file
}
}

Expand Down Expand Up @@ -365,10 +373,11 @@ void DatabaseTiFlash::alterTable(
// refer to the comment in `renameTable`
bool use_target_encrypt_info = context.getFileProvider()->isFileEncrypted(EncryptionPath(table_metadata_path, ""));
{
EncryptionPath encryption_path = use_target_encrypt_info ? EncryptionPath(table_metadata_path, "") : EncryptionPath(table_metadata_tmp_path, "");
EncryptionPath encryption_path
= use_target_encrypt_info ? EncryptionPath(table_metadata_path, "") : EncryptionPath(table_metadata_tmp_path, "");
bool create_new_encryption_info = !use_target_encrypt_info && statement.size();
WriteBufferFromFileProvider out(context.getFileProvider(), table_metadata_tmp_path, encryption_path,
create_new_encryption_info, O_WRONLY | O_CREAT | O_EXCL);
WriteBufferFromFileProvider out(
context.getFileProvider(), table_metadata_tmp_path, encryption_path, create_new_encryption_info, O_WRONLY | O_CREAT | O_EXCL);
writeString(statement, out);
out.next();
if (context.getSettingsRef().fsync_metadata)
Expand All @@ -385,16 +394,15 @@ void DatabaseTiFlash::alterTable(
}
else
{
context.getFileProvider()->renameFile(table_metadata_tmp_path, EncryptionPath(table_metadata_tmp_path, ""),
table_metadata_path, EncryptionPath(table_metadata_path, ""));
context.getFileProvider()->renameFile(table_metadata_tmp_path, EncryptionPath(table_metadata_tmp_path, ""), table_metadata_path,
EncryptionPath(table_metadata_path, ""));
}
}
catch (...)
{
if (!use_target_encrypt_info)
{
context.getFileProvider()->deleteRegularFile(table_metadata_tmp_path,
EncryptionPath(table_metadata_tmp_path, ""));
context.getFileProvider()->deleteRegularFile(table_metadata_tmp_path, EncryptionPath(table_metadata_tmp_path, ""));
}
throw;
}
Expand Down Expand Up @@ -482,8 +490,8 @@ void DatabaseTiFlash::drop(const Context & context)
// Remove meta file for this database
if (auto meta_file = Poco::File(getDatabaseMetadataPath(getMetadataPath())); meta_file.exists())
{
context.getFileProvider()->deleteRegularFile(getDatabaseMetadataPath(getMetadataPath()),
EncryptionPath(getDatabaseMetadataPath(getMetadataPath()), ""));
context.getFileProvider()->deleteRegularFile(
getDatabaseMetadataPath(getMetadataPath()), EncryptionPath(getDatabaseMetadataPath(getMetadataPath()), ""));
}
}

Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Databases/test/gtest_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ namespace ErrorCodes
extern const int SYNTAX_ERROR;
} // namespace ErrorCodes

namespace FailPoints
{
extern const char exception_before_rename_table_old_meta_removed[];
}

namespace tests
{

Expand Down Expand Up @@ -469,7 +474,7 @@ try

const String to_tbl_name = "t_112";
// Rename table to another database, and mock crash by failed point
FailPointHelper::enableFailPoint("exception_before_rename_table_old_meta_removed");
FailPointHelper::enableFailPoint(FailPoints::exception_before_rename_table_old_meta_removed);
ASSERT_THROW(
typeid_cast<DatabaseTiFlash *>(db.get())->renameTable(ctx, tbl_name, *db2, to_tbl_name, db2_name, to_tbl_name), DB::Exception);

Expand Down
Loading