Skip to content

Commit

Permalink
Enhancement: add a integrated test on DDL module (#5130)
Browse files Browse the repository at this point in the history
ref #5129
  • Loading branch information
hongyunyan authored Jun 22, 2022
1 parent f3f37ae commit 649462a
Show file tree
Hide file tree
Showing 4 changed files with 936 additions and 21 deletions.
46 changes: 25 additions & 21 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,33 +85,37 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_remote_read_for_batch_cop) \
M(force_context_path) \
M(force_slow_page_storage_snapshot_release) \
M(force_change_all_blobs_to_read_only)

#define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \
M(pause_with_alter_locks_acquired) \
M(hang_in_execution) \
M(pause_before_dt_background_delta_merge) \
M(pause_until_dt_background_delta_merge) \
M(pause_before_apply_raft_cmd) \
M(pause_before_apply_raft_snapshot) \
M(pause_until_apply_raft_snapshot) \
M(force_change_all_blobs_to_read_only) \
M(unblock_query_init_after_write)


#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
M(hang_in_execution) \
M(pause_before_dt_background_delta_merge) \
M(pause_until_dt_background_delta_merge) \
M(pause_before_apply_raft_cmd) \
M(pause_before_apply_raft_snapshot) \
M(pause_until_apply_raft_snapshot) \
M(pause_after_copr_streams_acquired_once)

#define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) \
M(pause_when_reading_from_dt_stream) \
M(pause_when_writing_to_dt_store) \
M(pause_when_ingesting_to_dt_store) \
M(pause_when_altering_dt_store) \
M(pause_after_copr_streams_acquired) \
M(pause_before_server_merge_one_delta)
#define APPLY_FOR_PAUSEABLE_FAILPOINTS(M) \
M(pause_when_reading_from_dt_stream) \
M(pause_when_writing_to_dt_store) \
M(pause_when_ingesting_to_dt_store) \
M(pause_when_altering_dt_store) \
M(pause_after_copr_streams_acquired) \
M(pause_before_server_merge_one_delta) \
M(pause_query_init)


namespace FailPoints
{
#define M(NAME) extern const char(NAME)[] = #NAME "";
APPLY_FOR_FAILPOINTS_ONCE(M)
APPLY_FOR_FAILPOINTS(M)
APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M)
APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M)
APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M)
APPLY_FOR_PAUSEABLE_FAILPOINTS(M)
#undef M
} // namespace FailPoints

Expand Down Expand Up @@ -167,11 +171,11 @@ void FailPointHelper::enableFailPoint(const String & fail_point_name)
}

#define M(NAME) SUB_M(NAME, FIU_ONETIME)
APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M)
APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M)
#undef M

#define M(NAME) SUB_M(NAME, 0)
APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M)
APPLY_FOR_PAUSEABLE_FAILPOINTS(M)
#undef M
#undef SUB_M

Expand Down
16 changes: 16 additions & 0 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Columns/Collator.h>
#include <Common/FailPoint.h>
#include <Common/Logger.h>
#include <Common/TiFlashException.h>
#include <Common/typeid_cast.h>
Expand Down Expand Up @@ -93,6 +94,12 @@ extern const int SCHEMA_VERSION_ERROR;
extern const int UNKNOWN_EXCEPTION;
} // namespace ErrorCodes


namespace FailPoints
{
extern const char pause_query_init[];
} // namespace FailPoints

InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const Context & context_,
Expand Down Expand Up @@ -131,6 +138,15 @@ InterpreterSelectQuery::~InterpreterSelectQuery() = default;

void InterpreterSelectQuery::init(const Names & required_result_column_names)
{
/// the failpoint pause_query_init should use with the failpoint unblock_query_init_after_write,
/// to fulfill that the select query action will be blocked before init state to wait the write action finished.
/// In using, we need enable unblock_query_init_after_write in our test code,
/// and before each write statement take effect, we need enable pause_query_init.
/// When the write action finished, the pause_query_init will be disabled automatically,
/// and then the select query could be continued.
/// you can refer multi_alter_with_write.test for an example.
FAIL_POINT_PAUSE(FailPoints::pause_query_init);

if (!context.hasQueryContext())
context.setQueryContext(context);

Expand Down
15 changes: 15 additions & 0 deletions dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ namespace FailPoints
extern const char pause_before_apply_raft_cmd[];
extern const char pause_before_apply_raft_snapshot[];
extern const char force_set_safepoint_when_decode_block[];
extern const char unblock_query_init_after_write[];
extern const char pause_query_init[];
} // namespace FailPoints

namespace ErrorCodes
Expand Down Expand Up @@ -151,6 +153,7 @@ static void writeRegionDataToStorage(
default:
throw Exception("Unknown StorageEngine: " + toString(static_cast<Int32>(storage->engineType())), ErrorCodes::LOGICAL_ERROR);
}

write_part_cost = watch.elapsedMilliseconds();
GET_METRIC(tiflash_raft_write_data_to_storage_duration_seconds, type_write).Observe(write_part_cost / 1000.0);
if (need_decode)
Expand All @@ -165,10 +168,20 @@ static void writeRegionDataToStorage(
/// decoding data. Check the test case for more details.
FAIL_POINT_PAUSE(FailPoints::pause_before_apply_raft_cmd);

/// disable pause_query_init when the write action finish, to make the query action continue.
/// the usage of unblock_query_init_after_write and pause_query_init can refer to InterpreterSelectQuery::init
SCOPE_EXIT({
fiu_do_on(FailPoints::unblock_query_init_after_write, {
FailPointHelper::disableFailPoint(FailPoints::pause_query_init);
});
});

/// Try read then write once.
{
if (atomic_read_write(false))
{
return;
}
}

/// If first try failed, sync schema and force read then write.
Expand All @@ -177,10 +190,12 @@ static void writeRegionDataToStorage(
tmt.getSchemaSyncer()->syncSchemas(context);

if (!atomic_read_write(true))
{
// Failure won't be tolerated this time.
// TODO: Enrich exception message.
throw Exception("Write region " + std::to_string(region->id()) + " to table " + std::to_string(table_id) + " failed",
ErrorCodes::LOGICAL_ERROR);
}
}
}

Expand Down
Loading

0 comments on commit 649462a

Please sign in to comment.