Skip to content

Commit

Permalink
Refine some comments about learner read (#4784)
Browse files Browse the repository at this point in the history
ref #4118
  • Loading branch information
JaySon-Huang authored May 7, 2022
1 parent 097f93d commit 5d461ab
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 51 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_slow_page_storage_snapshot_release)

#define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \
M(pause_after_learner_read) \
M(pause_with_alter_locks_acquired) \
M(hang_in_execution) \
M(pause_before_dt_background_delta_merge) \
M(pause_until_dt_background_delta_merge) \
Expand Down
102 changes: 71 additions & 31 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,41 @@

#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/TiFlashException.h>
#include <Common/TiFlashMetrics.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/TiRemoteBlockInputStream.h>
#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/CoprocessorReader.h>
#include <Flash/Coprocessor/DAGQueryInfo.h>
#include <Flash/Coprocessor/DAGStorageInterpreter.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/RemoteRequest.h>
#include <Interpreters/Context.h>
#include <Parsers/makeDummyQuery.h>
#include <Storages/IManageableStorage.h>
#include <Storages/MutableSupport.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/SchemaSyncer.h>

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <kvproto/coprocessor.pb.h>
#include <tipb/select.pb.h>
#pragma GCC diagnostic pop


namespace DB
{
namespace FailPoints
{
extern const char region_exception_after_read_from_storage_some_error[];
extern const char region_exception_after_read_from_storage_all_error[];
extern const char pause_after_learner_read[];
extern const char pause_with_alter_locks_acquired[];
extern const char force_remote_read_for_batch_cop[];
extern const char pause_after_copr_streams_acquired[];
} // namespace FailPoints
Expand Down Expand Up @@ -223,6 +234,9 @@ DAGStorageInterpreter::DAGStorageInterpreter(
}
}

