Skip to content

Commit

Permalink
Improve error handling when reading request and Region meta change in…
Browse files Browse the repository at this point in the history
… concurrency (#1101)

* Improve error handling when reading request and Region meta change in concurrency
* Add retry from local storage
* Move definitions of FailPoints into cpp file

Signed-off-by: JaySon-Huang <[email protected]>
  • Loading branch information
JaySon-Huang authored Sep 21, 2020
1 parent e68bb98 commit b6a151a
Show file tree
Hide file tree
Showing 16 changed files with 416 additions and 97 deletions.
11 changes: 6 additions & 5 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ endif ()

find_package (Threads)

if (ENABLE_TESTS)
add_definitions(-DFIU_ENABLE)
endif()

include_directories (src)
add_subdirectory (src)

Expand Down Expand Up @@ -78,6 +82,7 @@ add_headers_only(dbms src/Server)

list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})
list (APPEND clickhouse_common_io_headers ${fiu_include_dirs})

list (APPEND dbms_sources src/Functions/IFunction.cpp src/Functions/FunctionFactory.cpp src/Functions/FunctionHelpers.cpp)
list (APPEND dbms_headers src/Functions/IFunction.h src/Functions/FunctionFactory.h src/Functions/FunctionHelpers.h)
Expand Down Expand Up @@ -157,6 +162,7 @@ target_link_libraries (clickhouse_common_io
${EXECINFO_LIBRARY}
${Boost_SYSTEM_LIBRARY}
${CMAKE_DL_LIBS}
fiu
prometheus-cpp::core
prometheus-cpp::push
prometheus-cpp::pull
Expand Down Expand Up @@ -254,9 +260,6 @@ if (NOT USE_INTERNAL_ZSTD_LIBRARY)
target_include_directories (dbms BEFORE PRIVATE ${ZSTD_INCLUDE_DIR})
endif ()

#use libfiu
target_include_directories(dbms PUBLIC ${fiu_include_dirs})

target_include_directories (dbms PUBLIC ${DBMS_INCLUDE_DIR})
target_include_directories (clickhouse_common_io PUBLIC ${DBMS_INCLUDE_DIR})
target_include_directories (clickhouse_common_io PUBLIC ${PCG_RANDOM_INCLUDE_DIR})
Expand All @@ -271,8 +274,6 @@ add_subdirectory (tests)
if (ENABLE_TESTS)
include (${ClickHouse_SOURCE_DIR}/cmake/find_gtest.cmake)

add_definitions(-DFIU_ENABLE)

if (USE_INTERNAL_GTEST_LIBRARY)
# Google Test from sources
add_subdirectory(${ClickHouse_SOURCE_DIR}/contrib/googletest/googletest ${CMAKE_CURRENT_BINARY_DIR}/googletest)
Expand Down
110 changes: 110 additions & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#include <Common/FailPoint.h>

#include <boost/core/noncopyable.hpp>
#include <condition_variable>
#include <mutex>

namespace DB
{
// Definition of the static member declared in FailPointHelper: maps a fail point name to the
// channel that FailPointHelper::wait() blocks on for that fail point.
std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::fail_point_wait_channels;

/// X-macro list of the fail points that are only enabled/disabled (no wait channel).
/// `M` is invoked once per fail point name. Entries deliberately carry no trailing `;`:
/// the macro passed as `M` decides how each expansion is terminated, so the list can be
/// expanded in declaration, statement, or expression context without stray empty
/// declarations/statements.
#define APPLY_FOR_FAILPOINTS(M) \
    M(exception_between_drop_meta_and_data) \
    M(exception_between_alter_data_and_meta) \
    M(exception_drop_table_during_remove_meta) \
    M(exception_between_rename_table_data_and_metadata) \
    M(exception_between_create_database_meta_and_directory) \
    M(exception_before_rename_table_old_meta_removed) \
    M(exception_after_step_1_in_exchange_partition) \
    M(exception_before_step_2_rename_in_exchange_partition) \
    M(exception_after_step_2_in_exchange_partition) \
    M(exception_before_step_3_rename_in_exchange_partition) \
    M(exception_after_step_3_in_exchange_partition) \
    M(region_exception_after_read_from_storage_some_error) \
    M(region_exception_after_read_from_storage_all_error)

/// X-macro list of the fail points that additionally get a wait channel registered
/// when they are enabled.
#define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) M(pause_after_learner_read)

namespace FailPoints
{
// For every registered fail point, define a C string whose value is the fail point's own
// identifier. `extern` gives the const array external linkage so that other translation
// units can refer to it through an `extern const char NAME[];` declaration.
#define DEFINE_FAILPOINT_NAME(NAME) extern const char NAME[] = #NAME;
APPLY_FOR_FAILPOINTS(DEFINE_FAILPOINT_NAME)
APPLY_FOR_FAILPOINTS_WITH_CHANNEL(DEFINE_FAILPOINT_NAME)
#undef DEFINE_FAILPOINT_NAME
} // namespace FailPoints

#ifdef FIU_ENABLE
class FailPointChannel : private boost::noncopyable
{
public:
// wake up all waiting threads when destroy
~FailPointChannel() { cv.notify_all(); }

void wait()
{
std::unique_lock lock(m);
cv.wait(lock);
}

private:
std::mutex m;
std::condition_variable cv;
};

void FailPointHelper::enableFailPoint(const String & fail_point_name)
{
#define M(NAME) \
if (fail_point_name == FailPoints::NAME) \
{ \
/* FIU_ONETIME -- Only fail once; the point of failure will be automatically disabled afterwards.*/ \
fiu_enable(FailPoints::NAME, 1, nullptr, FIU_ONETIME); \
return; \
}

APPLY_FOR_FAILPOINTS(M)
#undef M

#define M(NAME) \
if (fail_point_name == FailPoints::NAME) \
{ \
/* FIU_ONETIME -- Only fail once; the point of failure will be automatically disabled afterwards.*/ \
fiu_enable(FailPoints::NAME, 1, nullptr, FIU_ONETIME); \
fail_point_wait_channels.try_emplace(FailPoints::NAME, std::make_shared<FailPointChannel>()); \
return; \
}

APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M)
#undef M
throw Exception("Cannot find fail point " + fail_point_name, ErrorCodes::FAIL_POINT_ERROR);
}

void FailPointHelper::disableFailPoint(const String & fail_point_name)
{
if (auto iter = fail_point_wait_channels.find(fail_point_name); iter != fail_point_wait_channels.end())
fail_point_wait_channels.erase(iter);
fiu_disable(fail_point_name.c_str());
}

void FailPointHelper::wait(const String & fail_point_name)
{
if (auto iter = fail_point_wait_channels.find(fail_point_name); iter == fail_point_wait_channels.end())
throw Exception("Can not find channel for fail point" + fail_point_name);
else
{
auto ptr = iter->second;
ptr->wait();
}
}
#else
// FIU_ENABLE is not defined: fail points are compiled out, so the helper API below is
// reduced to no-ops.

// Empty definition of the channel type that FailPoint.h forward-declares.
class FailPointChannel
{
};

// No-op: there is nothing to enable in this build configuration.
void FailPointHelper::enableFailPoint(const String & /*fail_point_name*/) {}

// No-op: there is nothing to disable in this build configuration.
void FailPointHelper::disableFailPoint(const String & /*fail_point_name*/) {}

// No-op: returns immediately instead of pausing the caller.
void FailPointHelper::wait(const String & /*fail_point_name*/) {}
#endif

} // namespace DB
51 changes: 16 additions & 35 deletions dbms/src/Common/FailPoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <fiu-local.h>
#include <fiu.h>

