Skip to content

Commit

Permalink
Merge branch 'master' into fix_tablescan_trace
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy authored Apr 18, 2022
2 parents a8d8318 + cf8ab95 commit aa419d0
Show file tree
Hide file tree
Showing 20 changed files with 812 additions and 87 deletions.
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ target_link_libraries (dbms
${RE2_ST_LIBRARY}
${OPENSSL_CRYPTO_LIBRARY}
${BTRIE_LIBRARIES}
absl::synchronization
)

if (NOT USE_INTERNAL_RE2_LIBRARY)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/CreatingSetsBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <Common/MemoryTracker.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Interpreters/ExpressionAnalyzer.h> /// SubqueriesForSets
#include <Interpreters/SubqueryForSet.h>


namespace DB
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Encryption/RateLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ void IORateLimiter::setBackgroundThreadIds(std::vector<pid_t> thread_ids)
{
std::lock_guard lock(bg_thread_ids_mtx);
bg_thread_ids.swap(thread_ids);
LOG_FMT_INFO(log, "bg_thread_ids {} => {}", bg_thread_ids.size(), bg_thread_ids);
}

std::pair<Int64, Int64> IORateLimiter::getReadWriteBytes(const std::string & fname [[maybe_unused]])
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ bool DAGContext::allowInvalidDate() const
return sql_mode & TiDBSQLMode::ALLOW_INVALID_DATES;
}

void DAGContext::addSubquery(const String & subquery_id, SubqueryForSet && subquery)
{
SubqueriesForSets subqueries_for_sets;
subqueries_for_sets[subquery_id] = std::move(subquery);
subqueries.push_back(std::move(subqueries_for_sets));
}

std::unordered_map<String, BlockInputStreams> & DAGContext::getProfileStreamsMap()
{
return profile_streams_map;
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/TablesRegionsInfo.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Interpreters/SubqueryForSet.h>
#include <Storages/Transaction/TiDB.h>

namespace DB
Expand Down Expand Up @@ -279,6 +280,10 @@ class DAGContext
void initExchangeReceiverIfMPP(Context & context, size_t max_streams);
const std::unordered_map<String, std::shared_ptr<ExchangeReceiver>> & getMPPExchangeReceiverMap() const;

void addSubquery(const String & subquery_id, SubqueryForSet && subquery);
bool hasSubquery() const { return !subqueries.empty(); }
std::vector<SubqueriesForSets> && moveSubqueries() { return std::move(subqueries); }

const tipb::DAGRequest * dag_request;
Int64 compile_time_ns = 0;
size_t final_concurrency = 1;
Expand Down Expand Up @@ -337,6 +342,9 @@ class DAGContext
/// key: executor_id of ExchangeReceiver nodes in dag.
std::unordered_map<String, std::shared_ptr<ExchangeReceiver>> mpp_exchange_receiver_map;
bool mpp_exchange_receiver_map_inited = false;
/// vector of SubqueriesForSets(such as join build subquery).
/// The order of the vector is also the order of the subquery.
std::vector<SubqueriesForSets> subqueries;
};

} // namespace DB
9 changes: 2 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,11 @@ DAGQueryBlockInterpreter::DAGQueryBlockInterpreter(
Context & context_,
const std::vector<BlockInputStreams> & input_streams_vec_,
const DAGQueryBlock & query_block_,
size_t max_streams_,
std::vector<SubqueriesForSets> & subqueries_for_sets_)
size_t max_streams_)
: context(context_)
, input_streams_vec(input_streams_vec_)
, query_block(query_block_)
, max_streams(max_streams_)
, subqueries_for_sets(subqueries_for_sets_)
, log(Logger::get("DAGQueryBlockInterpreter", dagContext().log ? dagContext().log->identifier() : ""))
{}

Expand Down Expand Up @@ -1023,10 +1021,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
SubqueryForSet right_query;
handleJoin(query_block.source->join(), pipeline, right_query);
recordProfileStreams(pipeline, query_block.source_name);

SubqueriesForSets subquries;
subquries[query_block.source_name] = right_query;
subqueries_for_sets.emplace_back(subquries);
dagContext().addSubquery(query_block.source_name, std::move(right_query));
}
else if (query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver)
{
Expand Down
5 changes: 1 addition & 4 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ class DAGQueryBlockInterpreter
Context & context_,
const std::vector<BlockInputStreams> & input_streams_vec_,
const DAGQueryBlock & query_block_,
size_t max_streams_,
std::vector<SubqueriesForSets> & subqueries_for_sets_);
size_t max_streams_);

~DAGQueryBlockInterpreter() = default;

Expand Down Expand Up @@ -117,8 +116,6 @@ class DAGQueryBlockInterpreter

std::unique_ptr<DAGExpressionAnalyzer> analyzer;

std::vector<SubqueriesForSets> & subqueries_for_sets;

LoggerPtr log;
};
} // namespace DB
30 changes: 16 additions & 14 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// limitations under the License.

#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/Context.h>

namespace DB
{
Expand All @@ -35,23 +36,27 @@ InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_)
}
}

