Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracing: Add read_tso as tracing id to PageStorage snapshot #4288

Merged
merged 10 commits into from
Apr 8, 2022
28 changes: 27 additions & 1 deletion dbms/src/Common/Logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,20 @@ class Logger : private boost::noncopyable
return getInternal(source, buf, std::forward<T>(first_identifier), std::forward<Args>(rest)...);
}

template <typename T, typename... Args>
static LoggerPtr get(Poco::Logger * source_log, T && first_identifier, Args &&... rest)
{
FmtBuffer buf;
return getInternal(source_log, buf, std::forward<T>(first_identifier), std::forward<Args>(rest)...);
}

Logger(const std::string & source, const std::string & identifier)
: logger(&Poco::Logger::get(source))
: Logger(&Poco::Logger::get(source), identifier)
{
}

Logger(Poco::Logger * source_log, const std::string & identifier)
: logger(source_log)
, id(identifier)
{
}
Expand Down Expand Up @@ -114,6 +126,20 @@ class Logger : private boost::noncopyable
return std::make_shared<Logger>(source, buf.toString());
}

template <typename T, typename... Args>
static LoggerPtr getInternal(Poco::Logger * source_log, FmtBuffer & buf, T && first, Args &&... args)
{
buf.fmtAppend("{} ", std::forward<T>(first));
return getInternal(source_log, buf, std::forward<Args>(args)...);
}

template <typename T>
static LoggerPtr getInternal(Poco::Logger * source_log, FmtBuffer & buf, T && identifier)
{
buf.fmtAppend("{}", std::forward<T>(identifier));
return std::make_shared<Logger>(source_log, buf.toString());
}

std::string wrapMsg(const std::string & msg) const
{
return fmt::format("{} {}", id, msg);
Expand Down
9 changes: 5 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead()
std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSelectQueryInfos()
{
std::unordered_map<TableID, SelectQueryInfo> ret;
auto create_query_info = [&]() -> SelectQueryInfo {
auto create_query_info = [&](Int64 table_id) -> SelectQueryInfo {
SelectQueryInfo query_info;
/// to avoid null point exception
query_info.query = makeDummyQuery();
Expand All @@ -270,13 +270,14 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
analyzer->getPreparedSets(),
analyzer->getCurrentInputColumns(),
context.getTimezoneInfo());
query_info.req_id = fmt::format("{} Table<{}>", log->identifier(), table_id);
return query_info;
};
if (table_scan.isPartitionTableScan())
{
for (const auto physical_table_id : table_scan.getPhysicalTableIDs())
{
SelectQueryInfo query_info = create_query_info();
SelectQueryInfo query_info = create_query_info(physical_table_id);
query_info.mvcc_query_info = std::make_unique<MvccQueryInfo>(mvcc_query_info->resolve_locks, mvcc_query_info->read_tso);
ret.emplace(physical_table_id, std::move(query_info));
}
Expand All @@ -292,8 +293,8 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
}
else
{
TableID table_id = logical_table_id;
SelectQueryInfo query_info = create_query_info();
const TableID table_id = logical_table_id;
SelectQueryInfo query_info = create_query_info(table_id);
query_info.mvcc_query_info = std::move(mvcc_query_info);
ret.emplace(table_id, std::move(query_info));
}
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Server/tests/gtest_dttool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/Checksum.h>
#include <Common/Logger.h>
#include <Server/DTTool/DTTool.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
Expand Down Expand Up @@ -287,4 +288,4 @@ TEST_F(DTToolTest, BlockwiseInvariant)
EXPECT_EQ(new_prop_iter, refreshed_file->getPackProperties().property().end());
stream->readSuffix();
}
}
}
6 changes: 4 additions & 2 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void ColumnFileBig::calculateStat(const DMContext & context)
{},
context.db_context.getFileProvider(),
context.getReadLimiter(),
/*tracing_logger*/ nullptr);
/*tracing_id*/ context.tracing_id);

std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes();
}
Expand Down Expand Up @@ -87,7 +87,9 @@ void ColumnFileBigReader::initStream()
return;

DMFileBlockInputStreamBuilder builder(context.db_context);
file_stream = builder.build(column_file.getFile(), *col_defs, RowKeyRanges{column_file.segment_range});
file_stream = builder
.setTracingID(context.tracing_id)
.build(column_file.getFile(), *col_defs, RowKeyRanges{column_file.segment_range});

