Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine some comments about learner read #4784

Merged
merged 9 commits into from
May 7, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_slow_page_storage_snapshot_release)

#define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \
M(pause_after_learner_read) \
M(pause_with_alter_locks_acquired) \
M(hang_in_execution) \
M(pause_before_dt_background_delta_merge) \
M(pause_until_dt_background_delta_merge) \
Expand Down
85 changes: 58 additions & 27 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,20 @@

#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/TiFlashException.h>
#include <Common/TiFlashMetrics.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/TiRemoteBlockInputStream.h>
#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/CoprocessorReader.h>
#include <Flash/Coprocessor/DAGQueryInfo.h>
#include <Flash/Coprocessor/DAGStorageInterpreter.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/RemoteRequest.h>
#include <Interpreters/Context.h>
#include <Parsers/makeDummyQuery.h>
#include <Storages/IManageableStorage.h>
#include <Storages/MutableSupport.h>
Expand All @@ -37,7 +41,7 @@ namespace FailPoints
{
extern const char region_exception_after_read_from_storage_some_error[];
extern const char region_exception_after_read_from_storage_all_error[];
extern const char pause_after_learner_read[];
extern const char pause_with_alter_locks_acquired[];
extern const char force_remote_read_for_batch_cop[];
extern const char pause_after_copr_streams_acquired[];
} // namespace FailPoints
Expand Down Expand Up @@ -223,6 +227,9 @@ DAGStorageInterpreter::DAGStorageInterpreter(
}
}

// Apply learner read to ensure we can get strong consistent with TiKV Region
// leaders. If the local Regions do not match the requested Regions, then build
// request to retry fetching data from other nodes.
void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
{
prepare();
Expand All @@ -233,13 +240,19 @@ void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
{
if (!mvcc_query_info->regions_query_info.empty())
doLocalRead(pipeline, settings.max_block_size);
buildLocalRead(pipeline, settings.max_block_size);

null_stream_if_empty = std::make_shared<NullBlockInputStream>(storage_for_logical_table->getSampleBlockForColumns(required_columns));
// Should build `remote_requests` and `null_stream` under protect of `table_structure_lock`.
auto null_stream_if_empty = std::make_shared<NullBlockInputStream>(storage_for_logical_table->getSampleBlockForColumns(required_columns));

// Should build these vars under protect of `table_structure_lock`.
buildRemoteRequests();
auto remote_requests = buildRemoteRequests();

// A failpoint to test pause before alter lock released
FAIL_POINT_PAUSE(FailPoints::pause_with_alter_locks_acquired);
// Release alter locks
// The DeltaTree engine ensures that once input streams are created, the caller can get a consistent result
// from those streams even if DDL operations are applied. Release the alter lock so that reading does not
// block DDL operations, keep the drop lock so that the storage not to be dropped during reading.
releaseAlterLocks();
SeaRise marked this conversation as resolved.
Show resolved Hide resolved

// It is impossible to have no joined stream.
Expand All @@ -249,15 +262,15 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)

// For those regions which are not presented in this tiflash node, we will try to fetch streams by key ranges from other tiflash nodes, only happens in batch cop / mpp mode.
if (!remote_requests.empty())
executeRemoteQuery(pipeline);
executeRemoteQuery(std::move(remote_requests), pipeline);

/// record local and remote io input stream
auto & table_scan_io_input_streams = dagContext().getInBoundIOInputStreamsMap()[table_scan.getTableScanExecutorID()];
pipeline.transform([&](auto & stream) { table_scan_io_input_streams.push_back(stream); });

if (pipeline.streams.empty())
{
pipeline.streams.emplace_back(null_stream_if_empty);
pipeline.streams.emplace_back(std::move(null_stream_if_empty));
// reset remote_read_streams_start_index for null_stream_if_empty.
remote_read_streams_start_index = 1;
}
Expand Down Expand Up @@ -290,21 +303,36 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)

void DAGStorageInterpreter::prepare()
{
// About why we do learner read before acquiring structure lock on Storage(s).
// Assume that:
// 1. Read threads do learner read and wait for the Raft applied index with holding a read lock
// on "alter lock" of an IStorage X
// 2. Raft threads try to decode data for Region in the same IStorage X, and find it need to
// apply DDL operations which acquire write lock on "alter locks"
// Under this situation, all Raft threads will be stuck by the read threads, but read threads
// wait for Raft threads to push forward the applied index. Deadlocks happens!!
// So we must do learner read without structure lock on IStorage. After learner read, acquire the
// structure lock of IStorage(s) (to avoid concurrent issues between read threads and DDL
// operations) and build the requested inputstreams. Once the inputstreams build, we should release
// the alter lock to avoid blocking DDL operations.
// TODO: If we can acquire a read-only view on the IStorage structure (both `ITableDeclaration`
// and `TiDB::TableInfo`) we may get this process more simplified. (tiflash/issues/1853)

// Do learner read
const DAGContext & dag_context = *context.getDAGContext();
if (dag_context.isBatchCop() || dag_context.isMPPTask())
learner_read_snapshot = doBatchCopLearnerRead();
else
learner_read_snapshot = doCopLearnerRead();

// Acquire read lock on `alter lock` and build the requested inputstreams
storages_with_structure_lock = getAndLockStorages(settings.schema_version);
assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end());
storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage;

