Skip to content

Commit

Permalink
Logger: Refactor dmfile reader and add tracing_logger (#4379)
Browse files Browse the repository at this point in the history
ref #4287
  • Loading branch information
JaySon-Huang authored Mar 24, 2022
1 parent 6ab3623 commit 6301306
Show file tree
Hide file tree
Showing 31 changed files with 457 additions and 489 deletions.
3 changes: 1 addition & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ endif ()
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
# clang: warning: argument unused during compilation: '-stdlib=libc++'
# clang: warning: argument unused during compilation: '-specs=/usr/share/dpkg/no-pie-compile.specs' [-Wunused-command-line-argument]
# clang: warning: private field 'hash_salt' is not used [-Wunused-private-field]
set (COMMON_WARNING_FLAGS "${COMMON_WARNING_FLAGS} -Wno-unused-command-line-argument -Wno-unused-private-field")
set (COMMON_WARNING_FLAGS "${COMMON_WARNING_FLAGS} -Wno-unused-command-line-argument")
endif ()

if (ARCH_LINUX)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Common/CPUAffinityManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,4 +338,4 @@ void CPUAffinityManager::checkThreadCPUAffinity() const
}
}
#endif
} // namespace DB
} // namespace DB
7 changes: 4 additions & 3 deletions dbms/src/Common/CPUAffinityManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,9 @@ class CPUAffinityManager
cpu_set_t other_cpu_set;
#endif

int query_cpu_percent;
int cpu_cores;
// unused except Linux
[[maybe_unused]] int query_cpu_percent;
[[maybe_unused]] int cpu_cores;
std::vector<std::string> query_threads;
Poco::Logger * log;

Expand All @@ -127,4 +128,4 @@ class CPUAffinityManager
CPUAffinityManager(CPUAffinityManager &&) = delete;
CPUAffinityManager & operator=(CPUAffinityManager &&) = delete;
};
} // namespace DB
} // namespace DB
17 changes: 4 additions & 13 deletions dbms/src/Server/DTTool/DTToolBench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,22 +386,13 @@ int benchEntry(const std::vector<std::string> & opts)

auto start = high_resolution_clock::now();
{
auto stream = DB::DM::DMFileBlockInputStream(
*db_context,
std::numeric_limits<UInt64>::max(),
false,
dm_context->hash_salt,
dmfile,
*defines,
{DB::DM::RowKeyRange::newAll(false, 1)},
DB::DM::RSOperatorPtr{},
std::make_shared<DB::DM::ColumnCache>(),
DB::DM::IdSetPtr{});
auto builder = DB::DM::DMFileBlockInputStreamBuilder(*db_context);
auto stream = builder.setColumnCache(std::make_shared<DB::DM::ColumnCache>()).build(dmfile, *defines, {DB::DM::RowKeyRange::newAll(false, 1)});
for (size_t j = 0; j < blocks.size(); ++j)
{
TIFLASH_NO_OPTIMIZE(stream.read());
TIFLASH_NO_OPTIMIZE(stream->read());
}
stream.readSuffix();
stream->readSuffix();
}
auto end = high_resolution_clock::now();
auto duration = duration_cast<nanoseconds>(end - start).count();
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/DTTool/DTToolInspect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ int inspectServiceMain(DB::Context & context, const InspectArgs & args)

