Skip to content

Commit

Permalink
Improve LearnerRead log (#3690)
Browse files Browse the repository at this point in the history
  • Loading branch information
SeaRise authored Dec 20, 2021
1 parent 4de3b23 commit 275f968
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 66 deletions.
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ LearnerReadSnapshot DAGStorageInterpreter::doCopLearnerRead()
if (info_retry)
throw RegionException({info_retry->begin()->get().region_id}, status);

return doLearnerRead(table_id, *mvcc_query_info, max_streams, /*wait_index_timeout_as_region_not_found*/ true, tmt, log->getLog());
return doLearnerRead(table_id, *mvcc_query_info, max_streams, /*wait_index_timeout_as_region_not_found*/ true, tmt, log);
}

/// Will assign region_retry
Expand Down Expand Up @@ -209,7 +209,7 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead()
}
if (mvcc_query_info->regions_query_info.empty())
return {};
return doLearnerRead(table_id, *mvcc_query_info, max_streams, /*wait_index_timeout_as_region_not_found*/ false, tmt, log->getLog());
return doLearnerRead(table_id, *mvcc_query_info, max_streams, /*wait_index_timeout_as_region_not_found*/ false, tmt, log);
}
catch (const LockException & e)
{
Expand Down Expand Up @@ -279,7 +279,7 @@ void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block
region_ids.insert(info.region_id);
throw RegionException(std::move(region_ids), RegionException::RegionReadStatus::NOT_FOUND);
});
validateQueryInfo(*query_info.mvcc_query_info, learner_read_snapshot, tmt, log->getLog());
validateQueryInfo(*query_info.mvcc_query_info, learner_read_snapshot, tmt, log);
break;
}
catch (RegionException & e)
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <Columns/Collator.h>
#include <Common/LogWithPrefix.h>
#include <Common/TiFlashException.h>
#include <Common/typeid_cast.h>
#include <Core/Field.h>
Expand Down Expand Up @@ -93,7 +94,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
, subquery_depth(subquery_depth_)
, only_analyze(only_analyze)
, input(input)
, log(&Poco::Logger::get("InterpreterSelectQuery"))
, log(getLogWithPrefix(nullptr, "InterpreterSelectQuery"))
{
init(required_result_column_names_);
}
Expand All @@ -106,7 +107,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(OnlyAnalyzeTag, const ASTPtr & qu
, to_stage(QueryProcessingStage::Complete)
, subquery_depth(0)
, only_analyze(true)
, log(&Poco::Logger::get("InterpreterSelectQuery"))
, log(getLogWithPrefix(nullptr, "InterpreterSelectQuery"))
{
init({});
}
Expand Down
32 changes: 17 additions & 15 deletions dbms/src/Interpreters/InterpreterSelectQuery.h
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
#pragma once

#include <Core/QueryProcessingStage.h>
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Interpreters/IInterpreter.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/IInterpreter.h>
#include <Storages/Transaction/Types.h>


namespace Poco { class Logger; }
#include <memory>

namespace DB
{

class ExpressionAnalyzer;
class ASTSelectQuery;
struct SubqueryForSet;

class LogWithPrefix;
using LogWithPrefixPtr = std::shared_ptr<LogWithPrefix>;

/** Interprets the SELECT query. Returns the stream of blocks with the results of the query before `to_stage` stage.
*/
Expand Down Expand Up @@ -104,7 +104,9 @@ class InterpreterSelectQuery : public IInterpreter
}
};