#include <unordered_map>

namespace DB
{

Expand All @@ -13,46 +15,25 @@ namespace ErrorCodes
extern const int FAIL_POINT_ERROR;
};

#define FAIL_POINT_REGISTER(name) static constexpr char name[] = #name "";

#define FAIL_POINT_ENABLE(trigger, name) \
else if (trigger == name) { fiu_enable(name, 1, nullptr, FIU_ONETIME); }

FAIL_POINT_REGISTER(exception_between_drop_meta_and_data)
FAIL_POINT_REGISTER(exception_between_alter_data_and_meta)
FAIL_POINT_REGISTER(exception_drop_table_during_remove_meta)
FAIL_POINT_REGISTER(exception_between_rename_table_data_and_metadata);
FAIL_POINT_REGISTER(exception_between_create_database_meta_and_directory);
FAIL_POINT_REGISTER(exception_before_rename_table_old_meta_removed);
FAIL_POINT_REGISTER(exception_after_step_1_in_exchange_partition)
FAIL_POINT_REGISTER(exception_before_step_2_rename_in_exchange_partition)
FAIL_POINT_REGISTER(exception_after_step_2_in_exchange_partition)
FAIL_POINT_REGISTER(exception_before_step_3_rename_in_exchange_partition)
FAIL_POINT_REGISTER(exception_after_step_3_in_exchange_partition)

