Skip to content

Commit

Permalink
Merge branch 'fix-restore-error' of github.com:jiaqizho/tics into fix…
Browse files Browse the repository at this point in the history
…-restore-error
  • Loading branch information
jiaqizho committed Mar 21, 2022
2 parents 348cd5e + 5539552 commit da2a63a
Show file tree
Hide file tree
Showing 74 changed files with 1,404 additions and 504 deletions.
11 changes: 7 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ set(CMAKE_SKIP_INSTALL_ALL_DEPENDENCY true)
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${TiFlash_SOURCE_DIR}/cmake/Modules/")
set(CMAKE_MACOSX_RPATH 1)

# Rust jobs may modify global toolchain settings. We make rust related jobs sequentially scheduled
# by putting them in a job pool with single concurrency.
set_property(GLOBAL PROPERTY JOB_POOLS rust_job_pool=1)

option(TIFLASH_ENABLE_LLVM_DEVELOPMENT "enable facilities for development with LLVM" OFF)

if(CMAKE_PREFIX_PATH)
Expand Down Expand Up @@ -442,3 +438,10 @@ add_subdirectory (dbms)

include (cmake/print_include_directories.cmake)

# Building a rust project may require setting up toolchains. Rustup cannot handle parallel requests, hence it may fail to
# finish the setup if multiple projects want to install a same version. To mitigate the situation, we force cargo targets
# to be invoked sequentially by adding linear dependency relations between them.
# Another way to do so is to use JOB_POOL with single concurrency. However, it is Ninja specific and it seems to have bugs
# with a large number of threads.
include (cmake/sequential.cmake)
build_sequentially (symbolization tiflash_proxy)
4 changes: 4 additions & 0 deletions cmake/find_tiflash_proxy.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,7 @@ endif()
set(TIFLASH_PROXY_FOUND TRUE)

message(STATUS "Using tiflash proxy: ${USE_INTERNAL_TIFLASH_PROXY} : ${TIFLASH_PROXY_INCLUDE_DIR}, ${TIFLASH_PROXY_LIBRARY}")

if (NOT USE_INTERNAL_TIFLASH_PROXY)
add_custom_target(tiflash_proxy ALL DEPENDS ${TIFLASH_PROXY_LIBRARY})
endif()
20 changes: 20 additions & 0 deletions cmake/sequential.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

## Mark targets to build sequentially
# Example: build_sequentially(a, b, c, d)
# a -> b -> c -> d
function(build_sequentially target1 target2)
add_dependencies(${target2} ${target1})
if(${ARGC} GREATER 2)
build_sequentially(${target2} ${ARGN})
endif()
endfunction()
3 changes: 1 addition & 2 deletions contrib/tiflash-proxy-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ add_custom_command(OUTPUT ${_TIFLASH_PROXY_LIBRARY}
DEPENDS "${_TIFLASH_PROXY_SRCS}" "${_TIFLASH_PROXY_SOURCE_DIR}/Cargo.lock" "${_TIFLASH_PROXY_SOURCE_DIR}/rust-toolchain")

add_custom_target(tiflash_proxy ALL
DEPENDS ${_TIFLASH_PROXY_LIBRARY}
JOB_POOL rust_job_pool)
DEPENDS ${_TIFLASH_PROXY_LIBRARY})

add_library(libtiflash_proxy SHARED IMPORTED GLOBAL)
set_target_properties(libtiflash_proxy PROPERTIES IMPORTED_LOCATION ${_TIFLASH_PROXY_LIBRARY} IMPORTED_NO_SONAME ON)
Expand Down
131 changes: 131 additions & 0 deletions dbms/src/Common/Logger.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/FmtUtils.h>
#include <Poco/Logger.h>

#include <boost/noncopyable.hpp>

