Skip to content

Commit

Permalink
Make super batch more robust
Browse files Browse the repository at this point in the history
Signed-off-by: JaySon-Huang <[email protected]>
  • Loading branch information
JaySon-Huang committed Sep 17, 2020
1 parent 1c24ca5 commit 53fd650
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 7 deletions.
11 changes: 6 additions & 5 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ endif ()

find_package (Threads)

if (ENABLE_TESTS)
add_definitions(-DFIU_ENABLE)
endif()

include_directories (src)
add_subdirectory (src)

Expand Down Expand Up @@ -78,6 +82,7 @@ add_headers_only(dbms src/Server)

list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})
list (APPEND clickhouse_common_io_headers ${fiu_include_dirs})

list (APPEND dbms_sources src/Functions/IFunction.cpp src/Functions/FunctionFactory.cpp src/Functions/FunctionHelpers.cpp)
list (APPEND dbms_headers src/Functions/IFunction.h src/Functions/FunctionFactory.h src/Functions/FunctionHelpers.h)
Expand Down Expand Up @@ -157,6 +162,7 @@ target_link_libraries (clickhouse_common_io
${EXECINFO_LIBRARY}
${Boost_SYSTEM_LIBRARY}
${CMAKE_DL_LIBS}
fiu
prometheus-cpp::core
prometheus-cpp::push
prometheus-cpp::pull
Expand Down Expand Up @@ -254,9 +260,6 @@ if (NOT USE_INTERNAL_ZSTD_LIBRARY)
target_include_directories (dbms BEFORE PRIVATE ${ZSTD_INCLUDE_DIR})
endif ()

#use libfiu
target_include_directories(dbms PUBLIC ${fiu_include_dirs})

target_include_directories (dbms PUBLIC ${DBMS_INCLUDE_DIR})
target_include_directories (clickhouse_common_io PUBLIC ${DBMS_INCLUDE_DIR})
target_include_directories (clickhouse_common_io PUBLIC ${PCG_RANDOM_INCLUDE_DIR})
Expand All @@ -271,8 +274,6 @@ add_subdirectory (tests)
if (ENABLE_TESTS)
include (${ClickHouse_SOURCE_DIR}/cmake/find_gtest.cmake)

add_definitions(-DFIU_ENABLE)

if (USE_INTERNAL_GTEST_LIBRARY)
# Google Test from sources
add_subdirectory(${ClickHouse_SOURCE_DIR}/contrib/googletest/googletest ${CMAKE_CURRENT_BINARY_DIR}/googletest)
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#include <Common/FailPoint.h>

namespace DB
{
// Out-of-line definition of the static registry declared in FailPoint.h.
// Maps the name of an enabled "pause"-style fail point to the channel that
// threads hitting FAIL_POINT_PAUSE block on; entries are created by
// FAIL_POINT_ENABLE_WITH_CHANNEL and removed by disableFailPoint.
std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::fail_point_wait_channels;

} // namespace DB
58 changes: 57 additions & 1 deletion dbms/src/Common/FailPoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
#include <fiu-local.h>
#include <fiu.h>

#include <boost/core/noncopyable.hpp>
#include <condition_variable>
#include <mutex>
#include <unordered_map>

namespace DB
{

Expand All @@ -17,6 +22,12 @@ extern const int FAIL_POINT_ERROR;

// Enable a plain fail point: the next FAIL_POINT_TRIGGER_* on `name` fires
// exactly once (FIU_ONETIME). Expands to an `else if` arm — must follow an
// `if` in the enclosing dispatch chain.
#define FAIL_POINT_ENABLE(trigger, name) \
else if (trigger == name) { fiu_enable(name, 1, nullptr, FIU_ONETIME); }
// Enable a pausing fail point: besides arming libfiu, registers a wait
// channel so threads hitting FAIL_POINT_PAUSE(name) block until the fail
// point is disabled. try_emplace keeps an already-registered channel intact.
#define FAIL_POINT_ENABLE_WITH_CHANNEL(trigger, name) \
else if (trigger == name) \
{ \
fiu_enable(name, 1, nullptr, FIU_ONETIME); \
fail_point_wait_channels.try_emplace(name, std::make_shared<FailPointChannel>()); \
}

FAIL_POINT_REGISTER(exception_between_drop_meta_and_data)
FAIL_POINT_REGISTER(exception_between_alter_data_and_meta)
Expand All @@ -29,9 +40,33 @@ FAIL_POINT_REGISTER(exception_before_step_2_rename_in_exchange_partition)
FAIL_POINT_REGISTER(exception_after_step_2_in_exchange_partition)
FAIL_POINT_REGISTER(exception_before_step_3_rename_in_exchange_partition)
FAIL_POINT_REGISTER(exception_after_step_3_in_exchange_partition)
FAIL_POINT_REGISTER(region_exception_after_read_from_storage)
FAIL_POINT_REGISTER(pause_after_learner_read)

// Macros to set failpoint
// Throw a DB::Exception (FAIL_POINT_ERROR) when `fail_point` is enabled;
// a no-op otherwise (fiu_do_on compiles out when FIU_ENABLE is not defined).
#define FAIL_POINT_TRIGGER_EXCEPTION(fail_point) \
fiu_do_on(fail_point, throw Exception("Fail point " #fail_point " is triggered.", ErrorCodes::FAIL_POINT_ERROR);)
// Block the calling thread on the fail point's wait channel until the fail
// point is disabled (see FailPointHelper::wait / disableFailPoint).
#define FAIL_POINT_PAUSE(fail_point) \
fiu_do_on(fail_point, FailPointHelper::wait(fail_point);)
// #define FAIL_POINT_TRIGGER_REGION_EXCEPTION(fail_point) fiu_do_on(fail_point, throw RegionException(); )


/// One-shot rendezvous used to pause threads at a fail point.
/// Threads call wait(); they are released when wake() is called (directly, or
/// from the destructor when disableFailPoint erases the channel from the map).
/// The wake is "sticky": once woken, later wait() calls return immediately,
/// so a notification can never be lost.
class FailPointChannel
{
public:
    FailPointChannel() = default;

    // Non-copyable / non-movable: waiters hold references to this object's
    // mutex and condition variable. (Replaces boost::noncopyable with the
    // equivalent deleted special members.)
    FailPointChannel(const FailPointChannel &) = delete;
    FailPointChannel & operator=(const FailPointChannel &) = delete;

    // wake up all waiting threads when destroy
    ~FailPointChannel() { wake(); }

    // Block until wake() has been called. The predicate guards against two
    // bugs in a bare cv.wait(lock): a spurious wakeup ending the pause early,
    // and a notify that happens before wait() is entered being lost forever.
    void wait()
    {
        std::unique_lock lock(m);
        cv.wait(lock, [this] { return wake_up; });
    }

    // Release all current and future waiters. The flag is set under the lock
    // to close the race with a thread that has tested the predicate but not
    // yet blocked on the condition variable.
    void wake()
    {
        {
            std::lock_guard lock(m);
            wake_up = true;
        }
        cv.notify_all();
    }

private:
    std::mutex m;
    std::condition_variable cv;
    bool wake_up = false; // set once by wake(); protected by `m`
};

class FailPointHelper
{
Expand All @@ -50,9 +85,30 @@ class FailPointHelper
FAIL_POINT_ENABLE(fail_point_name, exception_after_step_2_in_exchange_partition)
FAIL_POINT_ENABLE(fail_point_name, exception_before_step_3_rename_in_exchange_partition)
FAIL_POINT_ENABLE(fail_point_name, exception_after_step_3_in_exchange_partition)
FAIL_POINT_ENABLE(fail_point_name, region_exception_after_read_from_storage)
FAIL_POINT_ENABLE_WITH_CHANNEL(fail_point_name, pause_after_learner_read)
else throw Exception("Cannot find fail point " + fail_point_name, ErrorCodes::FAIL_POINT_ERROR);
}

static void disableFailPoint(const String & fail_point_name) { fiu_disable(fail_point_name.c_str()); }
/// Disarm a fail point. If a wait channel was registered for it, the channel
/// is dropped from the registry; destroying it wakes every thread paused on
/// this fail point.
static void disableFailPoint(const String & fail_point_name)
{
    // Erasing by key is a no-op when no channel was registered, which matches
    // the find-then-erase form for plain (non-pausing) fail points.
    fail_point_wait_channels.erase(fail_point_name);
    fiu_disable(fail_point_name.c_str());
}

/// Block the calling thread on `fail_point_name`'s wait channel until the
/// fail point is disabled (disableFailPoint destroys the channel, waking all
/// waiters). Throws if the fail point was not enabled with a channel.
static void wait(const String & fail_point_name)
{
    // Fix: the original message lacked a space before the fail point name,
    // producing e.g. "...fail pointpause_after_learner_read".
    if (auto iter = fail_point_wait_channels.find(fail_point_name); iter == fail_point_wait_channels.end())
        throw Exception("Can not find channel for fail point " + fail_point_name);
    else
    {
        // Copy the shared_ptr so the channel stays alive for the whole wait,
        // even if disableFailPoint erases the map entry concurrently.
        auto ptr = iter->second;
        ptr->wait();
    }
}

private:
static std::unordered_map<String, std::shared_ptr<FailPointChannel>> fail_point_wait_channels;
};
} // namespace DB
44 changes: 43 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <Common/FailPoint.h>
#include <Common/TiFlashException.h>
#include <DataStreams/AggregatingBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
Expand Down Expand Up @@ -249,6 +250,7 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline &
query_info.dag_query = std::make_unique<DAGQueryInfo>(conditions, analyzer->getPreparedSets(), analyzer->getCurrentInputColumns());
query_info.mvcc_query_info = std::move(mvcc_query_info);

FAIL_POINT_PAUSE(pause_after_learner_read);
bool need_local_read = !query_info.mvcc_query_info->regions_query_info.empty();
if (need_local_read)
{
Expand All @@ -257,20 +259,60 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline &
try
{
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);

// After getting streams from storage, we need to validate whether regions have changed or not after learner read.
// In case the versions of regions have changed, those `streams` may contain different data other than expected.
// Like after region merge/split.

// Inject failpoint to throw RegionException
fiu_do_on(region_exception_after_read_from_storage, {
const auto & regions_info = query_info.mvcc_query_info->regions_query_info;
std::vector<RegionID> region_ids;
for (const auto & info : regions_info)
region_ids.push_back(info.region_id);
throw RegionException(std::move(region_ids), RegionException::RegionReadStatus::NOT_FOUND);
});
validateQueryInfo(*query_info.mvcc_query_info, learner_read_snapshot, tmt, log);
}
catch (DB::RegionException & e)
{
/// Recover from region exception when super batch is enabled
if (dag.isBatchCop())
{
// clean all streams from local because we are not sure the correctness of those streams
pipeline.streams.clear();
std::stringstream ss;
// push regions to `region_retry` to retry from other tiflash nodes
for (const auto & region : query_info.mvcc_query_info->regions_query_info)
{
const auto & dag_regions = dag.getRegions();
auto iter = dag_regions.find(region.region_id);
if (likely(iter != dag_regions.end()))
{
region_retry.emplace(iter->first, iter->second);
ss << iter->first << ",";
}
}
LOG_WARNING(log, "RegionException after read from storage, regions [" << ss.str() << "], message: " << e.message());
}
else
{
// Throw an exception for TiDB / TiSpark to retry
e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName()
+ "`, table_id: " + DB::toString(table_id) + ")");
throw;
}
}
catch (DB::Exception & e)
{
/// Other unknown exceptions
e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName()
+ "`, table_id: " + DB::toString(table_id) + ")");
throw;
}
}