/// Macros to set failpoints.
// When `fail_point` is enabled, throw an exception
#define FAIL_POINT_TRIGGER_EXCEPTION(fail_point) \
fiu_do_on(fail_point, throw Exception("Fail point " #fail_point " is triggered.", ErrorCodes::FAIL_POINT_ERROR);)
// When `fail_point` is enabled, wait till it is disabled
#define FAIL_POINT_PAUSE(fail_point) fiu_do_on(fail_point, FailPointHelper::wait(fail_point);)


class FailPointChannel;
class FailPointHelper
{
public:
static void enableFailPoint(const String & fail_point_name)
{
if (false) {}
FAIL_POINT_ENABLE(fail_point_name, exception_between_alter_data_and_meta)
FAIL_POINT_ENABLE(fail_point_name, exception_between_drop_meta_and_data)
FAIL_POINT_ENABLE(fail_point_name, exception_drop_table_during_remove_meta)
FAIL_POINT_ENABLE(fail_point_name, exception_between_rename_table_data_and_metadata)
FAIL_POINT_ENABLE(fail_point_name, exception_between_create_database_meta_and_directory)
FAIL_POINT_ENABLE(fail_point_name, exception_before_rename_table_old_meta_removed)
FAIL_POINT_ENABLE(fail_point_name, exception_after_step_1_in_exchange_partition)
FAIL_POINT_ENABLE(fail_point_name, exception_before_step_2_rename_in_exchange_partition)
FAIL_POINT_ENABLE(fail_point_name, exception_after_step_2_in_exchange_partition)
FAIL_POINT_ENABLE(fail_point_name, exception_before_step_3_rename_in_exchange_partition)
FAIL_POINT_ENABLE(fail_point_name, exception_after_step_3_in_exchange_partition)
else throw Exception("Cannot find fail point " + fail_point_name, ErrorCodes::FAIL_POINT_ERROR);
}

static void disableFailPoint(const String & fail_point_name) { fiu_disable(fail_point_name.c_str()); }
static void enableFailPoint(const String & fail_point_name);

static void disableFailPoint(const String & fail_point_name);

static void wait(const String & fail_point_name);

private:
static std::unordered_map<String, std::shared_ptr<FailPointChannel>> fail_point_wait_channels;
};
} // namespace DB
10 changes: 8 additions & 2 deletions dbms/src/Databases/DatabaseOrdinary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ namespace ErrorCodes
extern const int SYNTAX_ERROR;
}

namespace FailPoints
{
extern const char exception_drop_table_during_remove_meta[];
extern const char exception_between_rename_table_data_and_metadata[];
}

static constexpr size_t PRINT_MESSAGE_EACH_N_TABLES = 256;
static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5;
static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
Expand Down Expand Up @@ -205,7 +211,7 @@ void DatabaseOrdinary::removeTable(
// If TiFlash crashes before removing the metadata, then the next time it restarts it
// will fully apply the schema from TiDB, and the old table's metadata and data
// will be removed.
FAIL_POINT_TRIGGER_EXCEPTION(exception_drop_table_during_remove_meta);
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_drop_table_during_remove_meta);
Poco::File(table_metadata_path).remove();
}
catch (...)
Expand Down Expand Up @@ -249,7 +255,7 @@ void DatabaseOrdinary::renameTable(
}