std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan(settings.max_columns_to_read);

analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);

FAIL_POINT_PAUSE(FailPoints::pause_after_learner_read);
}

void DAGStorageInterpreter::executePushedDownFilter(
Expand Down Expand Up @@ -392,7 +420,7 @@ void DAGStorageInterpreter::executeCastAfterTableScan(
}
}

void DAGStorageInterpreter::executeRemoteQuery(DAGPipeline & pipeline)
void DAGStorageInterpreter::executeRemoteQuery(std::vector<RemoteRequest> && remote_requests, DAGPipeline & pipeline)
{
assert(!remote_requests.empty());
DAGSchema & schema = remote_requests[0].schema;
Expand Down Expand Up @@ -464,8 +492,9 @@ LearnerReadSnapshot DAGStorageInterpreter::doCopLearnerRead()
{
if (table_scan.isPartitionTableScan())
{
throw Exception("Cop request does not support partition table scan");
throw TiFlashException("Cop request does not support partition table scan", DB::Errors::Coprocessor::BadRequest);
}

TablesRegionInfoMap regions_for_local_read;
for (const auto physical_table_id : table_scan.getPhysicalTableIDs())
{
Expand All @@ -481,7 +510,7 @@ LearnerReadSnapshot DAGStorageInterpreter::doCopLearnerRead()
if (info_retry)
throw RegionException({info_retry->begin()->get().region_id}, status);

return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, false, context, log);
return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, /*for_batch_cop=*/false, context, log);
}

/// Will assign region_retry_from_local_region
Expand Down Expand Up @@ -517,7 +546,7 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead()
}
if (mvcc_query_info->regions_query_info.empty())
return {};
return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, true, context, log);
return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, /*for_batch_cop=*/true, context, log);
}
catch (const LockException & e)
{
Expand Down Expand Up @@ -584,18 +613,18 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
return ret;
}