struct OnlyAnalyzeTag {};
struct OnlyAnalyzeTag
{
};
InterpreterSelectQuery(
OnlyAnalyzeTag,
const ASTPtr & query_ptr_,
Expand All @@ -119,14 +121,14 @@ class InterpreterSelectQuery : public IInterpreter

struct AnalysisResult
{
bool has_join = false;
bool has_where = false;
bool has_join = false;
bool has_where = false;
bool need_aggregate = false;
bool has_having = false;
bool has_order_by = false;
bool has_limit_by = false;
bool has_having = false;
bool has_order_by = false;
bool has_limit_by = false;

ExpressionActionsPtr before_join; /// including JOIN
ExpressionActionsPtr before_join; /// including JOIN
ExpressionActionsPtr before_where;
ExpressionActionsPtr before_aggregation;
ExpressionActionsPtr before_having;
Expand Down Expand Up @@ -205,7 +207,7 @@ class InterpreterSelectQuery : public IInterpreter
/// Used when we read from prepared input, not table or subquery.
BlockInputStreamPtr input;

Poco::Logger * log;
LogWithPrefixPtr log;
};

}
} // namespace DB
96 changes: 52 additions & 44 deletions dbms/src/Storages/Transaction/LearnerRead.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <Common/LogWithPrefix.h>
#include <Common/Stopwatch.h>
#include <Common/TiFlashMetrics.h>
#include <Interpreters/Context.h>
Expand All @@ -12,6 +13,7 @@
#include <Storages/Transaction/Utils.h>
#include <common/ThreadPool.h>
#include <common/likely.h>
#include <common/logger_useful.h>
#include <fmt/format.h>

#include <ext/scope_guard.h>
Expand Down Expand Up @@ -144,7 +146,7 @@ LearnerReadSnapshot doLearnerRead(
size_t num_streams,
bool wait_index_timeout_as_region_not_found,
TMTContext & tmt,
Poco::Logger * log)
const LogWithPrefixPtr & log)
{
assert(log != nullptr);

Expand All @@ -165,7 +167,7 @@ LearnerReadSnapshot doLearnerRead(
auto region = kvstore->getRegion(info.region_id);
if (region == nullptr)
{
LOG_WARNING(log, "[region " << info.region_id << "] is not found in KVStore, try again");
LOG_FMT_WARNING(log, "[region {}] is not found in KVStore, try again", info.region_id);
throw RegionException({info.region_id}, RegionException::RegionReadStatus::NOT_FOUND);
}
regions_snapshot.emplace(info.region_id, std::move(region));
Expand Down Expand Up @@ -254,16 +256,15 @@ LearnerReadSnapshot doLearnerRead(
auto read_index_elapsed_ms = watch.elapsedMilliseconds();
GET_METRIC(tiflash_raft_read_index_duration_seconds).Observe(read_index_elapsed_ms / 1000.0);
const size_t cached_size = ori_batch_region_size - batch_read_index_req.size();
LOG_DEBUG(

LOG_FMT_DEBUG(
log,
"Batch read index, original size " << ori_batch_region_size << ", send & get " << batch_read_index_req.size()
<< " message, cost " << read_index_elapsed_ms << "ms";
do {
if (cached_size)
{
oss_internal_rare << ", " << std::to_string(cached_size) << " in cache";
}
} while (0));
"Batch read index, original size {}, send & get {} message, cost {}ms{}",
ori_batch_region_size,
batch_read_index_req.size(),
read_index_elapsed_ms,
(cached_size != 0) ? (fmt::format(", {} in cache", cached_size)) : "");

watch.restart(); // restart to count the elapsed of wait index
}

Expand Down Expand Up @@ -350,33 +351,36 @@ LearnerReadSnapshot doLearnerRead(
region_to_query.bypass_lock_ts,
region_to_query.version,
region_to_query.conf_version,
log);

std::visit(variant_op::overloaded{
[&](LockInfoPtr & lock) { unavailable_regions.setRegionLock(region->id(), std::move(lock)); },
[&](RegionException::RegionReadStatus & status) {
if (status != RegionException::RegionReadStatus::OK)
{
LOG_WARNING(log,
"Check memory cache, region "
<< region_to_query.region_id << ", version " << region_to_query.version << ", handle range "
<< RecordKVFormat::DecodedTiKVKeyRangeToDebugString(region_to_query.range_in_table)
<< ", status " << RegionException::RegionReadStatusString(status));
unavailable_regions.add(region->id(), status);
}
},
},
res);
log->getLog());

std::visit(
variant_op::overloaded{
[&](LockInfoPtr & lock) { unavailable_regions.setRegionLock(region->id(), std::move(lock)); },
[&](RegionException::RegionReadStatus & status) {
if (status != RegionException::RegionReadStatus::OK)
{
LOG_FMT_WARNING(
log,
"Check memory cache, region {}, version {}, handle range {}, status {}",
region_to_query.region_id,
region_to_query.version,
RecordKVFormat::DecodedTiKVKeyRangeToDebugString(region_to_query.range_in_table),
RegionException::RegionReadStatusString(status));
unavailable_regions.add(region->id(), status);
}
},
},
res);
}
}
GET_METRIC(tiflash_syncing_data_freshness).Observe(batch_wait_data_watch.elapsedSeconds()); // For DBaaS SLI
auto wait_index_elapsed_ms = watch.elapsedMilliseconds();
LOG_DEBUG(log,
fmt::format(
"Finish wait index | resolve locks | check memory cache for {} regions, cost {}ms, {} unavailable regions",
batch_read_index_req.size(),
wait_index_elapsed_ms,
unavailable_regions.size()));
LOG_FMT_DEBUG(
log,
"Finish wait index | resolve locks | check memory cache for {} regions, cost {}ms, {} unavailable regions",
batch_read_index_req.size(),
wait_index_elapsed_ms,
unavailable_regions.size());
};