// TODO: Atomic rename table is not fixed.
FAIL_POINT_TRIGGER_EXCEPTION(exception_between_rename_table_data_and_metadata);
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_between_rename_table_data_and_metadata);

ASTPtr ast = DatabaseLoading::getQueryFromMetadata(context, detail::getTableMetadataPath(metadata_path, table_name));
if (!ast)
Expand Down
46 changes: 27 additions & 19 deletions dbms/src/Databases/DatabaseTiFlash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ extern const int CANNOT_GET_CREATE_TABLE_QUERY;
extern const int SYNTAX_ERROR;
} // namespace ErrorCodes

namespace FailPoints
{
extern const char exception_drop_table_during_remove_meta[];
extern const char exception_before_rename_table_old_meta_removed[];
}

static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;

DatabaseTiFlash::DatabaseTiFlash(
Expand Down Expand Up @@ -178,8 +184,8 @@ void DatabaseTiFlash::createTable(const Context & context, const String & table_

/// If it was ATTACH query and file with table metadata already exist
/// (so, ATTACH is done after DETACH), then rename atomically replaces old file with new one.
context.getFileProvider()->renameFile(table_metadata_tmp_path, EncryptionPath(table_metadata_tmp_path, ""),
table_metadata_path, EncryptionPath(table_metadata_path, ""));
context.getFileProvider()->renameFile(table_metadata_tmp_path, EncryptionPath(table_metadata_tmp_path, ""), table_metadata_path,
EncryptionPath(table_metadata_path, ""));
}
catch (...)
{
Expand All @@ -198,7 +204,7 @@ void DatabaseTiFlash::removeTable(const Context & context, const String & table_
// full apply schema from TiDB. And the old table's metadata and data
// will be removed.
String table_metadata_path = getTableMetadataPath(table_name);
FAIL_POINT_TRIGGER_EXCEPTION(exception_drop_table_during_remove_meta);
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_drop_table_during_remove_meta);
context.getFileProvider()->deleteRegularFile(table_metadata_path, EncryptionPath(table_metadata_path, ""));
}
catch (...)
Expand Down Expand Up @@ -273,10 +279,11 @@ void DatabaseTiFlash::renameTable(const Context & context, const String & table_
// then we cannot rename the encryption info and the file in an atomic operation.
bool use_target_encrypt_info = context.getFileProvider()->isFileEncrypted(EncryptionPath(new_tbl_meta_file, ""));
{
EncryptionPath encryption_path = use_target_encrypt_info ? EncryptionPath(new_tbl_meta_file, "") : EncryptionPath(new_tbl_meta_file_tmp, "");
EncryptionPath encryption_path
= use_target_encrypt_info ? EncryptionPath(new_tbl_meta_file, "") : EncryptionPath(new_tbl_meta_file_tmp, "");
bool create_new_encryption_info = !use_target_encrypt_info && statement.size();
WriteBufferFromFileProvider out(context.getFileProvider(), new_tbl_meta_file_tmp, encryption_path,
create_new_encryption_info, O_WRONLY | O_CREAT | O_EXCL);
WriteBufferFromFileProvider out(
context.getFileProvider(), new_tbl_meta_file_tmp, encryption_path, create_new_encryption_info, O_WRONLY | O_CREAT | O_EXCL);
writeString(statement, out);
out.next();
if (context.getSettingsRef().fsync_metadata)
Expand All @@ -293,8 +300,8 @@ void DatabaseTiFlash::renameTable(const Context & context, const String & table_
}
else
{
context.getFileProvider()->renameFile(new_tbl_meta_file_tmp, EncryptionPath(new_tbl_meta_file_tmp, ""),
new_tbl_meta_file, EncryptionPath(new_tbl_meta_file, ""));
context.getFileProvider()->renameFile(new_tbl_meta_file_tmp, EncryptionPath(new_tbl_meta_file_tmp, ""), new_tbl_meta_file,
EncryptionPath(new_tbl_meta_file, ""));
}
}
catch (...)
Expand All @@ -306,15 +313,16 @@ void DatabaseTiFlash::renameTable(const Context & context, const String & table_
throw;
}

FAIL_POINT_TRIGGER_EXCEPTION(exception_before_rename_table_old_meta_removed);
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_rename_table_old_meta_removed);

// If only display name updated, don't remove `old_tbl_meta_file`.
if (!isSamePath(old_tbl_meta_file, new_tbl_meta_file))
{
// If the process crashes before removing the old table meta file, we will continue or roll back
// this rename command the next time `loadTables` is called. See `loadTables` and
// `DatabaseLoading::startupTables` for more details.
context.getFileProvider()->deleteRegularFile(old_tbl_meta_file, EncryptionPath(old_tbl_meta_file, ""));// Then remove old meta file
context.getFileProvider()->deleteRegularFile(
old_tbl_meta_file, EncryptionPath(old_tbl_meta_file, "")); // Then remove old meta file
}
}