// Apply learner read to ensure we can get strong consistent with TiKV Region
// leaders. If the local Regions do not match the requested Regions, then build
// request to retry fetching data from other nodes.
void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
{
prepare();
Expand All @@ -233,31 +247,37 @@ void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
{
if (!mvcc_query_info->regions_query_info.empty())
doLocalRead(pipeline, settings.max_block_size);
buildLocalStreams(pipeline, settings.max_block_size);

null_stream_if_empty = std::make_shared<NullBlockInputStream>(storage_for_logical_table->getSampleBlockForColumns(required_columns));
// Should build `remote_requests` and `null_stream` under protect of `table_structure_lock`.
auto null_stream_if_empty = std::make_shared<NullBlockInputStream>(storage_for_logical_table->getSampleBlockForColumns(required_columns));

// Should build these vars under protect of `table_structure_lock`.
buildRemoteRequests();
auto remote_requests = buildRemoteRequests();

releaseAlterLocks();
// A failpoint to test pause before alter lock released
FAIL_POINT_PAUSE(FailPoints::pause_with_alter_locks_acquired);
// Release alter locks
// The DeltaTree engine ensures that once input streams are created, the caller can get a consistent result
// from those streams even if DDL operations are applied. Release the alter lock so that reading does not
// block DDL operations, keep the drop lock so that the storage not to be dropped during reading.
const TableLockHolders drop_locks = releaseAlterLocks();

// It is impossible to have no joined stream.
assert(pipeline.streams_with_non_joined_data.empty());
// after executeRemoteQuery, remote read stream will be appended in pipeline.streams.
// after buildRemoteStreams, remote read stream will be appended in pipeline.streams.
size_t remote_read_streams_start_index = pipeline.streams.size();

// For those regions which are not presented in this tiflash node, we will try to fetch streams by key ranges from other tiflash nodes, only happens in batch cop / mpp mode.
if (!remote_requests.empty())
executeRemoteQuery(pipeline);
buildRemoteStreams(std::move(remote_requests), pipeline);

/// record local and remote io input stream
auto & table_scan_io_input_streams = dagContext().getInBoundIOInputStreamsMap()[table_scan.getTableScanExecutorID()];
pipeline.transform([&](auto & stream) { table_scan_io_input_streams.push_back(stream); });

if (pipeline.streams.empty())
{
pipeline.streams.emplace_back(null_stream_if_empty);
pipeline.streams.emplace_back(std::move(null_stream_if_empty));
// reset remote_read_streams_start_index for null_stream_if_empty.
remote_read_streams_start_index = 1;
}
Expand All @@ -268,7 +288,7 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
pipeline.transform([&](auto & stream) {
// todo do not need to hold all locks in each stream, if the stream is reading from table a
// it only needs to hold the lock of table a
for (auto & lock : drop_locks)
for (const auto & lock : drop_locks)
stream->addTableLock(lock);
});

Expand All @@ -290,21 +310,36 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)

void DAGStorageInterpreter::prepare()
{
// About why we do learner read before acquiring structure lock on Storage(s).
// Assume that:
// 1. Read threads do learner read and wait for the Raft applied index with holding a read lock
// on "alter lock" of an IStorage X
// 2. Raft threads try to decode data for Region in the same IStorage X, and find it need to
// apply DDL operations which acquire write lock on "alter locks"
// Under this situation, all Raft threads will be stuck by the read threads, but read threads
// wait for Raft threads to push forward the applied index. Deadlocks happens!!
// So we must do learner read without structure lock on IStorage. After learner read, acquire the
// structure lock of IStorage(s) (to avoid concurrent issues between read threads and DDL
// operations) and build the requested inputstreams. Once the inputstreams build, we should release
// the alter lock to avoid blocking DDL operations.
// TODO: If we can acquire a read-only view on the IStorage structure (both `ITableDeclaration`
// and `TiDB::TableInfo`) we may get this process more simplified. (tiflash/issues/1853)

// Do learner read
const DAGContext & dag_context = *context.getDAGContext();
if (dag_context.isBatchCop() || dag_context.isMPPTask())
learner_read_snapshot = doBatchCopLearnerRead();
else
learner_read_snapshot = doCopLearnerRead();

// Acquire read lock on `alter lock` and build the requested inputstreams
storages_with_structure_lock = getAndLockStorages(settings.schema_version);
assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end());
storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage;

std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan(settings.max_columns_to_read);

analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);

FAIL_POINT_PAUSE(FailPoints::pause_after_learner_read);
}

void DAGStorageInterpreter::executePushedDownFilter(
Expand Down Expand Up @@ -392,7 +427,7 @@ void DAGStorageInterpreter::executeCastAfterTableScan(
}
}

void DAGStorageInterpreter::executeRemoteQuery(DAGPipeline & pipeline)
void DAGStorageInterpreter::buildRemoteStreams(std::vector<RemoteRequest> && remote_requests, DAGPipeline & pipeline)
{
assert(!remote_requests.empty());
DAGSchema & schema = remote_requests[0].schema;
Expand Down Expand Up @@ -464,8 +499,9 @@ LearnerReadSnapshot DAGStorageInterpreter::doCopLearnerRead()
{
if (table_scan.isPartitionTableScan())
{
throw Exception("Cop request does not support partition table scan");
throw TiFlashException("Cop request does not support partition table scan", DB::Errors::Coprocessor::BadRequest);
}

TablesRegionInfoMap regions_for_local_read;
for (const auto physical_table_id : table_scan.getPhysicalTableIDs())
{
Expand All @@ -481,7 +517,7 @@ LearnerReadSnapshot DAGStorageInterpreter::doCopLearnerRead()
if (info_retry)
throw RegionException({info_retry->begin()->get().region_id}, status);

return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, false, context, log);
return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, /*for_batch_cop=*/false, context, log);
}

/// Will assign region_retry_from_local_region
Expand Down Expand Up @@ -517,7 +553,7 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead()
}
if (mvcc_query_info->regions_query_info.empty())
return {};
return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, true, context, log);
return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, /*for_batch_cop=*/true, context, log);
}
catch (const LockException & e)
{
Expand Down Expand Up @@ -584,18 +620,18 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
return ret;
}