void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block_size)
void DAGStorageInterpreter::buildLocalRead(DAGPipeline & pipeline, size_t max_block_size)
{
const DAGContext & dag_context = *context.getDAGContext();
size_t total_local_region_num = mvcc_query_info->regions_query_info.size();
if (total_local_region_num == 0)
return;
auto table_query_infos = generateSelectQueryInfos();
for (auto & table_query_info : table_query_infos)
const auto table_query_infos = generateSelectQueryInfos();
for (const auto & table_query_info : table_query_infos)
{
DAGPipeline current_pipeline;
TableID table_id = table_query_info.first;
SelectQueryInfo & query_info = table_query_info.second;
const TableID table_id = table_query_info.first;
const SelectQueryInfo & query_info = table_query_info.second;
size_t region_num = query_info.mvcc_query_info->regions_query_info.size();
if (region_num == 0)
continue;
Expand All @@ -613,11 +642,11 @@ void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block
{
current_pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, current_max_streams);

// After getting streams from storage, we need to validate whether regions have changed or not after learner read.
// In case the versions of regions have changed, those `streams` may contain different data other than expected.
// Like after region merge/split.
// After getting streams from storage, we need to validate whether Regions have changed or not after learner read.
// (by calling `validateQueryInfo`). In case the key ranges of Regions have changed (Region merge/split), those `streams`
// may contain different data other than expected.

// Inject failpoint to throw RegionException
// Inject failpoint to throw RegionException for testing
fiu_do_on(FailPoints::region_exception_after_read_from_storage_some_error, {
const auto & regions_info = query_info.mvcc_query_info->regions_query_info;
RegionException::UnavailableRegions region_ids;
Expand Down Expand Up @@ -781,7 +810,7 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
return {{}, {}, {}, false};
}

if (table_store->engineType() != ::TiDB::StorageEngine::TMT && table_store->engineType() != ::TiDB::StorageEngine::DT)
if (unlikely(table_store->engineType() != ::TiDB::StorageEngine::DT))
{
throw TiFlashException(
fmt::format(
Expand Down Expand Up @@ -954,8 +983,10 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
return {required_columns_tmp, source_columns_tmp, need_cast_column};
}

void DAGStorageInterpreter::buildRemoteRequests()
// Build remote requests from `region_retry_from_local_region`
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
std::vector<RemoteRequest> DAGStorageInterpreter::buildRemoteRequests()
{
std::vector<RemoteRequest> remote_requests;
std::unordered_map<Int64, Int64> region_id_to_table_id_map;
std::unordered_map<Int64, RegionRetryList> retry_regions_map;
for (const auto physical_table_id : table_scan.getPhysicalTableIDs())
Expand All @@ -978,6 +1009,8 @@ void DAGStorageInterpreter::buildRemoteRequests()
if (retry_regions.empty())
continue;

// Append the region into DAGContext to return them to the upper layer.
// The upper layer should refresh its cache about these regions.
for (const auto & r : retry_regions)
context.getDAGContext()->retry_regions.push_back(r.get());

Expand All @@ -989,13 +1022,11 @@ void DAGStorageInterpreter::buildRemoteRequests()
push_down_filter,
log));
}
return remote_requests;
}

void DAGStorageInterpreter::releaseAlterLocks()
{
// The DeltaTree engine ensures that once input streams are created, the caller can get a consistent result
// from those streams even if DDL operations are applied. Release the alter lock so that reading does not
// block DDL operations, keep the drop lock so that the storage not to be dropped during reading.
for (auto storage_with_lock : storages_with_structure_lock)
{
drop_locks.emplace_back(std::get<1>(std::move(storage_with_lock.second.lock).release()));
Expand Down
14 changes: 5 additions & 9 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@

#pragma once

#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/RemoteRequest.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Interpreters/Context.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/TableLockHolder.h>
Expand All @@ -43,7 +41,7 @@ namespace DB
using TablesRegionInfoMap = std::unordered_map<Int64, std::reference_wrapper<const RegionInfoMap>>;
/// DAGStorageInterpreter encapsulates operations around storage during interprete stage.
/// It's only intended to be used by DAGQueryBlockInterpreter.
/// After DAGStorageInterpreter::execute some of its members will be transfered to DAGQueryBlockInterpreter.
/// After DAGStorageInterpreter::execute some of its members will be transferred to DAGQueryBlockInterpreter.
class DAGStorageInterpreter
{
public:
Expand All @@ -58,7 +56,7 @@ class DAGStorageInterpreter

void execute(DAGPipeline & pipeline);

/// Members will be transfered to DAGQueryBlockInterpreter after execute
/// Members will be transferred to DAGQueryBlockInterpreter after execute

std::unique_ptr<DAGExpressionAnalyzer> analyzer;

Expand All @@ -72,13 +70,13 @@ class DAGStorageInterpreter

LearnerReadSnapshot doBatchCopLearnerRead();

void doLocalRead(DAGPipeline & pipeline, size_t max_block_size);
void buildLocalRead(DAGPipeline & pipeline, size_t max_block_size);

std::unordered_map<TableID, StorageWithStructureLock> getAndLockStorages(Int64 query_schema_version);

std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> getColumnsForTableScan(Int64 max_columns_to_read);

void buildRemoteRequests();
std::vector<RemoteRequest> buildRemoteRequests();

void releaseAlterLocks();

Expand All @@ -88,7 +86,7 @@ class DAGStorageInterpreter

void recordProfileStreams(DAGPipeline & pipeline, const String & key);

void executeRemoteQuery(DAGPipeline & pipeline);
void executeRemoteQuery(std::vector<RemoteRequest> && remote_requests, DAGPipeline & pipeline);

void executeCastAfterTableScan(
size_t remote_read_streams_start_index,
Expand All @@ -107,8 +105,6 @@ class DAGStorageInterpreter
/// it shouldn't be hash map because duplicated region id may occur if merge regions to retry of dag.
RegionRetryList region_retry_from_local_region;
TableLockHolders drop_locks;
std::vector<RemoteRequest> remote_requests;
BlockInputStreamPtr null_stream_if_empty;

/// passed from caller, doesn't change during DAGStorageInterpreter's lifetime

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/TiDBTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class TiDBTableScan
{
return physical_table_ids;
}
String getTableScanExecutorID() const
const String & getTableScanExecutorID() const
{
return executor_id;
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Functions/GeoUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#endif

#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic ignored "-Wunknown-warning-option"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to fix compile error under MacOS with Apple clang version 13.0.0

#pragma GCC diagnostic ignored "-Wunused-but-set-variable"
#pragma GCC diagnostic ignored "-Wunused-parameter"
#pragma GCC diagnostic ignored "-Wunused-variable"
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/LearnerRead.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct RegionLearnerReadSnapshot : RegionPtr
UInt64 snapshot_event_flag{0};

RegionLearnerReadSnapshot() = default;
RegionLearnerReadSnapshot(const RegionPtr & region)
explicit RegionLearnerReadSnapshot(const RegionPtr & region)
: RegionPtr(region)
, snapshot_event_flag(region->getSnapshotEventFlag())
{}
Expand Down