Expand Down Expand Up @@ -365,10 +373,11 @@ void DatabaseTiFlash::alterTable(
// refer to the comment in `renameTable`
bool use_target_encrypt_info = context.getFileProvider()->isFileEncrypted(EncryptionPath(table_metadata_path, ""));
{
EncryptionPath encryption_path = use_target_encrypt_info ? EncryptionPath(table_metadata_path, "") : EncryptionPath(table_metadata_tmp_path, "");
EncryptionPath encryption_path
= use_target_encrypt_info ? EncryptionPath(table_metadata_path, "") : EncryptionPath(table_metadata_tmp_path, "");
bool create_new_encryption_info = !use_target_encrypt_info && statement.size();
WriteBufferFromFileProvider out(context.getFileProvider(), table_metadata_tmp_path, encryption_path,
create_new_encryption_info, O_WRONLY | O_CREAT | O_EXCL);
WriteBufferFromFileProvider out(
context.getFileProvider(), table_metadata_tmp_path, encryption_path, create_new_encryption_info, O_WRONLY | O_CREAT | O_EXCL);
writeString(statement, out);
out.next();
if (context.getSettingsRef().fsync_metadata)
Expand All @@ -385,16 +394,15 @@ void DatabaseTiFlash::alterTable(
}
else
{
context.getFileProvider()->renameFile(table_metadata_tmp_path, EncryptionPath(table_metadata_tmp_path, ""),
table_metadata_path, EncryptionPath(table_metadata_path, ""));
context.getFileProvider()->renameFile(table_metadata_tmp_path, EncryptionPath(table_metadata_tmp_path, ""), table_metadata_path,
EncryptionPath(table_metadata_path, ""));
}
}
catch (...)
{
if (!use_target_encrypt_info)
{
context.getFileProvider()->deleteRegularFile(table_metadata_tmp_path,
EncryptionPath(table_metadata_tmp_path, ""));
context.getFileProvider()->deleteRegularFile(table_metadata_tmp_path, EncryptionPath(table_metadata_tmp_path, ""));
}
throw;
}
Expand Down Expand Up @@ -482,8 +490,8 @@ void DatabaseTiFlash::drop(const Context & context)
// Remove meta file for this database
if (auto meta_file = Poco::File(getDatabaseMetadataPath(getMetadataPath())); meta_file.exists())
{
context.getFileProvider()->deleteRegularFile(getDatabaseMetadataPath(getMetadataPath()),
EncryptionPath(getDatabaseMetadataPath(getMetadataPath()), ""));
context.getFileProvider()->deleteRegularFile(
getDatabaseMetadataPath(getMetadataPath()), EncryptionPath(getDatabaseMetadataPath(getMetadataPath()), ""));
}
}

Expand Down
Loading

0 comments on commit b6a151a

Please sign in to comment.