// If we only need to read pk and version columns, then cache columns data in memory.
if (pk_ver_only)
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/Logger.h>
#include <Core/Types.h>
#include <Interpreters/Context.h>
#include <Interpreters/Settings.h>
Expand All @@ -24,6 +25,7 @@ namespace DB
{
class StoragePathPool;


namespace DM
{
class StoragePool;
Expand Down Expand Up @@ -81,7 +83,7 @@ struct DMContext : private boost::noncopyable
const bool enable_relevant_place;
const bool enable_skippable_place;

const String query_id;
String tracing_id;

public:
DMContext(const Context & db_context_,
Expand All @@ -93,7 +95,7 @@ struct DMContext : private boost::noncopyable
bool is_common_handle_,
size_t rowkey_column_size_,
const DB::Settings & settings,
const String & query_id_ = "")
const String & tracing_id_ = "")
: db_context(db_context_)
, path_pool(path_pool_)
, storage_pool(storage_pool_)
Expand All @@ -117,7 +119,7 @@ struct DMContext : private boost::noncopyable
, read_stable_only(settings.dt_read_stable_only)
, enable_relevant_place(settings.dt_enable_relevant_place)
, enable_skippable_place(settings.dt_enable_skippable_place)
, query_id(query_id_)
, tracing_id(tracing_id_)
{
}

Expand Down
22 changes: 14 additions & 8 deletions dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#pragma once

#include <Columns/ColumnsCommon.h>
#include <Common/Exception.h>
#include <Common/Logger.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
Expand All @@ -36,17 +38,20 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
{
static_assert(MODE == DM_VERSION_FILTER_MODE_MVCC || MODE == DM_VERSION_FILTER_MODE_COMPACT);

constexpr static const char * MVCC_FILTER_NAME = "DMVersionFilterBlockInputStream<MVCC>";
constexpr static const char * COMPACT_FILTER_NAME = "DMVersionFilterBlockInputStream<COMPACT>";

public:
DMVersionFilterBlockInputStream(const BlockInputStreamPtr & input,
const ColumnDefines & read_columns,
UInt64 version_limit_,
bool is_common_handle_,
const String & query_id_ = "")
const String & tracing_id = "")
: version_limit(version_limit_)
, is_common_handle(is_common_handle_)
, header(toEmptyBlock(read_columns))
, query_id(query_id_)
, log(&Poco::Logger::get("DMVersionFilterBlockInputStream<" + String(MODE == DM_VERSION_FILTER_MODE_MVCC ? "MVCC" : "COMPACT") + ">"))
, log(Logger::get((MODE == DM_VERSION_FILTER_MODE_MVCC ? MVCC_FILTER_NAME : COMPACT_FILTER_NAME),
tracing_id))
{
children.push_back(input);

Expand All @@ -60,15 +65,17 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
~DMVersionFilterBlockInputStream()
{
LOG_FMT_DEBUG(log,
"Total rows: {}, pass: {:.2f}%, complete pass: {:.2f}%, complete not pass: {:.2f}%, not clean: {:.2f}%, effective: {:.2f}%, read tso: {}, query id: {}",
"Total rows: {}, pass: {:.2f}%"
", complete pass: {:.2f}%, complete not pass: {:.2f}%"
", not clean: {:.2f}%, effective: {:.2f}%"
", read tso: {}",
total_rows,
passed_rows * 100.0 / total_rows,
complete_passed * 100.0 / total_blocks,
complete_not_passed * 100.0 / total_blocks,
not_clean_rows * 100.0 / passed_rows,
effective_num_rows * 100.0 / passed_rows,
version_limit,
(query_id.empty() ? "<non-query>" : query_id));
version_limit);
}

void readPrefix() override;
Expand Down Expand Up @@ -192,7 +199,6 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
const UInt64 version_limit;
const bool is_common_handle;
const Block header;
const String query_id;

size_t handle_col_pos;
size_t version_col_pos;
Expand Down Expand Up @@ -230,7 +236,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
size_t not_clean_rows = 0;
size_t effective_num_rows = 0;

Poco::Logger * const log;
const LoggerPtr log;
};
} // namespace DM
} // namespace DB
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot(const DMContext & context, bool
snap->is_update = for_update;
snap->_delta = this->shared_from_this();

// TODO: Add tracing_id from mpp task or background tasks
auto storage_snap = std::make_shared<StorageSnapshot>(context.storage_pool, context.getReadLimiter(), /*tracing_id*/ "", true);
auto storage_snap = std::make_shared<StorageSnapshot>(context.storage_pool, context.getReadLimiter(), context.tracing_id, /*snapshot_read*/ true);
snap->persisted_files_snap = persisted_file_set->createSnapshot(storage_snap);
snap->shared_delta_index = delta_index;

Expand Down
Loading