void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block_size)
void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max_block_size)
{
const DAGContext & dag_context = *context.getDAGContext();
size_t total_local_region_num = mvcc_query_info->regions_query_info.size();
if (total_local_region_num == 0)
return;
auto table_query_infos = generateSelectQueryInfos();
for (auto & table_query_info : table_query_infos)
const auto table_query_infos = generateSelectQueryInfos();
for (const auto & table_query_info : table_query_infos)
{
DAGPipeline current_pipeline;
TableID table_id = table_query_info.first;
SelectQueryInfo & query_info = table_query_info.second;
const TableID table_id = table_query_info.first;
const SelectQueryInfo & query_info = table_query_info.second;
size_t region_num = query_info.mvcc_query_info->regions_query_info.size();
if (region_num == 0)
continue;
Expand All @@ -613,11 +649,11 @@ void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block
{
current_pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, current_max_streams);

// After getting streams from storage, we need to validate whether regions have changed or not after learner read.
// In case the versions of regions have changed, those `streams` may contain different data other than expected.
// Like after region merge/split.
// After getting streams from storage, we need to validate whether Regions have changed or not after learner read.
// (by calling `validateQueryInfo`). In case the key ranges of Regions have changed (Region merge/split), those `streams`
// may contain different data other than expected.

// Inject failpoint to throw RegionException
// Inject failpoint to throw RegionException for testing
fiu_do_on(FailPoints::region_exception_after_read_from_storage_some_error, {
const auto & regions_info = query_info.mvcc_query_info->regions_query_info;
RegionException::UnavailableRegions region_ids;
Expand Down Expand Up @@ -781,7 +817,7 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
return {{}, {}, {}, false};
}

if (table_store->engineType() != ::TiDB::StorageEngine::TMT && table_store->engineType() != ::TiDB::StorageEngine::DT)
if (unlikely(table_store->engineType() != ::TiDB::StorageEngine::DT))
{
throw TiFlashException(
fmt::format(
Expand Down Expand Up @@ -954,8 +990,10 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
return {required_columns_tmp, source_columns_tmp, need_cast_column};
}

void DAGStorageInterpreter::buildRemoteRequests()
// Build remote requests from `region_retry_from_local_region` and `table_regions_info.remote_regions`
std::vector<RemoteRequest> DAGStorageInterpreter::buildRemoteRequests()
{
std::vector<RemoteRequest> remote_requests;
std::unordered_map<Int64, Int64> region_id_to_table_id_map;
std::unordered_map<Int64, RegionRetryList> retry_regions_map;
for (const auto physical_table_id : table_scan.getPhysicalTableIDs())
Expand All @@ -978,6 +1016,8 @@ void DAGStorageInterpreter::buildRemoteRequests()
if (retry_regions.empty())
continue;

// Append the region into DAGContext to return them to the upper layer.
// The upper layer should refresh its cache about these regions.
for (const auto & r : retry_regions)
context.getDAGContext()->retry_regions.push_back(r.get());

Expand All @@ -989,17 +1029,17 @@ void DAGStorageInterpreter::buildRemoteRequests()
push_down_filter,
log));
}
return remote_requests;
}

void DAGStorageInterpreter::releaseAlterLocks()
TableLockHolders DAGStorageInterpreter::releaseAlterLocks()
{
// The DeltaTree engine ensures that once input streams are created, the caller can get a consistent result
// from those streams even if DDL operations are applied. Release the alter lock so that reading does not
// block DDL operations, keep the drop lock so that the storage not to be dropped during reading.
TableLockHolders drop_locks;
for (auto storage_with_lock : storages_with_structure_lock)
{
drop_locks.emplace_back(std::get<1>(std::move(storage_with_lock.second.lock).release()));
}
return drop_locks;
}

} // namespace DB
23 changes: 6 additions & 17 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@

#pragma once

#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/RemoteRequest.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Interpreters/Context.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/TableLockHolder.h>
Expand All @@ -30,20 +28,14 @@
#include <Storages/Transaction/Types.h>
#include <pingcap/coprocessor/Client.h>

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <kvproto/coprocessor.pb.h>
#include <tipb/select.pb.h>
#pragma GCC diagnostic pop

#include <vector>

namespace DB
{
using TablesRegionInfoMap = std::unordered_map<Int64, std::reference_wrapper<const RegionInfoMap>>;
/// DAGStorageInterpreter encapsulates operations around storage during interprete stage.
/// It's only intended to be used by DAGQueryBlockInterpreter.
/// After DAGStorageInterpreter::execute some of its members will be transfered to DAGQueryBlockInterpreter.
/// After DAGStorageInterpreter::execute some of its members will be transferred to DAGQueryBlockInterpreter.
class DAGStorageInterpreter
{
public:
Expand All @@ -58,7 +50,7 @@ class DAGStorageInterpreter

void execute(DAGPipeline & pipeline);

/// Members will be transfered to DAGQueryBlockInterpreter after execute
/// Members will be transferred to DAGQueryBlockInterpreter after execute

std::unique_ptr<DAGExpressionAnalyzer> analyzer;

Expand All @@ -72,23 +64,23 @@ class DAGStorageInterpreter

LearnerReadSnapshot doBatchCopLearnerRead();

void doLocalRead(DAGPipeline & pipeline, size_t max_block_size);
void buildLocalStreams(DAGPipeline & pipeline, size_t max_block_size);

std::unordered_map<TableID, StorageWithStructureLock> getAndLockStorages(Int64 query_schema_version);

std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> getColumnsForTableScan(Int64 max_columns_to_read);

void buildRemoteRequests();
std::vector<RemoteRequest> buildRemoteRequests();

void releaseAlterLocks();
TableLockHolders releaseAlterLocks();

std::unordered_map<TableID, SelectQueryInfo> generateSelectQueryInfos();

DAGContext & dagContext() const;

void recordProfileStreams(DAGPipeline & pipeline, const String & key);

void executeRemoteQuery(DAGPipeline & pipeline);
void buildRemoteStreams(std::vector<RemoteRequest> && remote_requests, DAGPipeline & pipeline);

void executeCastAfterTableScan(
size_t remote_read_streams_start_index,
Expand All @@ -106,9 +98,6 @@ class DAGStorageInterpreter
std::vector<ExtraCastAfterTSMode> is_need_add_cast_column;
/// it shouldn't be hash map because duplicated region id may occur if merge regions to retry of dag.
RegionRetryList region_retry_from_local_region;
TableLockHolders drop_locks;
std::vector<RemoteRequest> remote_requests;
BlockInputStreamPtr null_stream_if_empty;

/// passed from caller, doesn't change during DAGStorageInterpreter's lifetime

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/TiDBTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class TiDBTableScan
{
return physical_table_ids;
}
String getTableScanExecutorID() const
const String & getTableScanExecutorID() const
{
return executor_id;
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Functions/GeoUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#endif

#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic ignored "-Wunknown-warning-option"
#pragma GCC diagnostic ignored "-Wunused-but-set-variable"
#pragma GCC diagnostic ignored "-Wunused-parameter"
#pragma GCC diagnostic ignored "-Wunused-variable"
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/LearnerRead.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct RegionLearnerReadSnapshot : RegionPtr
UInt64 snapshot_event_flag{0};

RegionLearnerReadSnapshot() = default;
RegionLearnerReadSnapshot(const RegionPtr & region)
explicit RegionLearnerReadSnapshot(const RegionPtr & region)
: RegionPtr(region)
, snapshot_event_flag(region->getSnapshotEventFlag())
{}
Expand Down

0 comments on commit 5d461ab

Please sign in to comment.