auto start_time = Clock::now();
Expand All @@ -399,10 +403,12 @@ LearnerReadSnapshot doLearnerRead(
unavailable_regions.tryThrowRegionException(regions_info);

auto end_time = Clock::now();
LOG_DEBUG(log,
"[Learner Read] batch read index | wait index cost "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count()
<< " ms totally, regions_num=" << num_regions << ", concurrency=" << concurrent_num);
LOG_FMT_DEBUG(
log,
"[Learner Read] batch read index | wait index cost {} ms totally, regions_num={}, concurrency={}",
std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count(),
num_regions,
concurrent_num);

return regions_snapshot;
}
Expand All @@ -412,7 +418,7 @@ void validateQueryInfo(
const MvccQueryInfo & mvcc_query_info,
const LearnerReadSnapshot & regions_snapshot,
TMTContext & tmt,
Poco::Logger * log)
const LogWithPrefixPtr & log)
{
RegionException::UnavailableRegions fail_region_ids;
RegionException::RegionReadStatus fail_status = RegionException::RegionReadStatus::OK;
Expand All @@ -437,11 +443,13 @@ void validateQueryInfo(
{
fail_region_ids.emplace(region_query_info.region_id);
fail_status = status;
LOG_WARNING(log,
"Check after read from Storage, region "
<< region_query_info.region_id << ", version " << region_query_info.version //
<< ", handle range " << RecordKVFormat::DecodedTiKVKeyRangeToDebugString(region_query_info.range_in_table)
<< ", status " << RegionException::RegionReadStatusString(status));
LOG_FMT_WARNING(
log,
"Check after read from Storage, region {}, version {}, handle range {}, status {}",
region_query_info.region_id,
region_query_info.version,
RecordKVFormat::DecodedTiKVKeyRangeToDebugString(region_query_info.range_in_table),
RegionException::RegionReadStatusString(status));
}
}

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/LearnerRead.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ doLearnerRead(
size_t num_streams,
bool wait_index_timeout_as_region_not_found,
TMTContext & tmt,
Poco::Logger * log);
const LogWithPrefixPtr & log);

// After getting the stream from storage, we must make sure that no region's version has changed since the learner read.
// If any region's version has changed, this function will throw `RegionException`.
void validateQueryInfo(
const MvccQueryInfo & mvcc_query_info,
const LearnerReadSnapshot & regions_snapshot,
TMTContext & tmt,
Poco::Logger * log);
const LogWithPrefixPtr & log);

} // namespace DB

0 comments on commit 275f968

Please sign in to comment.