Skip to content

Commit

Permalink
test: Add tests case when deltaMergeBySplit + { flushCache | split } …
Browse files Browse the repository at this point in the history
…occurs simultaneously (#5454) (#5486)

ref #5409
  • Loading branch information
ti-chi-bot authored Aug 22, 2022
1 parent 14906e8 commit 45202b0
Show file tree
Hide file tree
Showing 18 changed files with 714 additions and 61 deletions.
2 changes: 1 addition & 1 deletion contrib/googletest
Submodule googletest updated 250 files
2 changes: 2 additions & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ include(${TiFlash_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)

add_headers_and_sources(clickhouse_common_io src/Common)
add_headers_and_sources(clickhouse_common_io src/Common/HashTable)
add_headers_and_sources(clickhouse_common_io src/Common/SyncPoint)
add_headers_and_sources(clickhouse_common_io src/IO)

add_headers_and_sources(dbms src/Analyzers)
Expand Down Expand Up @@ -262,6 +263,7 @@ if (ENABLE_TESTS)
include (${TiFlash_SOURCE_DIR}/cmake/find_gtest.cmake)

if (USE_INTERNAL_GTEST_LIBRARY)
set(INSTALL_GTEST OFF)
# Google Test from sources
add_subdirectory(${TiFlash_SOURCE_DIR}/contrib/googletest/googletest ${CMAKE_CURRENT_BINARY_DIR}/googletest)
# avoid problems with <regexp.h>
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(pause_when_writing_to_dt_store) \
M(pause_when_ingesting_to_dt_store) \
M(pause_when_altering_dt_store) \
M(pause_after_copr_streams_acquired) \
M(pause_before_server_merge_one_delta)
M(pause_after_copr_streams_acquired)

namespace FailPoints
{
Expand Down
119 changes: 119 additions & 0 deletions dbms/src/Common/SyncPoint/Ctl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Common/SyncPoint/Ctl.h>
#include <Common/SyncPoint/ScopeGuard.h>
#include <Common/SyncPoint/SyncChannel.h>
#include <common/logger_useful.h>
#include <fiu-control.h>

namespace DB
{

#ifdef FIU_ENABLE

void SyncPointCtl::enable(const char * name)
{
{
std::unique_lock lock(mu);
channels.try_emplace(name,
std::make_pair(
std::make_shared<SyncChannel>(),
std::make_shared<SyncChannel>()));
}
fiu_enable(name, 1, nullptr, 0);
LOG_FMT_DEBUG(getLogger(), "Enabled: {}", name);
}

void SyncPointCtl::disable(const char * name)
{
fiu_disable(name);
{
std::unique_lock lock(mu);
if (auto const & iter = channels.find(name); iter != channels.end())
{
auto [first_ch, second_ch] = iter->second;
first_ch->close();
second_ch->close();
channels.erase(iter);
}
}
LOG_FMT_DEBUG(getLogger(), "Disabled: {}", name);
}

std::pair<SyncPointCtl::SyncChannelPtr, SyncPointCtl::SyncChannelPtr> SyncPointCtl::mustGetChannel(const char * name)
{
std::unique_lock lock(mu);
if (auto iter = channels.find(name); iter == channels.end())
{
throw Exception(fmt::format("SyncPoint {} is not enabled", name));
}
else
{
return iter->second;
}
}

void SyncPointCtl::waitAndPause(const char * name)
{
auto ch = mustGetChannel(name).first;
LOG_FMT_DEBUG(getLogger(), "waitAndPause({}) waiting...", name);
auto result = ch->recv();
LOG_FMT_DEBUG(getLogger(), "waitAndPause({}) {}", name, result ? "finished" : "cancelled");
}

void SyncPointCtl::next(const char * name)
{
auto ch = mustGetChannel(name).second;
LOG_FMT_DEBUG(getLogger(), "next({}) trying...", name);
auto result = ch->send();
LOG_FMT_DEBUG(getLogger(), "next({}) {}", name, result ? "done" : "cancelled");
}

void SyncPointCtl::sync(const char * name)
{
auto [ch_1, ch_2] = mustGetChannel(name);
// Print a stack, which is helpful to know where undesired SYNC_FOR comes from.
LOG_FMT_DEBUG(getLogger(), "SYNC_FOR({}) trying... \n\n# Current Stack: {}", name, StackTrace().toString());
auto result = ch_1->send();
LOG_FMT_DEBUG(getLogger(), "SYNC_FOR({}) {}", name, //
result ? "matched waitAndPause(), paused until calling next()..." : "cancelled");
if (!result)
return;
result = ch_2->recv();
LOG_FMT_DEBUG(getLogger(), "SYNC_FOR({}) {}", name, result ? "done" : "cancelled");
}

#else

void SyncPointCtl::enable(const char *)
{}

void SyncPointCtl::disable(const char *) {}

void SyncPointCtl::waitAndPause(const char *) {}

void SyncPointCtl::next(const char *) {}

void SyncPointCtl::sync(const char *) {}

#endif

SyncPointScopeGuard SyncPointCtl::enableInScope(const char * name)
{
return SyncPointScopeGuard(name);
}

} // namespace DB
94 changes: 94 additions & 0 deletions dbms/src/Common/SyncPoint/Ctl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/SyncPoint/ScopeGuard.h>
#include <Poco/Logger.h>

#include <memory>
#include <mutex>
#include <unordered_map>

namespace DB
{

class SyncPointCtl
{
public:
/**
* Enable the sync point. After enabling, when executed to the sync point defined with `SYNC_FOR()`,
* the execution will be suspended, until `waitAndPause()` or `waitAndNext()` is called
* somewhere (e.g. in tests).
*/
static void enable(const char * name);

/**
* Disable the sync point. Existing suspends will be continued.
*/
static void disable(const char * name);

/**
* Suspend the execution, until `waitAndPause()`, `next()` or `waitAndNext()` is called somewhere.
* You should not invoke this function directly. Invoke `SYNC_FOR()` instead.
*/
static void sync(const char * name);

/**
* Wait for the sync point being executed. The code at the sync point will keep
* pausing until you call `next()`.
*/
static void waitAndPause(const char * name);

/**
* Continue the execution after the specified sync point.
* You must first `waitAndPause()` for it, then `next()` it.
*/
static void next(const char * name);

/**
* Wait for the sync point being executed. After that, continue the execution after the sync point.
*/
static void waitAndNext(const char * name)
{
waitAndPause(name);
next(name);
}

/**
* Enable the sync point in the current scope. When scope exits, the sync point will be disabled.
*
* After enabling, when executed to the sync point defined with `SYNC_FOR()`, the execution
* will be suspended, until `waitAndPause()` or `waitAndNext()` is called somewhere (e.g. in tests).
*/
static SyncPointScopeGuard enableInScope(const char * name);

private:
class SyncChannel;
using SyncChannelPtr = std::shared_ptr<SyncChannel>;

static Poco::Logger * getLogger()
{
static Poco::Logger * logger = &Poco::Logger::get("SyncPointCtl");
return logger;
}

static std::pair<SyncChannelPtr, SyncChannelPtr> mustGetChannel(const char * name);

inline static std::unordered_map<std::string, std::pair<SyncChannelPtr, SyncChannelPtr>>
channels{};
inline static std::mutex mu{};
};

} // namespace DB
50 changes: 50 additions & 0 deletions dbms/src/Common/SyncPoint/ScopeGuard.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/SyncPoint/Ctl.h>
#include <Common/SyncPoint/ScopeGuard.h>

namespace DB
{

SyncPointScopeGuard::SyncPointScopeGuard(const char * name_)
: name(name_)
{
SyncPointCtl::enable(name_);
}

void SyncPointScopeGuard::disable()
{
if (disabled)
return;
SyncPointCtl::disable(name.c_str());
disabled = true;
}

void SyncPointScopeGuard::waitAndPause()
{
SyncPointCtl::waitAndPause(name.c_str());
}

void SyncPointScopeGuard::next()
{
SyncPointCtl::next(name.c_str());
}

void SyncPointScopeGuard::waitAndNext()
{
SyncPointCtl::waitAndNext(name.c_str());
}

} // namespace DB
60 changes: 60 additions & 0 deletions dbms/src/Common/SyncPoint/ScopeGuard.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <string>

namespace DB
{

class SyncPointScopeGuard
{
public:
explicit SyncPointScopeGuard(const char * name_);

~SyncPointScopeGuard()
{
disable();
}

/**
* Disable this sync point beforehand, instead of at the moment when
* this scope guard is destructed.
*/
void disable();

/**
* Wait for the sync point being executed. The code at the sync point will keep
* pausing until you call `next()`.
*/
void waitAndPause();

/**
* Continue the execution after the specified sync point.
* You must first `waitAndPause()` for it, then `next()` it.
*/
void next();

/**
* Wait for the sync point being executed. After that, continue the execution after the sync point.
*/
void waitAndNext();

private:
std::string name;
bool disabled = false;
};

} // namespace DB
Loading

0 comments on commit 45202b0

Please sign in to comment.