// black_hole is used to consume data manually.
// we use SCOPE_EXIT to ensure the release of memory area.
auto black_hole = reinterpret_cast<char *>(::operator new (DBMS_DEFAULT_BUFFER_SIZE, std::align_val_t{64}));
auto * black_hole = reinterpret_cast<char *>(::operator new (DBMS_DEFAULT_BUFFER_SIZE, std::align_val_t{64}));
SCOPE_EXIT({ ::operator delete (black_hole, std::align_val_t{64}); });
auto consume = [&](DB::ReadBuffer & t) {
while (t.readBig(black_hole, DBMS_DEFAULT_BUFFER_SIZE) != 0) {}
Expand Down
24 changes: 11 additions & 13 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@ ColumnFileBig::ColumnFileBig(const DMContext & context, const DMFilePtr & file_,
void ColumnFileBig::calculateStat(const DMContext & context)
{
auto index_cache = context.db_context.getGlobalContext().getMinMaxIndexCache();
auto hash_salt = context.hash_salt;

auto pack_filter
= DMFilePackFilter::loadFrom(file, index_cache, hash_salt, {segment_range}, EMPTY_FILTER, {}, context.db_context.getFileProvider(), context.getReadLimiter());
auto pack_filter = DMFilePackFilter::loadFrom(
file,
index_cache,
{segment_range},
EMPTY_FILTER,
{},
context.db_context.getFileProvider(),
context.getReadLimiter(),
/*tracing_logger*/ nullptr);

std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes();
}
Expand Down Expand Up @@ -79,16 +85,8 @@ void ColumnFileBigReader::initStream()
if (file_stream)
return;

file_stream = std::make_shared<DMFileBlockInputStream>(context.db_context,
/*max_version*/ MAX_UINT64,
/*clean_read*/ false,
context.hash_salt,
column_file.getFile(),
*col_defs,
RowKeyRanges{column_file.segment_range},
RSOperatorPtr{},
ColumnCachePtr{},
IdSetPtr{});
DMFileBlockInputStreamBuilder builder(context.db_context);
file_stream = builder.build(column_file.getFile(), *col_defs, RowKeyRanges{column_file.segment_range});

// If we only need to read pk and version columns, then cache columns data in memory.
if (pk_ver_only)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
}

// Let's calculate gc_hint_version
gc_hint_version = UINT64_MAX;
gc_hint_version = std::numeric_limits<UInt64>::max();
{
UInt8 * filter_pos = filter.data();
size_t handle_pos = 0;
Expand Down Expand Up @@ -380,7 +380,7 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
else
{
Block res;
for (auto & c : header)
for (const auto & c : header)
{
auto & column = cur_raw_block.getByName(c.name);
column.column = column.column->filter(filter, passed_count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
}
}

return matched ? cur_version : UINT64_MAX;
return matched ? cur_version : std::numeric_limits<UInt64>::max();
}

private:
Expand Down
13 changes: 2 additions & 11 deletions dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,17 +157,8 @@ inline const ColumnDefine & getExtraTableIDColumnDefine()
return EXTRA_TABLE_ID_COLUMN_DEFINE_;
}

static constexpr UInt64 MIN_UINT64 = std::numeric_limits<UInt64>::min();
static constexpr UInt64 MAX_UINT64 = std::numeric_limits<UInt64>::max();

static constexpr Int64 MIN_INT64 = std::numeric_limits<Int64>::min();
static constexpr Int64 MAX_INT64 = std::numeric_limits<Int64>::max();

static constexpr Handle N_INF_HANDLE = MIN_INT64; // Used in range, indicating negative infinity.
static constexpr Handle P_INF_HANDLE = MAX_INT64; // Used in range, indicating positive infinity.

static_assert(static_cast<Int64>(static_cast<UInt64>(MIN_INT64)) == MIN_INT64, "Unsupported compiler!");
static_assert(static_cast<Int64>(static_cast<UInt64>(MAX_INT64)) == MAX_INT64, "Unsupported compiler!");
static_assert(static_cast<Int64>(static_cast<UInt64>(std::numeric_limits<Int64>::min())) == std::numeric_limits<Int64>::min(), "Unsupported compiler!");
static_assert(static_cast<Int64>(static_cast<UInt64>(std::numeric_limits<Int64>::max())) == std::numeric_limits<Int64>::max(), "Unsupported compiler!");

static constexpr bool DM_RUN_CHECK = true;

Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/File/DMFilePackFilter.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/SchemaUpdate.h>
#include <Storages/DeltaMerge/Segment.h>
Expand Down Expand Up @@ -1021,7 +1020,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
after_segment_read,
columns_to_read,
EMPTY_FILTER,
MAX_UINT64,
std::numeric_limits<UInt64>::max(),
DEFAULT_BLOCK_SIZE,
true,
db_settings.dt_raw_filter_range,
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ class DMFile : private boost::noncopyable
friend class DMFileWriter;
friend class DMFileReader;
friend class DMFilePackFilter;
friend class DMFileBlockInputStreamBuilder;
friend int ::DTTool::Migrate::migrateServiceMain(DB::Context & context, const ::DTTool::Migrate::MigrateArgs & args);
friend bool ::DTTool::Migrate::isRecognizable(const DB::DM::DMFile & file, const std::string & target);
friend bool ::DTTool::Migrate::needFrameMigration(const DB::DM::DMFile & file, const std::string & target);
Expand Down
65 changes: 65 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>

namespace DB::DM
{

DMFileBlockInputStreamBuilder::DMFileBlockInputStreamBuilder(const Context & context)
: file_provider(context.getFileProvider())
, read_limiter(context.getReadLimiter())
{
// init from global context
const auto & global_context = context.getGlobalContext();
setCaches(global_context.getMarkCache(), global_context.getMinMaxIndexCache());
// init from settings
setFromSettings(context.getSettingsRef());
}

DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & dmfile, const ColumnDefines & read_columns, const RowKeyRanges & rowkey_ranges)
{
if (dmfile->getStatus() != DMFile::Status::READABLE)
throw Exception(fmt::format(
"DMFile [{}] is expected to be in READABLE status, but: {}",
dmfile->fileId(),
DMFile::statusString(dmfile->getStatus())),
ErrorCodes::LOGICAL_ERROR);

// if `rowkey_ranges` is empty, we unconditionally read all packs
// `rowkey_ranges` and `is_common_handle` will only be useful in clean read mode.
// It is safe to ignore them here.
if (unlikely(rowkey_ranges.empty() && enable_clean_read))
throw Exception("rowkey ranges shouldn't be empty with clean-read enabled", ErrorCodes::LOGICAL_ERROR);

bool is_common_handle = !rowkey_ranges.empty() && rowkey_ranges[0].is_common_handle;

DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom(
dmfile,
index_cache,
rowkey_ranges,
rs_filter,
read_packs,
file_provider,
read_limiter,
tracing_logger);

DMFileReader reader(
dmfile,
read_columns,
is_common_handle,
enable_clean_read,
max_data_version,
std::move(pack_filter),
mark_cache,
enable_column_cache,
column_cache,
aio_threshold,
max_read_buffer_size,
file_provider,
read_limiter,
rows_threshold_per_read,
read_one_pack_every_time,
tracing_logger);

return std::make_shared<DMFileBlockInputStream>(std::move(reader));
}
} // namespace DB::DM
Loading

0 comments on commit 6301306

Please sign in to comment.