namespace DB
{
class Logger;
using LoggerPtr = std::shared_ptr<Logger>;

/**
* Logger is to support identifiers based on Poco::Logger.
*
* Identifiers could be request_id, session_id, etc. They can be used in `LogSearch` when we want to
* glob all logs related to one request/session/query.
*
* Logger will print all identifiers at the front of each log record (and after the `source`).
*
* Interfaces in Logger are definitely the same with the Poco::Logger, so that they could use the same
* macro such as LOG_INFO() etc.
*/
class Logger : private boost::noncopyable
{
public:
static LoggerPtr get(const std::string & source)
{
return std::make_shared<Logger>(source, "");
}

static LoggerPtr get(const std::string & source, const std::string & identifier)
{
return std::make_shared<Logger>(source, identifier);
}

template <typename... Args>
static LoggerPtr get(const std::string & source, Args &&... args)
{
FmtBuffer buf;
return getInternal(source, buf, std::forward<Args>(args)...);
}

Logger(const std::string & source, const std::string & identifier)
: logger(&Poco::Logger::get(source))
, id(identifier)
{
}

#define M(level) \
bool level() const { return logger->level(); } \
void level(const std::string & msg) const \
{ \
if (id.empty()) \
logger->level(msg); \
else \
logger->level(wrapMsg(msg)); \
}

M(trace)
M(debug)
M(information)
M(warning)
M(error)
M(fatal)
#undef M

void log(const Poco::Message & msg) const
{
if (id.empty())
return logger->log(msg);
else
return logger->log(Poco::Message(msg, wrapMsg(msg.getText())));
}

void log(Poco::Message & msg) const
{
if (!id.empty())
msg.setText(wrapMsg(msg.getText()));
return logger->log(msg);
}

bool is(int level) const { return logger->is(level); }

Poco::Channel * getChannel() const { return logger->getChannel(); }

const std::string & name() const { return logger->name(); }

const std::string & identifier() const { return id; }

Poco::Logger * getLog() const { return logger; }

private:
template <typename T, typename... Args>
static LoggerPtr getInternal(const std::string & source, FmtBuffer & buf, T && first, Args &&... args)
{
buf.fmtAppend("{} ", std::forward<T>(first));
return getInternal(source, buf, std::forward<Args>(args)...);
}

template <typename T>
static LoggerPtr getInternal(const std::string & source, FmtBuffer & buf, T && identifier)
{
buf.fmtAppend("{}", std::forward<T>(identifier));
return get(source, buf.toString());
}

std::string wrapMsg(const std::string & msg) const
{
return fmt::format("{} {}", id, msg);
}

Poco::Logger * logger;
const std::string id;
};

} // namespace DB
70 changes: 70 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@
#include <Storages/BackgroundProcessingPool.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/IStorage.h>
#include <Storages/MarkCache.h>
#include <Storages/Page/V3/PageStorageImpl.h>
#include <Storages/PathCapacityMetrics.h>
#include <Storages/PathPool.h>
#include <Storages/Transaction/BackgroundService.h>
Expand Down Expand Up @@ -161,6 +163,7 @@ struct ContextShared
PathCapacityMetricsPtr path_capacity_ptr; /// Path capacity metrics
FileProviderPtr file_provider; /// File provider.
IORateLimiter io_rate_limiter;
DM::GlobalStoragePoolPtr global_storage_pool;
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.

class SessionKeyHash
Expand Down Expand Up @@ -1571,6 +1574,73 @@ ReadLimiterPtr Context::getReadLimiter() const
return getIORateLimiter().getReadLimiter();
}

static bool isUsingPageStorageV3(const PathPool & path_pool, bool enable_ps_v3)
{
// Check whether v3 is already enabled
for (const auto & path : path_pool.listGlobalPagePaths())
{
if (PS::V3::PageStorageImpl::isManifestsFileExists(path))
{
return true;
}
}

// Check whether v3 on new node is enabled in the config, if not, no need to check anymore
if (!enable_ps_v3)
return false;

// Check whether there are any files in kvstore path, if exists, then this is not a new node.
// If it's a new node, then we enable v3. Otherwise, we use v2.
for (const auto & path : path_pool.listKVStorePaths())
{
Poco::File dir(path);
if (!dir.exists())
continue;

std::vector<std::string> files;
dir.list(files);
if (!files.empty())
{
return false;
}
}
return true;
}

bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool, bool enable_ps_v3)
{
auto lock = getLock();
if (isUsingPageStorageV3(path_pool, enable_ps_v3))
{
try
{
// create manifests file before initialize GlobalStoragePool
for (const auto & path : path_pool.listGlobalPagePaths())
PS::V3::PageStorageImpl::createManifestsFileIfNeed(path);

shared->global_storage_pool = std::make_shared<DM::GlobalStoragePool>(path_pool, *this, settings);
shared->global_storage_pool->restore();
return true;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
}
else
{
shared->global_storage_pool = nullptr;
return false;
}
}

DM::GlobalStoragePoolPtr Context::getGlobalStoragePool() const
{
auto lock = getLock();
return shared->global_storage_pool;
}

UInt16 Context::getTCPPort() const
{
auto lock = getLock();
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ namespace DM
{
class MinMaxIndexCache;
class DeltaIndexManager;
class GlobalStoragePool;
using GlobalStoragePoolPtr = std::shared_ptr<GlobalStoragePool>;
} // namespace DM

/// (database name, table name)
Expand Down Expand Up @@ -408,6 +410,10 @@ class Context
ReadLimiterPtr getReadLimiter() const;
IORateLimiter & getIORateLimiter() const;

bool initializeGlobalStoragePoolIfNeed(const PathPool & path_pool, bool enable_ps_v3);

DM::GlobalStoragePoolPtr getGlobalStoragePool() const;

Compiler & getCompiler();

/// Call after initialization before using system logs. Call for global context.
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct Settings
M(SettingBool, group_by_collation_sensitive, false, "do group by with collation info.") \
M(SettingUInt64, read_tso, DEFAULT_MAX_READ_TSO, "tmt read tso.") \
M(SettingInt64, dag_records_per_chunk, DEFAULT_DAG_RECORDS_PER_CHUNK, "default chunk size of a DAG response.") \
M(SettingInt64, batch_send_min_limit, DEFAULT_BATCH_SEND_MIN_LIMIT, "default minial chunk size of exchanging data among TiFlash.") \
M(SettingInt64, batch_send_min_limit, DEFAULT_BATCH_SEND_MIN_LIMIT, "default minimal chunk size of exchanging data among TiFlash.") \
M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \
M(SettingUInt64, mpp_task_timeout, DEFAULT_MPP_TASK_TIMEOUT, "mpp task max endurable time.") \
M(SettingUInt64, mpp_task_running_timeout, DEFAULT_MPP_TASK_RUNNING_TIMEOUT, "mpp task max time that running without any progress.") \
Expand Down Expand Up @@ -282,7 +282,7 @@ struct Settings
M(SettingBool, dt_raw_filter_range, true, "Do range filter or not when read data in raw mode in DeltaTree Engine.") \
M(SettingBool, dt_read_delta_only, false, "Only read delta data in DeltaTree Engine.") \
M(SettingBool, dt_read_stable_only, false, "Only read stable data in DeltaTree Engine.") \
M(SettingBool, dt_enable_logical_split, false, "Enable logical split or not in DeltaTree Engine.") \
M(SettingBool, dt_enable_logical_split, false, "Enable logical split or not in DeltaTree Engine.") \
M(SettingBool, dt_flush_after_write, false, "Flush cache or not after write in DeltaTree Engine.") \
M(SettingBool, dt_enable_relevant_place, false, "Enable relevant place or not in DeltaTree Engine.") \
M(SettingBool, dt_enable_skippable_place, true, "Enable skippable place or not in DeltaTree Engine.") \
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Server/DTTool/DTToolBench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,11 @@ int benchEntry(const std::vector<std::string> & opts)
auto db_context = env.getContext();
auto path_pool = std::make_unique<DB::StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
auto storage_pool = std::make_unique<DB::DM::StoragePool>("test.t1", /*table_id*/ 1, *path_pool, *db_context, db_context->getSettingsRef());
auto page_id_generator = std::make_unique<DB::DM::PageIdGenerator>();
auto dm_settings = DB::DM::DeltaMergeStore::Settings{};
auto dm_context = std::make_unique<DB::DM::DMContext>( //
*db_context,
*path_pool,
*storage_pool,
*page_id_generator,
/*hash_salt*/ 0,
0,
dm_settings.not_compress_columns,
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,13 @@ int Server::main(const std::vector<std::string> & /*args*/)
raft_config.enable_compatible_mode, //
global_context->getPathCapacity(),
global_context->getFileProvider());
// must initialize before the following operation:
// 1. load data from disk(because this process may depend on the initialization of global StoragePool)
// 2. initialize KVStore service
// 1) because we need to check whether this is the first startup of this node, and we judge it based on whether there are any files in kvstore directory
// 2) KVStore service also choose its data format based on whether the GlobalStoragePool is initialized
if (global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool(), storage_config.enable_ps_v3))
LOG_FMT_INFO(log, "PageStorage V3 enabled.");

// Use pd address to define which default_database we use by default.
// For mock test, we use "default". For deployed with pd/tidb/tikv use "system", which is always exist in TiFlash.
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Server/StorageConfigParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,13 @@ void TiFlashStorageConfig::parseMisc(const String & storage_section, Poco::Logge
lazily_init_store = (*lazily_init != 0);
}

LOG_FMT_INFO(log, "format_version {} lazily_init_store {}", format_version, lazily_init_store);
// config for experimental feature, may remove later
if (auto enable_v3 = table->get_qualified_as<Int32>("enable_ps_v3"); enable_v3)
{
enable_ps_v3 = (*enable_v3 != 0);
}

LOG_FMT_INFO(log, "format_version {} lazily_init_store {} enable_ps_v3 {}", format_version, lazily_init_store, enable_ps_v3);
}

Strings TiFlashStorageConfig::getAllNormalPaths() const
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/StorageConfigParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ struct TiFlashStorageConfig

UInt64 format_version = 0;
bool lazily_init_store = true;
bool enable_ps_v3 = false;

public:
TiFlashStorageConfig() = default;
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Server/tests/gtest_dttool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,11 @@ struct DTToolTest : public DB::base::TiFlashStorageTestBasic
}
auto path_pool = std::make_unique<DB::StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
auto storage_pool = std::make_unique<DB::DM::StoragePool>("test.t1", /*table_id*/ 1, *path_pool, *db_context, db_context->getSettingsRef());
auto page_id_generator = std::make_unique<DB::DM::PageIdGenerator>();
auto dm_settings = DB::DM::DeltaMergeStore::Settings{};
auto dm_context = std::make_unique<DB::DM::DMContext>( //
*db_context,
*path_pool,
*storage_pool,
*page_id_generator,
/*hash_salt*/ 0,
0,
dm_settings.not_compress_columns,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ ColumnTinyFilePtr ColumnFileTiny::writeColumnFile(DMContext & context, const Blo

PageId ColumnFileTiny::writeColumnFileData(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs)
{
auto page_id = context.page_id_generator.newLogPageId();
auto page_id = context.storage_pool.newLogPageId();

MemoryWriteBuffer write_buf;
PageFieldSizes col_data_sizes;
Expand Down
Loading

0 comments on commit da2a63a

Please sign in to comment.