DAGContext & InterpreterDAG::dagContext() const
{
return *context.getDAGContext();
}

/** executeQueryBlock recursively converts all the children of the DAGQueryBlock and itself (Coprocessor DAG request)
* into an array of IBlockInputStream (element of physical executing plan of TiFlash)
*/
BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block, std::vector<SubqueriesForSets> & subqueries_for_sets)
BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block)
{
std::vector<BlockInputStreams> input_streams_vec;
for (auto & child : query_block.children)
{
BlockInputStreams child_streams = executeQueryBlock(*child, subqueries_for_sets);
BlockInputStreams child_streams = executeQueryBlock(*child);
input_streams_vec.push_back(child_streams);
}
DAGQueryBlockInterpreter query_block_interpreter(
context,
input_streams_vec,
query_block,
max_streams,
subqueries_for_sets);
max_streams);
return query_block_interpreter.execute();
}

Expand All @@ -60,26 +65,23 @@ BlockIO InterpreterDAG::execute()
/// Due to learner read, DAGQueryBlockInterpreter may take a long time to build
/// the query plan, so we init mpp exchange receiver before executeQueryBlock
dagContext().initExchangeReceiverIfMPP(context, max_streams);
/// region_info should base on the source executor, however
/// tidb does not support multi-table dag request yet, so
/// it is ok to use the same region_info for the whole dag request
std::vector<SubqueriesForSets> subqueries_for_sets;
BlockInputStreams streams = executeQueryBlock(*dag.getRootQueryBlock(), subqueries_for_sets);

BlockInputStreams streams = executeQueryBlock(*dag.getRootQueryBlock());
DAGPipeline pipeline;
pipeline.streams = streams;

/// add union to run in parallel if needed
if (context.getDAGContext()->isMPPTask())
if (dagContext().isMPPTask())
/// MPPTask do not need the returned blocks.
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/true);
else
executeUnion(pipeline, max_streams, dagContext().log);
if (!subqueries_for_sets.empty())
if (dagContext().hasSubquery())
{
const Settings & settings = context.getSettingsRef();
pipeline.firstStream() = std::make_shared<CreatingSetsBlockInputStream>(
pipeline.firstStream(),
std::move(subqueries_for_sets),
std::move(dagContext().moveSubqueries()),
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
dagContext().log->identifier());
}
Expand Down
13 changes: 3 additions & 10 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,9 @@
#pragma GCC diagnostic pop

#include <DataStreams/BlockIO.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/IInterpreter.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/Transaction/Collator.h>
#include <Storages/Transaction/TMTStorages.h>

namespace DB
{
Expand All @@ -50,9 +43,9 @@ class InterpreterDAG : public IInterpreter
BlockIO execute() override;

private:
BlockInputStreams executeQueryBlock(DAGQueryBlock & query_block, std::vector<SubqueriesForSets> & subqueries_for_sets);
BlockInputStreams executeQueryBlock(DAGQueryBlock & query_block);

DAGContext & dagContext() const { return *context.getDAGContext(); }
DAGContext & dagContext() const;

Context & context;
const DAGQuerySource & dag;
Expand Down
22 changes: 22 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,28 @@ MPPTunnelBase<Writer>::MPPTunnelBase(
GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Increment();
}

template <typename Writer>
MPPTunnelBase<Writer>::MPPTunnelBase(
const String & tunnel_id_,
const std::chrono::seconds timeout_,
int input_steams_num_,
bool is_local_,
bool is_async_,
const String & req_id)
: connected(false)
, finished(false)
, is_local(is_local_)
, is_async(is_async_)
, timeout(timeout_)
, tunnel_id(tunnel_id_)
, input_streams_num(input_steams_num_)
, send_queue(std::max(5, input_steams_num_ * 5)) // MPMCQueue can benefit from a slightly larger queue size
, thread_manager(newThreadManager())
, log(Logger::get("MPPTunnel", req_id, tunnel_id))
{
RUNTIME_ASSERT(!(is_local && is_async), log, "is_local: {}, is_async: {}.", is_local, is_async);
}

template <typename Writer>
MPPTunnelBase<Writer>::~MPPTunnelBase()
{
Expand Down
17 changes: 17 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@

namespace DB
{
namespace tests
{
class MPPTunnelTest;
class TestMPPTunnelBase;
} // namespace tests

class EstablishCallData;

/**
Expand Down Expand Up @@ -123,6 +129,17 @@ class MPPTunnelBase : private boost::noncopyable
void sendJob(bool need_lock = true);

private:
friend class tests::MPPTunnelTest;
friend class tests::TestMPPTunnelBase;
// For gtest usage
MPPTunnelBase(
const String & tunnel_id_,
std::chrono::seconds timeout_,
int input_steams_num_,
bool is_local_,
bool is_async_,
const String & req_id);

void finishSendQueue();

void waitUntilConnectedOrFinished(std::unique_lock<std::mutex> & lk);
Expand Down
Loading

0 comments on commit aa419d0

Please sign in to comment.