// For those regions which are not presented in this tiflash node, we will try to fetch streams from other tiflash nodes, only happens in batch cop mode.
// For those regions which are not presented in this tiflash node, we will try to fetch streams by key ranges from other tiflash nodes, only happens in batch cop mode.
if (!region_retry.empty())
{
LOG_DEBUG(log, ({
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/InterpreterDAGHelper.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <Common/TiFlashException.h>
#include <Storages/Transaction/RegionException.h>

namespace DB
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
mysql> drop table if exists test.t

>> DBGInvoke __init_fail_point()

# Ensure that we can create table for that database
mysql> create table test.t(a int not null, b int not null)
mysql> alter table test.t set tiflash replica 1 location labels 'rack', 'host', 'abc'

func> wait_table test t

mysql> insert into test.t values (1, 1),(1, 2)

# Region meta changed after read from storage
>> DBGInvoke __enable_fail_point(region_exception_after_read_from_storage)

mysql> set session tidb_isolation_read_engines='tiflash'; select * from test.t;
+---+---+
| a | b |
+---+---+
| 1 | 1 |
| 1 | 2 |
+---+---+

mysql> drop table if exists test.t
>> DBGInvoke __disable_fail_point(region_exception_after_read_from_storage)

0 comments on commit 53fd650

Please sign in to comment.