Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Add tests case when deltaMergeBySplit + { flushCache | split } occurs simultaneously (#5454) #5486

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/googletest
Submodule googletest updated 250 files
2 changes: 2 additions & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ include(${TiFlash_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)

add_headers_and_sources(clickhouse_common_io src/Common)
add_headers_and_sources(clickhouse_common_io src/Common/HashTable)
add_headers_and_sources(clickhouse_common_io src/Common/SyncPoint)
add_headers_and_sources(clickhouse_common_io src/IO)

add_headers_and_sources(dbms src/Analyzers)
Expand Down Expand Up @@ -262,6 +263,7 @@ if (ENABLE_TESTS)
include (${TiFlash_SOURCE_DIR}/cmake/find_gtest.cmake)

if (USE_INTERNAL_GTEST_LIBRARY)
set(INSTALL_GTEST OFF)
# Google Test from sources
add_subdirectory(${TiFlash_SOURCE_DIR}/contrib/googletest/googletest ${CMAKE_CURRENT_BINARY_DIR}/googletest)
# avoid problems with <regexp.h>
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(pause_when_writing_to_dt_store) \
M(pause_when_ingesting_to_dt_store) \
M(pause_when_altering_dt_store) \
M(pause_after_copr_streams_acquired) \
M(pause_before_server_merge_one_delta)
M(pause_after_copr_streams_acquired)

namespace FailPoints
{
Expand Down
119 changes: 119 additions & 0 deletions dbms/src/Common/SyncPoint/Ctl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Common/SyncPoint/Ctl.h>
#include <Common/SyncPoint/ScopeGuard.h>
#include <Common/SyncPoint/SyncChannel.h>
#include <common/logger_useful.h>
#include <fiu-control.h>

namespace DB
{

#ifdef FIU_ENABLE

void SyncPointCtl::enable(const char * name)
{
{
std::unique_lock lock(mu);
channels.try_emplace(name,
std::make_pair(
std::make_shared<SyncChannel>(),
std::make_shared<SyncChannel>()));
}
fiu_enable(name, 1, nullptr, 0);
LOG_FMT_DEBUG(getLogger(), "Enabled: {}", name);
}

void SyncPointCtl::disable(const char * name)
{
fiu_disable(name);
{
std::unique_lock lock(mu);
if (auto const & iter = channels.find(name); iter != channels.end())
{
auto [first_ch, second_ch] = iter->second;
first_ch->close();
second_ch->close();
channels.erase(iter);
}
}
LOG_FMT_DEBUG(getLogger(), "Disabled: {}", name);
}

std::pair<SyncPointCtl::SyncChannelPtr, SyncPointCtl::SyncChannelPtr> SyncPointCtl::mustGetChannel(const char * name)
{
std::unique_lock lock(mu);
if (auto iter = channels.find(name); iter == channels.end())
{
throw Exception(fmt::format("SyncPoint {} is not enabled", name));
}
else
{
return iter->second;
}
}

void SyncPointCtl::waitAndPause(const char * name)
{
auto ch = mustGetChannel(name).first;
LOG_FMT_DEBUG(getLogger(), "waitAndPause({}) waiting...", name);
auto result = ch->recv();
LOG_FMT_DEBUG(getLogger(), "waitAndPause({}) {}", name, result ? "finished" : "cancelled");
}

void SyncPointCtl::next(const char * name)
{
auto ch = mustGetChannel(name).second;
LOG_FMT_DEBUG(getLogger(), "next({}) trying...", name);
auto result = ch->send();
LOG_FMT_DEBUG(getLogger(), "next({}) {}", name, result ? "done" : "cancelled");
}

void SyncPointCtl::sync(const char * name)
{
auto [ch_1, ch_2] = mustGetChannel(name);
// Print a stack, which is helpful to know where undesired SYNC_FOR comes from.
LOG_FMT_DEBUG(getLogger(), "SYNC_FOR({}) trying... \n\n# Current Stack: {}", name, StackTrace().toString());
auto result = ch_1->send();
LOG_FMT_DEBUG(getLogger(), "SYNC_FOR({}) {}", name, //
result ? "matched waitAndPause(), paused until calling next()..." : "cancelled");
if (!result)
return;
result = ch_2->recv();
LOG_FMT_DEBUG(getLogger(), "SYNC_FOR({}) {}", name, result ? "done" : "cancelled");
}

#else

void SyncPointCtl::enable(const char *)
{}

void SyncPointCtl::disable(const char *) {}

void SyncPointCtl::waitAndPause(const char *) {}

void SyncPointCtl::next(const char *) {}

void SyncPointCtl::sync(const char *) {}

#endif

SyncPointScopeGuard SyncPointCtl::enableInScope(const char * name)
{
return SyncPointScopeGuard(name);
}

} // namespace DB
94 changes: 94 additions & 0 deletions dbms/src/Common/SyncPoint/Ctl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/SyncPoint/ScopeGuard.h>
#include <Poco/Logger.h>

#include <memory>
#include <mutex>
#include <unordered_map>

namespace DB
{

class SyncPointCtl
{
public:
/**
* Enable the sync point. After enabling, when executed to the sync point defined with `SYNC_FOR()`,
* the execution will be suspended, until `waitAndPause()` or `waitAndNext()` is called
* somewhere (e.g. in tests).
*/
static void enable(const char * name);

/**
* Disable the sync point. Existing suspends will be continued.
*/
static void disable(const char * name);

/**
* Suspend the execution, until `waitAndPause()`, `next()` or `waitAndNext()` is called somewhere.
* You should not invoke this function directly. Invoke `SYNC_FOR()` instead.
*/
static void sync(const char * name);

/**
* Wait for the sync point being executed. The code at the sync point will keep
* pausing until you call `next()`.
*/
static void waitAndPause(const char * name);

/**
* Continue the execution after the specified sync point.
* You must first `waitAndPause()` for it, then `next()` it.
*/
static void next(const char * name);

/**
* Wait for the sync point being executed. After that, continue the execution after the sync point.
*/
static void waitAndNext(const char * name)
{
waitAndPause(name);
next(name);
}

/**
* Enable the sync point in the current scope. When scope exits, the sync point will be disabled.
*
* After enabling, when executed to the sync point defined with `SYNC_FOR()`, the execution
* will be suspended, until `waitAndPause()` or `waitAndNext()` is called somewhere (e.g. in tests).
*/
static SyncPointScopeGuard enableInScope(const char * name);

private:
class SyncChannel;
using SyncChannelPtr = std::shared_ptr<SyncChannel>;

static Poco::Logger * getLogger()
{
static Poco::Logger * logger = &Poco::Logger::get("SyncPointCtl");
return logger;
}

static std::pair<SyncChannelPtr, SyncChannelPtr> mustGetChannel(const char * name);

inline static std::unordered_map<std::string, std::pair<SyncChannelPtr, SyncChannelPtr>>
channels{};
inline static std::mutex mu{};
};

} // namespace DB
50 changes: 50 additions & 0 deletions dbms/src/Common/SyncPoint/ScopeGuard.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/SyncPoint/Ctl.h>
#include <Common/SyncPoint/ScopeGuard.h>

namespace DB
{

SyncPointScopeGuard::SyncPointScopeGuard(const char * name_)
: name(name_)
{
SyncPointCtl::enable(name_);
}

void SyncPointScopeGuard::disable()
{
if (disabled)
return;
SyncPointCtl::disable(name.c_str());
disabled = true;
}

void SyncPointScopeGuard::waitAndPause()
{
SyncPointCtl::waitAndPause(name.c_str());
}

void SyncPointScopeGuard::next()
{
SyncPointCtl::next(name.c_str());
}

void SyncPointScopeGuard::waitAndNext()
{
SyncPointCtl::waitAndNext(name.c_str());
}

} // namespace DB
60 changes: 60 additions & 0 deletions dbms/src/Common/SyncPoint/ScopeGuard.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <string>

namespace DB
{

class SyncPointScopeGuard
{
public:
explicit SyncPointScopeGuard(const char * name_);

~SyncPointScopeGuard()
{
disable();
}

/**
* Disable this sync point beforehand, instead of at the moment when
* this scope guard is destructed.
*/
void disable();

/**
* Wait for the sync point being executed. The code at the sync point will keep
* pausing until you call `next()`.
*/
void waitAndPause();

/**
* Continue the execution after the specified sync point.
* You must first `waitAndPause()` for it, then `next()` it.
*/
void next();

/**
* Wait for the sync point being executed. After that, continue the execution after the sync point.
*/
void waitAndNext();

private:
std::string name;
bool disabled = false;
};

} // namespace DB
Loading