diff --git a/contrib/client-c b/contrib/client-c
index 05553c96920..34f9ca27f78 160000
--- a/contrib/client-c
+++ b/contrib/client-c
@@ -1 +1 @@
-Subproject commit 05553c969206f016b87acad5164c3535346f07c8
+Subproject commit 34f9ca27f78fa45bbe236352a4bdb1ca44b34194
diff --git a/contrib/tipb b/contrib/tipb
index f31a15d98fc..4fad48b4c8c 160000
--- a/contrib/tipb
+++ b/contrib/tipb
@@ -1 +1 @@
-Subproject commit f31a15d98fceea315accb67a53b2416d43364e33
+Subproject commit 4fad48b4c8c3e17805a520d6c3a7655077ffca46
diff --git a/dbms/src/DataStreams/CoprocessorBlockInputStream.h b/dbms/src/DataStreams/CoprocessorBlockInputStream.h
index 5871165212d..3a90bdc41df 100644
--- a/dbms/src/DataStreams/CoprocessorBlockInputStream.h
+++ b/dbms/src/DataStreams/CoprocessorBlockInputStream.h
@@ -29,10 +29,9 @@ class CoprocessorBlockInputStream : public IProfilingBlockInputStream
     }
 
 public:
-    CoprocessorBlockInputStream(pingcap::kv::Cluster * cluster_, const pingcap::coprocessor::Request & req_, const DAGSchema & schema_,
-        int concurrency, pingcap::kv::StoreType store_type)
-        : req(req_),
-          resp_iter(pingcap::coprocessor::Client::send(cluster_, &req, concurrency, store_type)),
+    CoprocessorBlockInputStream(
+        pingcap::kv::Cluster * cluster_, std::vector tasks, const DAGSchema & schema_, int concurrency)
+        : resp_iter(std::move(tasks), cluster_, concurrency, &Logger::get("pingcap/coprocessor")),
           schema(schema_),
           sample_block(getSampleBlock()),
           log(&Logger::get("pingcap/coprocessor"))
@@ -52,8 +51,7 @@ class CoprocessorBlockInputStream : public IProfilingBlockInputStream
             if (!has_next)
                 return {};
         }
-
-        auto chunk = std::move(chunk_queue.front());
+        auto chunk = chunk_queue.front();
         chunk_queue.pop();
         switch (resp->encode_type())
         {
@@ -101,7 +99,6 @@ class CoprocessorBlockInputStream : public IProfilingBlockInputStream
         return true;
     }
 
-    pingcap::coprocessor::Request req;
     pingcap::coprocessor::ResponseIter resp_iter;
     DAGSchema schema;
 
diff --git a/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp b/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
index 61a8b0be10a..e51d1627d40 100644
--- a/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
+++ b/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
@@ -1,9 +1,10 @@
-#include
-#include
-#include
-#include
 #include
+#include
+#include
+#include
+#include
 #include
+
 #include
 
 
@@ -12,25 +13,38 @@ namespace DB
 
 namespace ErrorCodes
 {
-    extern const int SET_SIZE_LIMIT_EXCEEDED;
+extern const int SET_SIZE_LIMIT_EXCEEDED;
 }
 
+CreatingSetsBlockInputStream::CreatingSetsBlockInputStream(const BlockInputStreamPtr & input,
+    std::vector<SubqueriesForSets> && subqueries_for_sets_list_,
+    const SizeLimits & network_transfer_limits)
+    : subqueries_for_sets_list(std::move(subqueries_for_sets_list_)), network_transfer_limits(network_transfer_limits)
+{
+    init(input);
+}
 
 CreatingSetsBlockInputStream::CreatingSetsBlockInputStream(
-    const BlockInputStreamPtr & input,
-    const SubqueriesForSets & subqueries_for_sets_,
-    const SizeLimits & network_transfer_limits)
-    : subqueries_for_sets(subqueries_for_sets_),
-      network_transfer_limits(network_transfer_limits)
+    const BlockInputStreamPtr & input, const SubqueriesForSets & subqueries_for_sets, const SizeLimits & network_transfer_limits)
+    : network_transfer_limits(network_transfer_limits)
 {
-    for (auto & elem : subqueries_for_sets)
+    subqueries_for_sets_list.push_back(subqueries_for_sets);
+    init(input);
+}
+
+void CreatingSetsBlockInputStream::init(const BlockInputStreamPtr & input)
+{
+    for (auto & subqueries_for_sets : subqueries_for_sets_list)
{ - if (elem.second.source) + for (auto & elem : subqueries_for_sets) { - children.push_back(elem.second.source); + if (elem.second.source) + { + children.push_back(elem.second.source); - if (elem.second.set) - elem.second.set->setHeader(elem.second.source->getHeader()); + if (elem.second.set) + elem.second.set->setHeader(elem.second.source->getHeader()); + } } } @@ -51,10 +65,7 @@ Block CreatingSetsBlockInputStream::readImpl() } -void CreatingSetsBlockInputStream::readPrefixImpl() -{ - createAll(); -} +void CreatingSetsBlockInputStream::readPrefixImpl() { createAll(); } Block CreatingSetsBlockInputStream::getTotals() @@ -72,14 +83,17 @@ void CreatingSetsBlockInputStream::createAll() { if (!created) { - for (auto & elem : subqueries_for_sets) + for (auto & subqueries_for_sets : subqueries_for_sets_list) { - if (elem.second.source) /// There could be prepared in advance Set/Join - no source is specified for them. + for (auto & elem : subqueries_for_sets) { - if (isCancelledOrThrowIfKilled()) - return; + if (elem.second.source) /// There could be prepared in advance Set/Join - no source is specified for them. + { + if (isCancelledOrThrowIfKilled()) + return; - createOne(elem.second); + createOne(elem.second); + } } } @@ -87,12 +101,11 @@ void CreatingSetsBlockInputStream::createAll() } } - void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) { - LOG_TRACE(log, (subquery.set ? "Creating set. " : "") - << (subquery.join ? "Creating join. " : "") - << (subquery.table ? "Filling temporary table. " : "")); + LOG_TRACE(log, + (subquery.set ? "Creating set. " : "") << (subquery.join ? "Creating join. " : "") + << (subquery.table ? "Filling temporary table. " : "")); Stopwatch watch; BlockOutputStreamPtr table_out; @@ -137,7 +150,8 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) rows_to_transfer += block.rows(); bytes_to_transfer += block.bytes(); - if (!network_transfer_limits.check(rows_to_transfer, bytes_to_transfer, "IN/JOIN external table", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED)) + if (!network_transfer_limits.check( + rows_to_transfer, bytes_to_transfer, "IN/JOIN external table", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED)) done_with_table = true; } @@ -188,4 +202,4 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) } } -} +} // namespace DB diff --git a/dbms/src/DataStreams/CreatingSetsBlockInputStream.h b/dbms/src/DataStreams/CreatingSetsBlockInputStream.h index ff8fe5683c7..79b3b730ac4 100644 --- a/dbms/src/DataStreams/CreatingSetsBlockInputStream.h +++ b/dbms/src/DataStreams/CreatingSetsBlockInputStream.h @@ -1,11 +1,14 @@ #pragma once -#include #include -#include /// SubqueriesForSets +#include /// SubqueriesForSets +#include -namespace Poco { class Logger; } +namespace Poco +{ +class Logger; +} namespace DB { @@ -18,8 +21,10 @@ class CreatingSetsBlockInputStream : public IProfilingBlockInputStream { public: CreatingSetsBlockInputStream( - const BlockInputStreamPtr & input, - const SubqueriesForSets & subqueries_for_sets_, + const BlockInputStreamPtr & input, const SubqueriesForSets & subqueries_for_sets_, const SizeLimits & network_transfer_limits); + + CreatingSetsBlockInputStream(const BlockInputStreamPtr & input, + std::vector && subqueries_for_sets_list_, const SizeLimits & network_transfer_limits); String getName() const override { return "CreatingSets"; } @@ -34,7 +39,9 @@ class CreatingSetsBlockInputStream : public IProfilingBlockInputStream void readPrefixImpl() override; private: - SubqueriesForSets 
subqueries_for_sets;
+    void init(const BlockInputStreamPtr & input);
+
+    std::vector<SubqueriesForSets> subqueries_for_sets_list;
 
     bool created = false;
 
     SizeLimits network_transfer_limits;
@@ -49,4 +56,4 @@ class CreatingSetsBlockInputStream : public IProfilingBlockInputStream
     void createOne(SubqueryForSet & subquery);
 };
 
-}
+} // namespace DB
diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp
index 04907c9a710..d8512b04966 100644
--- a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp
+++ b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp
@@ -103,6 +103,7 @@ Block IProfilingBlockInputStream::read(FilterPtr & res_filter, bool return_filte
 
 void IProfilingBlockInputStream::readPrefix()
 {
+    auto start_time = info.total_stopwatch.elapsed();
     readPrefixImpl();
 
     forEachChild([&] (IBlockInputStream & child)
@@ -110,11 +111,13 @@ void IProfilingBlockInputStream::readPrefix()
         child.readPrefix();
         return false;
     });
+    info.updateExecutionTime(info.total_stopwatch.elapsed() - start_time);
 }
 
 
 void IProfilingBlockInputStream::readSuffix()
 {
+    auto start_time = info.total_stopwatch.elapsed();
     forEachChild([&] (IBlockInputStream & child)
     {
         child.readSuffix();
@@ -122,6 +125,7 @@ void IProfilingBlockInputStream::readSuffix()
     });
 
     readSuffixImpl();
+    info.updateExecutionTime(info.total_stopwatch.elapsed() - start_time);
 }
 
 
diff --git a/dbms/src/DataStreams/NativeBlockInputStream.cpp b/dbms/src/DataStreams/NativeBlockInputStream.cpp
index 787ba974546..ed0ac6b3a22 100644
--- a/dbms/src/DataStreams/NativeBlockInputStream.cpp
+++ b/dbms/src/DataStreams/NativeBlockInputStream.cpp
@@ -22,6 +22,10 @@ namespace ErrorCodes
     extern const int NOT_IMPLEMENTED;
 }
 
+NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_, std::vector<String> && output_names_)
+    : istr(istr_), server_revision(server_revision_), output_names(std::move(output_names_))
+{
+}
 
 NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_)
     : istr(istr_), server_revision(server_revision_)
@@ -109,6 +113,11 @@ Block NativeBlockInputStream::readImpl()
             rows = index_block_it->num_rows;
     }
 
+    if (output_names.size() > 0 && output_names.size() != columns)
+        throw Exception("NativeBlockInputStream with explicit output names, but the block column size "
+                        "is not equal to the size of output names",
+            ErrorCodes::LOGICAL_ERROR);
+    bool explicit_output_name = output_names.size() > 0;
     for (size_t i = 0; i < columns; ++i)
     {
         if (use_index)
@@ -121,6 +130,8 @@ Block NativeBlockInputStream::readImpl()
 
         /// Name
         readBinary(column.name, istr);
+        if (explicit_output_name)
+            column.name = output_names[i];
 
         /// Type
         String type_name;
diff --git a/dbms/src/DataStreams/NativeBlockInputStream.h b/dbms/src/DataStreams/NativeBlockInputStream.h
index 14ce18347e3..cc1ea5cb9e2 100644
--- a/dbms/src/DataStreams/NativeBlockInputStream.h
+++ b/dbms/src/DataStreams/NativeBlockInputStream.h
@@ -60,6 +60,8 @@ struct IndexForNativeFormat
 class NativeBlockInputStream : public IProfilingBlockInputStream
 {
 public:
+    /// provide output column names explicitly
+    NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_, std::vector<String> && output_names_);
     /// If a non-zero server_revision is specified, additional block information may be expected and read.
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_); @@ -96,6 +98,8 @@ class NativeBlockInputStream : public IProfilingBlockInputStream PODArray avg_value_size_hints; + std::vector output_names; + void updateAvgValueSizeHints(const Block & block); }; diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp index 8f6e5d713d5..e8832666250 100644 --- a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp @@ -69,11 +69,14 @@ std::unique_ptr CHBlockChunkCodec::newCodecStream(const std::v return std::make_unique(field_types); } -Block CHBlockChunkCodec::decode(const tipb::Chunk & chunk, const DAGSchema &) +Block CHBlockChunkCodec::decode(const tipb::Chunk & chunk, const DAGSchema & schema) { const String & row_data = chunk.rows_data(); ReadBufferFromString read_buffer(row_data); - NativeBlockInputStream block_in(read_buffer, 0); + std::vector output_names; + for (const auto & c : schema) + output_names.push_back(c.first); + NativeBlockInputStream block_in(read_buffer, 0, std::move(output_names)); return block_in.read(); } diff --git a/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp b/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp index d78a295f013..4952dcc5b73 100644 --- a/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp +++ b/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp @@ -1,195 +1,27 @@ -#include -#include -#include #include -#include namespace DB { -namespace ErrorCodes -{ -extern const int UNSUPPORTED_PARAMETER; -extern const int LOGICAL_ERROR; -} // namespace ErrorCodes - -template <> -DAGBlockOutputStream::DAGBlockOutputStream(tipb::SelectResponse * dag_response_, Int64 records_per_chunk_, - tipb::EncodeType encode_type_, std::vector result_field_types_, Block && header_, DAGContext & dag_context_, - bool collect_execute_summary_) - : dag_response(dag_response_), - result_field_types(std::move(result_field_types_)), - header(std::move(header_)), - records_per_chunk(records_per_chunk_), - encode_type(encode_type_), - current_records_num(0), - dag_context(dag_context_), - collect_execute_summary(collect_execute_summary_) -{ - previous_execute_stats.resize(dag_context.profile_streams_list.size(), std::make_tuple(0, 0, 0)); - if (encode_type == tipb::EncodeType::TypeDefault) - { - chunk_codec_stream = std::make_unique()->newCodecStream(result_field_types); - } - else if (encode_type == tipb::EncodeType::TypeChunk) - { - chunk_codec_stream = std::make_unique()->newCodecStream(result_field_types); - } - else if (encode_type == tipb::EncodeType::TypeCHBlock) - { - chunk_codec_stream = std::make_unique()->newCodecStream(result_field_types); - records_per_chunk = -1; - } - else - { - throw Exception("Only Default and Arrow encode type is supported in DAGBlockOutputStream.", ErrorCodes::UNSUPPORTED_PARAMETER); - } - dag_response->set_encode_type(encode_type); -} - -template <> -DAGBlockOutputStream::DAGBlockOutputStream(::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_, Int64 records_per_chunk_, - tipb::EncodeType encode_type_, std::vector result_field_types_, Block && header_, DAGContext & dag_context_, - bool collect_execute_summary_) - : writer(writer_), - result_field_types(std::move(result_field_types_)), - header(std::move(header_)), - records_per_chunk(records_per_chunk_), - encode_type(encode_type_), - current_records_num(0), - dag_context(dag_context_), - collect_execute_summary(collect_execute_summary_) -{ - 
previous_execute_stats.resize(dag_context.profile_streams_list.size(), std::make_tuple(0, 0, 0)); - if (encode_type == tipb::EncodeType::TypeDefault) - { - chunk_codec_stream = std::make_unique()->newCodecStream(result_field_types); - } - else if (encode_type == tipb::EncodeType::TypeChunk) - { - chunk_codec_stream = std::make_unique()->newCodecStream(result_field_types); - } - else if (encode_type == tipb::EncodeType::TypeCHBlock) - { - chunk_codec_stream = std::make_unique()->newCodecStream(result_field_types); - records_per_chunk = -1; - } - else - { - throw Exception( - "Only Default and Arrow encode type is supported in StreamingDAGBlockOutputStream.", ErrorCodes::UNSUPPORTED_PARAMETER); - } -} +DAGBlockOutputStream::DAGBlockOutputStream(tipb::SelectResponse * dag_response, Int64 records_per_chunk, tipb::EncodeType encode_type, + std::vector result_field_types, Block && header_, DAGContext & dag_context, bool collect_execute_summary, + bool return_executor_id) + : header(std::move(header_)), + response_writer(dag_response, nullptr, records_per_chunk, encode_type, result_field_types, dag_context, collect_execute_summary, + return_executor_id) +{} -template -void DAGBlockOutputStream::writePrefix() +void DAGBlockOutputStream::writePrefix() { //something to do here? } -template <> -void DAGBlockOutputStream::encodeChunkToDAGResponse() -{ - auto dag_chunk = dag_response->add_chunks(); - dag_chunk->set_rows_data(chunk_codec_stream->getString()); - chunk_codec_stream->clear(); - current_records_num = 0; -} - -template -void DAGBlockOutputStream::addExecuteSummaries(tipb::SelectResponse * response) -{ - if (!collect_execute_summary) - return; - // add ExecutorExecutionSummary info - for (size_t i = 0; i < dag_context.profile_streams_list.size(); i++) - { - auto & p_streams = dag_context.profile_streams_list[i]; - auto * executeSummary = response->add_execution_summaries(); - UInt64 time_processed_ns = 0; - UInt64 num_produced_rows = 0; - UInt64 num_iterations = 0; - for (auto & streamPtr : p_streams) - { - if (auto * p_stream = dynamic_cast(streamPtr.get())) - { - time_processed_ns = std::max(time_processed_ns, p_stream->getProfileInfo().execution_time); - num_produced_rows += p_stream->getProfileInfo().rows; - num_iterations += p_stream->getProfileInfo().blocks; - } - } - auto & pre_stats = previous_execute_stats[i]; - executeSummary->set_time_processed_ns(time_processed_ns - std::get<0>(pre_stats)); - executeSummary->set_num_produced_rows(num_produced_rows - std::get<1>(pre_stats)); - executeSummary->set_num_iterations(num_iterations - std::get<2>(pre_stats)); - std::get<0>(pre_stats) = time_processed_ns; - std::get<1>(pre_stats) = num_produced_rows; - std::get<2>(pre_stats) = num_iterations; - } -} - -template <> -void DAGBlockOutputStream::encodeChunkToDAGResponse() -{ - ::coprocessor::BatchResponse resp; - - tipb::SelectResponse stream_dag_response; - stream_dag_response.set_encode_type(encode_type); - auto dag_chunk = stream_dag_response.add_chunks(); - dag_chunk->set_rows_data(chunk_codec_stream->getString()); - chunk_codec_stream->clear(); - current_records_num = 0; - addExecuteSummaries(&stream_dag_response); - std::string dag_data; - stream_dag_response.SerializeToString(&dag_data); - resp.set_data(dag_data); +void DAGBlockOutputStream::write(const Block & block) { response_writer.write(block); } - writer->Write(resp); -} - -template -void DAGBlockOutputStream::writeSuffix() +void DAGBlockOutputStream::writeSuffix() { // todo error handle - if (current_records_num > 0) - { - 
encodeChunkToDAGResponse(); - } - if constexpr (!streaming) - { - addExecuteSummaries(dag_response); - } -} - -template -void DAGBlockOutputStream::write(const Block & block) -{ - if (block.columns() != result_field_types.size()) - throw Exception("Output column size mismatch with field type size", ErrorCodes::LOGICAL_ERROR); - if (records_per_chunk == -1) - { - current_records_num = 0; - if (block.rows() > 0) - { - chunk_codec_stream->encode(block, 0, block.rows()); - encodeChunkToDAGResponse(); - } - } - else - { - size_t rows = block.rows(); - for (size_t row_index = 0; row_index < rows;) - { - if (current_records_num >= records_per_chunk) - { - encodeChunkToDAGResponse(); - } - const size_t upper = std::min(row_index + (records_per_chunk - current_records_num), rows); - chunk_codec_stream->encode(block, row_index, upper); - current_records_num += (upper - row_index); - row_index = upper; - } - } + response_writer.finishWrite(); } } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h b/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h index bb249a1111d..43d1bddbc38 100644 --- a/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h +++ b/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h @@ -8,45 +8,30 @@ #include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" +#include #include + #pragma GCC diagnostic pop namespace DB { /// Serializes the stream of blocks in TiDB DAG response format. -/// TODO: May consider using some parallelism. -/// TODO: Consider using output schema in DAG request, do some conversion or checking between DAG schema and block schema. -template class DAGBlockOutputStream : public IBlockOutputStream { public: DAGBlockOutputStream(tipb::SelectResponse * response_, Int64 records_per_chunk_, tipb::EncodeType encodeType_, - std::vector result_field_types, Block && header_, DAGContext & dag_context_, bool collect_execute_summary_); - - DAGBlockOutputStream(::grpc::ServerWriter<::coprocessor::BatchResponse> * writer, Int64 records_per_chunk_, - tipb::EncodeType encodeType_, std::vector result_field_types, Block && header_, DAGContext & dag_context_, - bool collect_execute_summary_); + std::vector result_field_types, Block && header_, DAGContext & dag_context_, bool collect_execute_summary_, + bool return_executor_id); - Block getHeader() const override { return header; } - void write(const Block & block) override; - void writePrefix() override; - void writeSuffix() override; - void encodeChunkToDAGResponse(); - void addExecuteSummaries(tipb::SelectResponse * dag_response); + Block getHeader() const { return header; } + void write(const Block & block); + void writePrefix(); + void writeSuffix(); private: - tipb::SelectResponse * dag_response; - ::grpc::ServerWriter<::coprocessor::BatchResponse> * writer; - std::vector result_field_types; Block header; - Int64 records_per_chunk; - tipb::EncodeType encode_type; - std::unique_ptr chunk_codec_stream; - Int64 current_records_num; - DAGContext & dag_context; - bool collect_execute_summary; - std::vector> previous_execute_stats; + DAGResponseWriter response_writer; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 385fc48cd68..e9ee4e4823b 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -8,11 +8,18 @@ namespace DB class Context; +struct ProfileStreamsInfo +{ + UInt32 qb_id; + BlockInputStreams input_streams; +}; /// A context used to track the information 
that needs to be passed around during DAG planning.
 struct DAGContext
 {
-    DAGContext(size_t profile_list_size) { profile_streams_list.resize(profile_list_size); };
-    std::vector<BlockInputStreams> profile_streams_list;
+    DAGContext(){};
+    std::map<String, ProfileStreamsInfo> profile_streams_map;
+    std::unordered_map<String, BlockInputStreams> profile_streams_map_for_join_build_side;
+    std::unordered_map<UInt32, std::vector<String>> qb_id_to_join_alias_map;
 };
 
 } // namespace DB
diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.cpp b/dbms/src/Flash/Coprocessor/DAGDriver.cpp
index 74e87c0d389..5625fe97db3 100644
--- a/dbms/src/Flash/Coprocessor/DAGDriver.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGDriver.cpp
@@ -1,13 +1,11 @@
 #include
 #include
-#include
 #include
 #include
 #include
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -15,7 +13,6 @@
 #include
 #include
 #include
-#include
 #include
 
 namespace DB
@@ -34,6 +31,7 @@ DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_req
       dag_request(dag_request_),
       regions(regions_),
       dag_response(dag_response_),
+      writer(nullptr),
       internal(internal_),
       log(&Logger::get("DAGDriver"))
 {
@@ -61,28 +59,29 @@ template
 void DAGDriver::execute()
 try
 {
-    DAGContext dag_context(dag_request.executors_size());
-    DAGQuerySource dag(context, dag_context, regions, dag_request, batch);
+    DAGContext dag_context;
+    DAGQuerySource dag(context, dag_context, regions, dag_request, writer, batch);
 
     BlockIO streams = executeQuery(dag, context, internal, QueryProcessingStage::Complete);
     if (!streams.in || streams.out)
         // Only query is allowed, so streams.in must not be null and streams.out must be null
         throw Exception("DAG is not query.", ErrorCodes::LOGICAL_ERROR);
 
-    BlockOutputStreamPtr dag_output_stream;
-    if constexpr (batch)
+    if constexpr (!batch)
     {
-        dag_output_stream = std::make_shared<DAGBlockOutputStream<true>>(writer, context.getSettings().dag_records_per_chunk,
-            dag.getEncodeType(), dag.getResultFieldTypes(), streams.in->getHeader(), dag_context,
-            dag_request.has_collect_execution_summaries() && dag_request.collect_execution_summaries());
+        bool collect_exec_summary = dag_request.has_collect_execution_summaries() && dag_request.collect_execution_summaries();
+        BlockOutputStreamPtr dag_output_stream
+            = std::make_shared<DAGBlockOutputStream>(dag_response, context.getSettings().dag_records_per_chunk, dag.getEncodeType(),
+                dag.getResultFieldTypes(), streams.in->getHeader(), dag_context, collect_exec_summary, dag_request.has_root_executor());
+        copyData(*streams.in, *dag_output_stream);
     }
     else
     {
-        dag_output_stream = std::make_shared<DAGBlockOutputStream<false>>(dag_response, context.getSettings().dag_records_per_chunk,
-            dag.getEncodeType(), dag.getResultFieldTypes(), streams.in->getHeader(), dag_context,
-            dag_request.has_collect_execution_summaries() && dag_request.collect_execution_summaries());
+        streams.in->readPrefix();
+        while (streams.in->read())
+            ;
+        streams.in->readSuffix();
     }
-    copyData(*streams.in, *dag_output_stream);
 
     if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(streams.in.get()))
     {
diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
index 8934fae79c7..d06d2fc328a 100644
--- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
@@ -412,7 +412,8 @@ String DAGExpressionAnalyzer::appendTimeZoneCast(
 // useless casts to all the timestamp columns, in order to avoid redundant cast, when cast the ts
 // column to the columns with session-level timezone info, the original ts columns with UTC timezone
 // are still kept, and the InterpreterDAG will choose the correct column based on encode type
-bool DAGExpressionAnalyzer::appendTimeZoneCastsAfterTS(ExpressionActionsChain & chain, std::vector is_ts_column) +bool DAGExpressionAnalyzer::appendTimeZoneCastsAfterTS( + ExpressionActionsChain & chain, std::vector is_ts_column, bool keep_UTC_column) { if (context.getTimezoneInfo().is_utc_timezone) return false; @@ -431,7 +432,8 @@ bool DAGExpressionAnalyzer::appendTimeZoneCastsAfterTS(ExpressionActionsChain & if (tz_col.length() == 0) tz_col = getActions(tz_expr, actions); String casted_name = appendTimeZoneCast(tz_col, source_columns[i].name, func_name, actions); - source_columns.emplace_back(source_columns[i].name, source_columns[i].type); + if (keep_UTC_column) + source_columns.emplace_back(source_columns[i].name, source_columns[i].type); source_columns[i].name = casted_name; ret = true; } @@ -439,6 +441,80 @@ bool DAGExpressionAnalyzer::appendTimeZoneCastsAfterTS(ExpressionActionsChain & return ret; } +void DAGExpressionAnalyzer::appendJoin( + ExpressionActionsChain & chain, SubqueryForSet & join_query, const NamesAndTypesList & columns_added_by_join) +{ + initChain(chain, getCurrentInputColumns()); + ExpressionActionsPtr actions = chain.getLastActions(); + actions->add(ExpressionAction::ordinaryJoin(join_query.join, columns_added_by_join)); +} +/// return true if some actions is needed +bool DAGExpressionAnalyzer::appendJoinKey( + ExpressionActionsChain & chain, const tipb::Join & join, const DataTypes & key_types, Names & key_names, bool tiflash_left) +{ + bool ret = false; + initChain(chain, getCurrentInputColumns()); + ExpressionActionsPtr actions = chain.getLastActions(); + const auto & keys = ((tiflash_left && join.inner_idx() == 1) || (!tiflash_left && join.inner_idx() == 0)) ? join.left_join_keys() + : join.right_join_keys(); + + for (int i = 0; i < keys.size(); i++) + { + const auto & key = keys.at(i); + bool has_actions = key.tp() != tipb::ExprType::ColumnRef; + + String key_name = getActions(key, actions); + DataTypePtr current_type = actions->getSampleBlock().getByName(key_name).type; + if (!removeNullable(current_type)->equals(*removeNullable(key_types[i]))) + { + /// need to convert to key type + key_name = appendCast(key_types[i], actions, key_name); + has_actions = true; + } + if (!tiflash_left && !has_actions) + { + // for right side table, add a new column + String updated_key_name = "_r_k_" + key_name; + actions->add(ExpressionAction::copyColumn(key_name, updated_key_name)); + key_name = updated_key_name; + has_actions = true; + } + key_names.push_back(key_name); + ret |= has_actions; + } + /// remove useless columns to avoid duplicate columns + /// as when compiling the key expression, the origin + /// streams may be added some columns that have the + /// same name on left streams and right streams, for + /// example, if the join condition is something like: + /// id + 1 = id + 1, + /// the left streams and the right streams will have the + /// same constant column for `1` + /// Note that the origin left streams and right streams + /// will never have duplicated columns because in + /// DAGQueryBlockInterpreter we add qb_column_prefix in + /// final project step, so if the join condition is not + /// literal expression, the key names should never be + /// duplicated. 
In the above example, the final key names should be + /// something like `add(__qb_2_id, 1)` and `add(__qb_3_id, 1)` + if (actions->getSampleBlock().getNames().size() != getCurrentInputColumns().size() + key_names.size()) + { + std::unordered_set needed_columns; + for (const auto & c : getCurrentInputColumns()) + needed_columns.insert(c.name); + for (const auto & s : key_names) + needed_columns.insert(s); + + const auto & names = actions->getSampleBlock().getNames(); + for (const auto & name : names) + { + if (needed_columns.find(name) == needed_columns.end()) + actions->add(ExpressionAction::removeColumn(name)); + } + } + return ret; +} + void DAGExpressionAnalyzer::appendAggSelect( ExpressionActionsChain & chain, const tipb::Aggregation & aggregation, bool keep_session_timezone_info) { @@ -531,6 +607,17 @@ String DAGExpressionAnalyzer::alignReturnType( return updated_name; } +String DAGExpressionAnalyzer::appendCast(const DataTypePtr & target_type, ExpressionActionsPtr & actions, const String & expr_name) +{ + // need to add cast function + // first construct the second argument + tipb::Expr type_expr; + constructStringLiteralTiExpr(type_expr, target_type->getName()); + auto type_expr_name = getActions(type_expr, actions); + String cast_expr_name = applyFunction("CAST", {expr_name, type_expr_name}, actions, nullptr); + return cast_expr_name; +} + String DAGExpressionAnalyzer::appendCastIfNeeded( const tipb::Expr & expr, ExpressionActionsPtr & actions, const String & expr_name, bool explicit_cast) { @@ -551,13 +638,7 @@ String DAGExpressionAnalyzer::appendCastIfNeeded( { implicit_cast_count += !explicit_cast; - // need to add cast function - // first construct the second argument - tipb::Expr type_expr; - constructStringLiteralTiExpr(type_expr, expected_type->getName()); - auto type_expr_name = getActions(type_expr, actions); - String cast_expr_name = applyFunction("CAST", {expr_name, type_expr_name}, actions, nullptr); - return cast_expr_name; + return appendCast(expected_type, actions, expr_name); } else { diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h index ab7ecb6ef7e..37d7949ec86 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h @@ -10,6 +10,7 @@ #include #include #include +#include #include namespace DB @@ -44,6 +45,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable AggregateDescriptions & aggregate_descriptions); void appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & agg, bool keep_session_timezone_info); String appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String & expr_name, bool explicit_cast); + String appendCast(const DataTypePtr & target_type, ExpressionActionsPtr & actions, const String & expr_name); String alignReturnType(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String & expr_name, bool force_uint8); void initChain(ExpressionActionsChain & chain, const std::vector & columns) const { @@ -58,6 +60,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable chain.steps.emplace_back(std::make_shared(column_list, settings)); } } + void appendJoin(ExpressionActionsChain & chain, SubqueryForSet & join_query, const NamesAndTypesList & columns_added_by_join); void appendFinalProject(ExpressionActionsChain & chain, const NamesWithAliases & final_project); String getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions, bool 
output_as_uint8_type = false); const std::vector & getCurrentInputColumns(); @@ -66,7 +69,10 @@ class DAGExpressionAnalyzer : private boost::noncopyable String applyFunction( const String & func_name, const Names & arg_names, ExpressionActionsPtr & actions, std::shared_ptr collator); Int32 getImplicitCastCount() { return implicit_cast_count; }; - bool appendTimeZoneCastsAfterTS(ExpressionActionsChain & chain, std::vector is_ts_column); + bool appendTimeZoneCastsAfterTS( + ExpressionActionsChain & chain, std::vector is_ts_column, bool keep_UTC_column); + bool appendJoinKey( + ExpressionActionsChain & chain, const tipb::Join & join, const DataTypes & key_types, Names & key_names, bool tiflash_left); String appendTimeZoneCast(const String & tz_col, const String & ts_col, const String & func_name, ExpressionActionsPtr & actions); DAGPreparedSets & getPreparedSets() { return prepared_sets; } String convertToUInt8(ExpressionActionsPtr & actions, const String & column_name); diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp new file mode 100644 index 00000000000..2231a8a34c5 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp @@ -0,0 +1,200 @@ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#include +#pragma GCC diagnostic pop + +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int NOT_IMPLEMENTED; +extern const int LOGICAL_ERROR; +extern const int COP_BAD_DAG_REQUEST; +} // namespace ErrorCodes + +class Context; +class TiFlashMetrics; +using TiFlashMetricsPtr = std::shared_ptr; + +bool isSourceNode(const tipb::Executor * root) +{ + return root->tp() == tipb::ExecType::TypeJoin || root->tp() == tipb::ExecType::TypeTableScan; +} + +const static String SOURCE_NAME("source"); +const static String SEL_NAME("selection"); +const static String AGG_NAME("aggregation"); +const static String TOPN_NAME("topN"); +const static String LIMIT_NAME("limit"); + +static void assignOrThrowException(const tipb::Executor ** to, const tipb::Executor * from, const String & name) +{ + if (*to != nullptr) + { + throw Exception("Duplicated " + name + " in DAG request"); + } + *to = from; +} + +void collectOutPutFieldTypesFromAgg(std::vector & field_type, const tipb::Aggregation & agg) +{ + for (auto & expr : agg.agg_func()) + { + if (!exprHasValidFieldType(expr)) + { + throw Exception("Agg expression without valid field type", ErrorCodes::COP_BAD_DAG_REQUEST); + } + field_type.push_back(expr.field_type()); + } + for (auto & expr : agg.group_by()) + { + if (!exprHasValidFieldType(expr)) + { + throw Exception("Group by expression without valid field type", ErrorCodes::COP_BAD_DAG_REQUEST); + } + field_type.push_back(expr.field_type()); + } +} + +/// construct DAGQueryBlock from a tree struct based executors, which is the +/// format after supporting join in dag request +DAGQueryBlock::DAGQueryBlock(UInt32 id_, const tipb::Executor & root_) + : id(id_), root(&root_), qb_column_prefix("__QB_" + std::to_string(id_) + "_"), qb_join_subquery_alias(qb_column_prefix + "join") +{ + const tipb::Executor * current = root; + while (!isSourceNode(current) && current->has_executor_id()) + { + switch (current->tp()) + { + case tipb::ExecType::TypeSelection: + assignOrThrowException(&selection, current, SEL_NAME); + selection_name = current->executor_id(); + current = ¤t->selection().child(); + break; + case tipb::ExecType::TypeAggregation: + case tipb::ExecType::TypeStreamAgg: + 
assignOrThrowException(&aggregation, current, AGG_NAME); + aggregation_name = current->executor_id(); + collectOutPutFieldTypesFromAgg(output_field_types, current->aggregation()); + current = ¤t->aggregation().child(); + break; + case tipb::ExecType::TypeLimit: + assignOrThrowException(&limitOrTopN, current, LIMIT_NAME); + limitOrTopN_name = current->executor_id(); + current = ¤t->limit().child(); + break; + case tipb::ExecType::TypeTopN: + assignOrThrowException(&limitOrTopN, current, TOPN_NAME); + limitOrTopN_name = current->executor_id(); + current = ¤t->topn().child(); + break; + case tipb::ExecType::TypeIndexScan: + throw Exception("Unsupported executor in DAG request: " + current->DebugString(), ErrorCodes::NOT_IMPLEMENTED); + default: + throw Exception("Should not reach here", ErrorCodes::LOGICAL_ERROR); + } + } + + if (!current->has_executor_id()) + throw Exception("Tree struct based executor must have executor id", ErrorCodes::COP_BAD_DAG_REQUEST); + + assignOrThrowException(&source, current, SOURCE_NAME); + source_name = current->executor_id(); + if (current->tp() == tipb::ExecType::TypeJoin) + { + if (source->join().children_size() != 2) + throw Exception("Join executor children size not equal to 2", ErrorCodes::COP_BAD_DAG_REQUEST); + children.push_back(std::make_shared(id * 2, source->join().children(0))); + children.push_back(std::make_shared(id * 2 + 1, source->join().children(1))); + } + fillOutputFieldTypes(); +} + +/// construct DAGQueryBlock from a list struct based executors, which is the +/// format before supporting join in dag request +DAGQueryBlock::DAGQueryBlock(UInt32 id_, const ::google::protobuf::RepeatedPtrField & executors) + : id(id_), root(nullptr), qb_column_prefix("__QB_" + std::to_string(id_) + "_"), qb_join_subquery_alias(qb_column_prefix + "join") +{ + for (int i = (int)executors.size() - 1; i >= 0; i--) + { + switch (executors[i].tp()) + { + case tipb::ExecType::TypeTableScan: + assignOrThrowException(&source, &executors[i], SOURCE_NAME); + /// use index as the prefix for executor name so when we sort by + /// the executor name, it will result in the same order as it is + /// in the dag_request, this is needed when filling executeSummary + /// in DAGDriver + source_name = std::to_string(i) + "_tablescan"; + break; + case tipb::ExecType::TypeSelection: + assignOrThrowException(&selection, &executors[i], SEL_NAME); + selection_name = std::to_string(i) + "_selection"; + break; + case tipb::ExecType::TypeStreamAgg: + case tipb::ExecType::TypeAggregation: + assignOrThrowException(&aggregation, &executors[i], AGG_NAME); + aggregation_name = std::to_string(i) + "_aggregation"; + collectOutPutFieldTypesFromAgg(output_field_types, executors[i].aggregation()); + break; + case tipb::ExecType::TypeTopN: + assignOrThrowException(&limitOrTopN, &executors[i], TOPN_NAME); + limitOrTopN_name = std::to_string(i) + "_limitOrTopN"; + break; + case tipb::ExecType::TypeLimit: + assignOrThrowException(&limitOrTopN, &executors[i], LIMIT_NAME); + limitOrTopN_name = std::to_string(i) + "_limitOrTopN"; + break; + default: + throw Exception("Unsupported executor in DAG request: " + executors[i].DebugString(), ErrorCodes::NOT_IMPLEMENTED); + } + } + fillOutputFieldTypes(); +} + +void DAGQueryBlock::fillOutputFieldTypes() +{ + if (source->tp() == tipb::ExecType::TypeJoin) + { + if (output_field_types.empty()) + { + for (auto & field_type : children[0]->output_field_types) + output_field_types.push_back(field_type); + for (auto & field_type : children[1]->output_field_types) + 
output_field_types.push_back(field_type); + } + } + else + { + if (output_field_types.empty()) + { + for (auto & ci : source->tbl_scan().columns()) + { + tipb::FieldType field_type; + field_type.set_tp(ci.tp()); + field_type.set_flag(ci.flag()); + field_type.set_flen(ci.columnlen()); + field_type.set_decimal(ci.decimal()); + output_field_types.push_back(field_type); + } + } + } +} + +void DAGQueryBlock::collectAllPossibleChildrenJoinSubqueryAlias(std::unordered_map> & result) +{ + std::vector all_qb_join_subquery_alias; + for (auto & child : children) + { + child->collectAllPossibleChildrenJoinSubqueryAlias(result); + all_qb_join_subquery_alias.insert(all_qb_join_subquery_alias.end(), result[child->id].begin(), result[child->id].end()); + } + all_qb_join_subquery_alias.push_back(qb_join_subquery_alias); + result[id] = all_qb_join_subquery_alias; +} + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlock.h b/dbms/src/Flash/Coprocessor/DAGQueryBlock.h new file mode 100644 index 00000000000..e8d7a276f08 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlock.h @@ -0,0 +1,58 @@ +#pragma once + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#include +#pragma GCC diagnostic pop + +#include +#include +#include +#include +#include + +namespace DB +{ + +class Context; +class TiFlashMetrics; +using TiFlashMetricsPtr = std::shared_ptr; + +/// DAGQueryBlock is a dag query from single source, +/// which means the query block contains a source node(tablescan or join) +/// and some of the optional node.(selection/aggregation/project/limit/topN) +class DAGQueryBlock +{ +public: + DAGQueryBlock(UInt32 id, const tipb::Executor & root); + DAGQueryBlock(UInt32 id, const ::google::protobuf::RepeatedPtrField & executors); + /// the xxx_name is added for compatibility issues: before join is supported, executor does not + /// has executor name, after join is supported in dag request, every executor has an unique + /// name(executor->executor_id()). 
Since We can not always get the executor name from executor + /// itself, we had to add xxx_name here + const tipb::Executor * source = nullptr; + String source_name; + const tipb::Executor * selection = nullptr; + String selection_name; + const tipb::Executor * aggregation = nullptr; + String aggregation_name; + const tipb::Executor * limitOrTopN = nullptr; + String limitOrTopN_name; + UInt32 id; + const tipb::Executor * root; + String qb_column_prefix; + String qb_join_subquery_alias; + std::vector> children; + std::vector output_field_types; + // kinds of project + std::vector output_offsets; + void fillOutputFieldTypes(); + void collectAllPossibleChildrenJoinSubqueryAlias(std::unordered_map> & result); + bool isRootQueryBlock() const { return id == 1; }; + bool isRemoteQuery() const + { + return source->tp() == tipb::ExecType::TypeTableScan && source->tbl_scan().next_read_engine() != tipb::EngineType::Local; + } +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp new file mode 100644 index 00000000000..11d44e25218 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -0,0 +1,1357 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "InterpreterDAGHelper.hpp" + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int UNKNOWN_TABLE; +extern const int TOO_MANY_COLUMNS; +extern const int SCHEMA_VERSION_ERROR; +extern const int UNKNOWN_EXCEPTION; +extern const int COP_BAD_DAG_REQUEST; +extern const int NO_COMMON_TYPE; +} // namespace ErrorCodes + +DAGQueryBlockInterpreter::DAGQueryBlockInterpreter(Context & context_, const std::vector & input_streams_vec_, + const DAGQueryBlock & query_block_, bool keep_session_timezone_info_, const tipb::DAGRequest & rqst_, ASTPtr dummy_query_, + const DAGQuerySource & dag_, std::vector & subqueriesForSets_) + : context(context_), + input_streams_vec(input_streams_vec_), + query_block(query_block_), + keep_session_timezone_info(keep_session_timezone_info_), + rqst(rqst_), + dummy_query(std::move(dummy_query_)), + dag(dag_), + subqueriesForSets(subqueriesForSets_), + log(&Logger::get("DAGQueryBlockInterpreter")) +{ + if (query_block.selection != nullptr) + { + for (auto & condition : query_block.selection->selection().conditions()) + conditions.push_back(&condition); + } + const Settings & settings = context.getSettingsRef(); + max_streams = settings.max_threads; + if (max_streams > 1) + { + max_streams *= settings.max_streams_to_max_threads_ratio; + } +} + +template +void constructHandleColRefExpr(tipb::Expr & expr, Int64 col_index) +{ + expr.set_tp(tipb::ExprType::ColumnRef); + std::stringstream ss; + encodeDAGInt64(col_index, ss); + expr.set_val(ss.str()); + auto * field_type = expr.mutable_field_type(); + field_type->set_tp(TiDB::TypeLongLong); + // handle col is always not null + field_type->set_flag(TiDB::ColumnFlagNotNull); + if constexpr (!std::is_signed_v) + { + field_type->set_flag(TiDB::ColumnFlagUnsigned); + } +} + +template +void constructIntLiteralExpr(tipb::Expr & expr, HandleType value) +{ + constexpr bool is_signed = std::is_signed_v; + if constexpr (is_signed) + { + expr.set_tp(tipb::ExprType::Int64); + } 
+ else + { + expr.set_tp(tipb::ExprType::Uint64); + } + std::stringstream ss; + if constexpr (is_signed) + { + encodeDAGInt64(value, ss); + } + else + { + encodeDAGUInt64(value, ss); + } + expr.set_val(ss.str()); + auto * field_type = expr.mutable_field_type(); + field_type->set_tp(TiDB::TypeLongLong); + field_type->set_flag(TiDB::ColumnFlagNotNull); + if constexpr (!is_signed) + { + field_type->set_flag(TiDB::ColumnFlagUnsigned); + } +} + +template +void constructBoundExpr(Int32 handle_col_id, tipb::Expr & expr, TiKVHandle::Handle bound, bool is_left_bound) +{ + auto * left = expr.add_children(); + constructHandleColRefExpr(*left, handle_col_id); + auto * right = expr.add_children(); + constructIntLiteralExpr(*right, bound.handle_id); + expr.set_tp(tipb::ExprType::ScalarFunc); + if (is_left_bound) + expr.set_sig(tipb::ScalarFuncSig::GEInt); + else + expr.set_sig(tipb::ScalarFuncSig::LTInt); + auto * field_type = expr.mutable_field_type(); + field_type->set_tp(TiDB::TypeLongLong); + field_type->set_flag(TiDB::ColumnFlagNotNull); + field_type->set_flag(TiDB::ColumnFlagUnsigned); +} + +template +void constructExprBasedOnRange(Int32 handle_col_id, tipb::Expr & expr, HandleRange range) +{ + if (range.second == TiKVHandle::Handle::max) + { + constructBoundExpr(handle_col_id, expr, range.first, true); + } + else + { + auto * left = expr.add_children(); + constructBoundExpr(handle_col_id, *left, range.first, true); + auto * right = expr.add_children(); + constructBoundExpr(handle_col_id, *right, range.second, false); + expr.set_tp(tipb::ExprType::ScalarFunc); + expr.set_sig(tipb::ScalarFuncSig::LogicalAnd); + auto * field_type = expr.mutable_field_type(); + field_type->set_tp(TiDB::TypeLongLong); + field_type->set_flag(TiDB::ColumnFlagNotNull); + field_type->set_flag(TiDB::ColumnFlagUnsigned); + } +} + +template +bool checkRangeAndGenExprIfNeeded(std::vector> & ranges, const std::vector> & region_ranges, + Int32 handle_col_id, tipb::Expr & handle_filter, Logger * log) +{ + if (ranges.empty()) + { + // generate an always false filter + LOG_WARNING(log, "income key ranges is empty"); + constructInt64LiteralTiExpr(handle_filter, 0); + return false; + } + std::sort(ranges.begin(), ranges.end(), + [](const HandleRange & a, const HandleRange & b) { return a.first < b.first; }); + + std::vector> merged_ranges; + HandleRange merged_range; + merged_range.first = ranges[0].first; + merged_range.second = ranges[0].second; + + for (size_t i = 1; i < ranges.size(); i++) + { + if (merged_range.second >= ranges[i].first) + merged_range.second = merged_range.second >= ranges[i].second ? 
merged_range.second : ranges[i].second; + else + { + if (merged_range.second > merged_range.first) + merged_ranges.emplace_back(std::make_pair(merged_range.first, merged_range.second)); + merged_range.first = ranges[i].first; + merged_range.second = ranges[i].second; + } + } + if (merged_range.second > merged_range.first) + merged_ranges.emplace_back(std::make_pair(merged_range.first, merged_range.second)); + + bool ret = true; + for (const auto & region_range : region_ranges) + { + bool covered = false; + for (const auto & range : merged_ranges) + { + if (region_range.first >= range.first && region_range.second <= range.second) + { + covered = true; + break; + } + } + if (!covered && region_range.second > region_range.first) + { + ret = false; + break; + } + } + LOG_DEBUG(log, "ret " << ret); + if (!ret) + { + if (merged_ranges.empty()) + { + constructInt64LiteralTiExpr(handle_filter, 0); + } + else if (merged_ranges.size() == 1) + { + constructExprBasedOnRange(handle_col_id, handle_filter, merged_ranges[0]); + } + else + { + for (const auto & range : merged_ranges) + { + auto * filter = handle_filter.add_children(); + constructExprBasedOnRange(handle_col_id, *filter, range); + } + handle_filter.set_tp(tipb::ExprType::ScalarFunc); + handle_filter.set_sig(tipb::ScalarFuncSig::LogicalOr); + auto * field_type = handle_filter.mutable_field_type(); + field_type->set_tp(TiDB::TypeLongLong); + field_type->set_flag(TiDB::ColumnFlagNotNull); + field_type->set_flag(TiDB::ColumnFlagUnsigned); + } + } + return ret; +} + +bool checkKeyRanges(const std::vector> & key_ranges, TableID table_id, bool pk_is_uint64, + const ImutRegionRangePtr & region_key_range, Int32 handle_col_id, tipb::Expr & handle_filter, Logger * log) +{ + LOG_INFO(log, "pk_is_uint: " << pk_is_uint64); + if (key_ranges.empty()) + { + LOG_WARNING(log, "income key ranges is empty1"); + constructInt64LiteralTiExpr(handle_filter, 0); + return false; + } + + std::vector> handle_ranges; + for (auto & range : key_ranges) + { + TiKVRange::Handle start = TiKVRange::getRangeHandle(range.first, table_id); + TiKVRange::Handle end = TiKVRange::getRangeHandle(range.second, table_id); + handle_ranges.emplace_back(std::make_pair(start, end)); + } + + std::vector> region_handle_ranges; + auto & raw_keys = region_key_range->rawKeys(); + TiKVRange::Handle region_start = TiKVRange::getRangeHandle(raw_keys.first, table_id); + TiKVRange::Handle region_end = TiKVRange::getRangeHandle(raw_keys.second, table_id); + region_handle_ranges.emplace_back(std::make_pair(region_start, region_end)); + + if (pk_is_uint64) + { + std::vector> update_handle_ranges; + for (auto & range : handle_ranges) + { + const auto [n, new_range] = CHTableHandle::splitForUInt64TableHandle(range); + + for (int i = 0; i < n; i++) + { + update_handle_ranges.emplace_back(new_range[i]); + } + } + std::vector> update_region_handle_ranges; + for (auto & range : region_handle_ranges) + { + const auto [n, new_range] = CHTableHandle::splitForUInt64TableHandle(range); + + for (int i = 0; i < n; i++) + { + update_region_handle_ranges.emplace_back(new_range[i]); + } + } + return checkRangeAndGenExprIfNeeded(update_handle_ranges, update_region_handle_ranges, handle_col_id, handle_filter, log); + } + else + return checkRangeAndGenExprIfNeeded(handle_ranges, region_handle_ranges, handle_col_id, handle_filter, log); +} + +// the flow is the same as executeFetchcolumns +void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) +{ + if (!ts.has_table_id()) + { + // do not 
have table id + throw Exception("Table id not specified in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST); + } + TableID table_id = ts.table_id(); + + const Settings & settings = context.getSettingsRef(); + auto & tmt = context.getTMTContext(); + + if (settings.schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION) + { + storage = context.getTMTContext().getStorages().get(table_id); + if (storage == nullptr) + { + throw Exception("Table " + std::to_string(table_id) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE); + } + table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); + } + else + { + getAndLockStorageWithSchemaVersion(table_id, settings.schema_version); + } + + Names required_columns; + std::vector source_columns; + std::vector is_ts_column; + String handle_column_name = MutableSupport::tidb_pk_column_name; + if (auto pk_handle_col = storage->getTableInfo().getPKHandleColumn()) + handle_column_name = pk_handle_col->get().name; + + for (Int32 i = 0; i < ts.columns().size(); i++) + { + auto const & ci = ts.columns(i); + ColumnID cid = ci.column_id(); + + if (cid == -1) + { + // Column ID -1 return the handle column + required_columns.push_back(handle_column_name); + auto pair = storage->getColumns().getPhysical(handle_column_name); + source_columns.push_back(pair); + is_ts_column.push_back(false); + continue; + } + + String name = storage->getTableInfo().getColumnName(cid); + required_columns.push_back(name); + auto pair = storage->getColumns().getPhysical(name); + source_columns.emplace_back(std::move(pair)); + is_ts_column.push_back(ci.tp() == TiDB::TypeTimestamp); + } + + analyzer = std::make_unique(std::move(source_columns), context); + + if (query_block.aggregation == nullptr) + { + if (query_block.isRootQueryBlock()) + { + for (auto i : query_block.output_offsets) + { + if ((size_t)i >= required_columns.size()) + { + // array index out of bound + throw Exception("Output offset index is out of bound", ErrorCodes::COP_BAD_DAG_REQUEST); + } + // do not have alias + final_project.emplace_back(required_columns[i], ""); + } + } + else + { + for (size_t i = 0; i < required_columns.size(); i++) + /// for child query block, the final project is all the columns read from + /// the table and add alias start with qb_column_prefix to avoid column name conflict + final_project.emplace_back(required_columns[i], query_block.qb_column_prefix + required_columns[i]); + } + } + // todo handle alias column + if (settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read) + { + throw Exception("Limit for number of columns to read exceeded. 
" + "Requested: " + + toString(required_columns.size()) + ", maximum: " + settings.max_columns_to_read.toString(), + ErrorCodes::TOO_MANY_COLUMNS); + } + + size_t max_block_size = settings.max_block_size; + QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; + + if (query_block.selection) + { + for (auto & condition : query_block.selection->selection().conditions()) + { + analyzer->makeExplicitSetForIndex(condition, storage); + } + } + + SelectQueryInfo query_info; + /// to avoid null point exception + query_info.query = dummy_query; + query_info.dag_query = std::make_unique(conditions, analyzer->getPreparedSets(), analyzer->getCurrentInputColumns()); + query_info.mvcc_query_info = std::make_unique(); + query_info.mvcc_query_info->resolve_locks = true; + query_info.mvcc_query_info->read_tso = settings.read_tso; + + if (dag.getRegions().empty()) + { + throw Exception("Dag Request does not have region to read. ", ErrorCodes::COP_BAD_DAG_REQUEST); + } + + if (!dag.isBatchCop()) + { + if (auto [info_retry, status] = MakeRegionQueryInfos(dag.getRegions(), {}, tmt, *query_info.mvcc_query_info, table_id); info_retry) + throw RegionException({(*info_retry).begin()->first}, status); + + try + { + pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams); + } + catch (DB::Exception & e) + { + e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName() + + "`, table_id: " + DB::toString(table_id) + ")"); + throw; + } + } + else + { + std::unordered_map region_retry; + std::unordered_set force_retry; + for (;;) + { + try + { + region_retry.clear(); + auto [retry, status] = MakeRegionQueryInfos(dag.getRegions(), force_retry, tmt, *query_info.mvcc_query_info, table_id); + std::ignore = status; + if (retry) + { + region_retry = std::move(*retry); + for (auto & r : region_retry) + force_retry.emplace(r.first); + } + if (query_info.mvcc_query_info->regions_query_info.empty()) + break; + pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams); + break; + } + catch (const LockException & e) + { + // We can also use current thread to resolve lock, but it will block next process. + // So, force this region retry in another thread in CoprocessorBlockInputStream. + force_retry.emplace(e.region_id); + } + catch (const RegionException & e) + { + if (tmt.getTerminated()) + throw Exception("TiFlash server is terminating", ErrorCodes::LOGICAL_ERROR); + // By now, RegionException will contain all region id of MvccQueryInfo, which is needed by CHSpark. + // When meeting RegionException, we can let MakeRegionQueryInfos to check in next loop. 
+ } + catch (DB::Exception & e) + { + e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName() + + "`, table_id: " + DB::toString(table_id) + ")"); + throw; + } + } + + if (region_retry.size()) + { + LOG_DEBUG(log, ({ + std::stringstream ss; + ss << "Start to retry " << region_retry.size() << " regions ("; + for (auto & r : region_retry) + ss << r.first << ","; + ss << ")"; + ss.str(); + })); + + DAGSchema schema; + ::tipb::DAGRequest dag_req; + + { + const auto & table_info = storage->getTableInfo(); + tipb::Executor * ts_exec = dag_req.add_executors(); + ts_exec->set_tp(tipb::ExecType::TypeTableScan); + *(ts_exec->mutable_tbl_scan()) = ts; + + for (int i = 0; i < ts.columns().size(); ++i) + { + const auto & col = ts.columns(i); + auto col_id = col.column_id(); + + if (col_id == DB::TiDBPkColumnID) + { + ColumnInfo ci; + ci.tp = TiDB::TypeLongLong; + ci.setPriKeyFlag(); + ci.setNotNullFlag(); + schema.emplace_back(std::make_pair(handle_column_name, std::move(ci))); + } + else + { + auto & col_info = table_info.getColumnInfo(col_id); + schema.emplace_back(std::make_pair(col_info.name, col_info)); + } + dag_req.add_output_offsets(i); + } + dag_req.set_encode_type(tipb::EncodeType::TypeCHBlock); + } + + std::vector ranges; + for (auto & info : region_retry) + { + for (auto & range : info.second.key_ranges) + ranges.emplace_back(range.first, range.second); + } + sort(ranges.begin(), ranges.end()); + executeRemoteQueryImpl(pipeline, ranges, dag_req, schema); + } + } + + if (pipeline.streams.empty()) + { + pipeline.streams.emplace_back(std::make_shared(storage->getSampleBlockForColumns(required_columns))); + } + + pipeline.transform([&](auto & stream) { stream->addTableLock(table_lock); }); + + /// Set the limits and quota for reading data, the speed and time of the query. + { + IProfilingBlockInputStream::LocalLimits limits; + limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; + limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode); + limits.max_execution_time = settings.max_execution_time; + limits.timeout_overflow_mode = settings.timeout_overflow_mode; + + /** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers, + * because the initiating server has a summary of the execution of the request on all servers. + * + * But limits on data size to read and maximum execution time are reasonable to check both on initiator and + * additionally on each remote server, because these limits are checked per block of data processed, + * and remote servers may process way more blocks of data than are received by initiator. 
+ */ + limits.min_execution_speed = settings.min_execution_speed; + limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed; + + QuotaForIntervals & quota = context.getQuota(); + + pipeline.transform([&](auto & stream) { + if (IProfilingBlockInputStream * p_stream = dynamic_cast(stream.get())) + { + p_stream->setLimits(limits); + p_stream->setQuota(quota); + } + }); + } + + if (addTimeZoneCastAfterTS(is_ts_column, pipeline) && !query_block.aggregation + && (keep_session_timezone_info || !query_block.isRootQueryBlock())) + { + if (query_block.isRootQueryBlock()) + { + for (size_t i = 0; i < query_block.output_offsets.size(); i++) + { + int column_index = query_block.output_offsets[i]; + if (is_ts_column[column_index]) + final_project[i].first = analyzer->getCurrentInputColumns()[column_index].name; + } + } + else + { + for (size_t i = 0; i < final_project.size(); i++) + { + if (is_ts_column[i]) + final_project[i].first = analyzer->getCurrentInputColumns()[i].name; + } + } + } +} + +void DAGQueryBlockInterpreter::prepareJoinKeys( + const tipb::Join & join, const DataTypes & key_types, Pipeline & pipeline, Names & key_names, bool tiflash_left) +{ + std::vector source_columns; + for (auto const & p : pipeline.firstStream()->getHeader().getNamesAndTypesList()) + source_columns.emplace_back(p.name, p.type); + DAGExpressionAnalyzer dag_analyzer(std::move(source_columns), context); + ExpressionActionsChain chain; + if (dag_analyzer.appendJoinKey(chain, join, key_types, key_names, tiflash_left)) + { + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, chain.getLastActions()); }); + } +} + +/// ClickHouse require join key to be exactly the same type +/// TiDB only require the join key to be the same category +/// for example decimal(10,2) join decimal(20,0) is allowed in +/// TiDB and will throw exception in ClickHouse +void getJoinKeyTypes(const tipb::Join & join, DataTypes & key_types) +{ + for (int i = 0; i < join.left_join_keys().size(); i++) + { + if (!exprHasValidFieldType(join.left_join_keys(i)) || !exprHasValidFieldType(join.right_join_keys(i))) + throw Exception("Join key without field type", ErrorCodes::COP_BAD_DAG_REQUEST); + DataTypes types; + types.emplace_back(getDataTypeByFieldType(join.left_join_keys(i).field_type())); + types.emplace_back(getDataTypeByFieldType(join.right_join_keys(i).field_type())); + try + { + DataTypePtr common_type = getLeastSupertype(types); + key_types.emplace_back(common_type); + } + catch (Exception & e) + { + if (e.code() == ErrorCodes::NO_COMMON_TYPE) + { + DataTypePtr left_type = removeNullable(types[0]); + DataTypePtr right_type = removeNullable(types[1]); + if ((left_type->getTypeId() == TypeIndex::UInt64 && right_type->isInteger() && !right_type->isUnsignedInteger()) + || (right_type->getTypeId() == TypeIndex::UInt64 && left_type->isInteger() && !left_type->isUnsignedInteger())) + { + /// special case for uint64 and int + /// inorder to not throw exception, use Decimal(20, 0) as the common type + DataTypePtr common_type = std::make_shared>(20, 0); + if (types[0]->isNullable() || types[1]->isNullable()) + common_type = makeNullable(common_type); + key_types.emplace_back(common_type); + } + else + throw; + } + else + { + throw; + } + } + } +} + +void DAGQueryBlockInterpreter::executeJoin(const tipb::Join & join, Pipeline & pipeline, SubqueryForSet & right_query) +{ + // build + static const std::unordered_map join_type_map{ + {tipb::JoinType::TypeInnerJoin, ASTTableJoin::Kind::Inner}, 
{tipb::JoinType::TypeLeftOuterJoin, ASTTableJoin::Kind::Left}, + {tipb::JoinType::TypeRightOuterJoin, ASTTableJoin::Kind::Right}}; + if (input_streams_vec.size() != 2) + { + throw Exception("Join query block must have 2 input streams", ErrorCodes::LOGICAL_ERROR); + } + + auto join_type_it = join_type_map.find(join.join_type()); + if (join_type_it == join_type_map.end()) + throw Exception("Unknown join type in dag request", ErrorCodes::COP_BAD_DAG_REQUEST); + ASTTableJoin::Kind kind = join_type_it->second; + + BlockInputStreams left_streams; + BlockInputStreams right_streams; + Names left_key_names; + Names right_key_names; + if (join.inner_idx() == 0) + { + // in DAG request, inner part is the build side, however for tiflash implementation, + // the build side must be the right side, so need to update the join type if needed + if (kind == ASTTableJoin::Kind::Left) + kind = ASTTableJoin::Kind::Right; + else if (kind == ASTTableJoin::Kind::Right) + kind = ASTTableJoin::Kind::Left; + left_streams = input_streams_vec[1]; + right_streams = input_streams_vec[0]; + } + else + { + left_streams = input_streams_vec[0]; + right_streams = input_streams_vec[1]; + } + + if (kind != ASTTableJoin::Kind::Inner) + { + // todo support left and right join + throw Exception("Only Inner join is supported", ErrorCodes::NOT_IMPLEMENTED); + } + + std::vector join_output_columns; + for (auto const & p : input_streams_vec[0][0]->getHeader().getNamesAndTypesList()) + { + join_output_columns.emplace_back(p.name, p.type); + } + for (auto const & p : input_streams_vec[1][0]->getHeader().getNamesAndTypesList()) + { + join_output_columns.emplace_back(p.name, p.type); + } + /// all the columns from right table should be added after join, even for the join key + NamesAndTypesList columns_added_by_join; + for (auto const & p : right_streams[0]->getHeader().getNamesAndTypesList()) + { + columns_added_by_join.emplace_back(p.name, p.type); + } + + if (!query_block.aggregation) + { + for (auto const & p : input_streams_vec[0][0]->getHeader().getNamesAndTypesList()) + final_project.emplace_back(p.name, query_block.qb_column_prefix + p.name); + for (auto const & p : input_streams_vec[1][0]->getHeader().getNamesAndTypesList()) + final_project.emplace_back(p.name, query_block.qb_column_prefix + p.name); + } + + DataTypes join_key_types; + getJoinKeyTypes(join, join_key_types); + + /// add necessary transformation if the join key is an expression + Pipeline left_pipeline; + left_pipeline.streams = left_streams; + prepareJoinKeys(join, join_key_types, left_pipeline, left_key_names, true); + Pipeline right_pipeline; + right_pipeline.streams = right_streams; + prepareJoinKeys(join, join_key_types, right_pipeline, right_key_names, false); + + left_streams = left_pipeline.streams; + right_streams = right_pipeline.streams; + + const Settings & settings = context.getSettingsRef(); + JoinPtr joinPtr = std::make_shared(left_key_names, right_key_names, true, + SizeLimits(settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode), kind, + ASTTableJoin::Strictness::All); + executeUnion(right_pipeline); + right_query.source = right_pipeline.firstStream(); + right_query.join = joinPtr; + right_query.join->setSampleBlock(right_query.source->getHeader()); + dag.getDAGContext().profile_streams_map_for_join_build_side[query_block.qb_join_subquery_alias].push_back(right_query.source); + + std::vector source_columns; + for (const auto & p : left_streams[0]->getHeader().getNamesAndTypesList()) + 
source_columns.emplace_back(p.name, p.type); + DAGExpressionAnalyzer dag_analyzer(std::move(source_columns), context); + ExpressionActionsChain chain; + dag_analyzer.appendJoin(chain, right_query, columns_added_by_join); + pipeline.streams = left_streams; + /// add join input stream + for (auto & stream : pipeline.streams) + stream = std::make_shared(stream, chain.getLastActions()); + + // todo should add a project here??? + analyzer = std::make_unique(std::move(join_output_columns), context); +} + +// add timezone cast for timestamp type, this is used to support session level timezone +bool DAGQueryBlockInterpreter::addTimeZoneCastAfterTS(std::vector & is_ts_column, Pipeline & pipeline) +{ + bool hasTSColumn = false; + for (auto b : is_ts_column) + hasTSColumn |= b; + if (!hasTSColumn) + return false; + + ExpressionActionsChain chain; + /// only keep UTC column if + /// 1. the query block is the root query block + /// 2. keep_session_timezone_info is false + /// 3. current query block does not have aggregation + if (analyzer->appendTimeZoneCastsAfterTS( + chain, is_ts_column, query_block.isRootQueryBlock() && !keep_session_timezone_info && query_block.aggregation == nullptr)) + { + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, chain.getLastActions()); }); + return true; + } + else + return false; +} + +AnalysisResult DAGQueryBlockInterpreter::analyzeExpressions() +{ + AnalysisResult res; + ExpressionActionsChain chain; + if (!conditions.empty()) + { + analyzer->appendWhere(chain, conditions, res.filter_column_name); + res.has_where = true; + res.before_where = chain.getLastActions(); + chain.addStep(); + } + // There will be either Agg... + if (query_block.aggregation) + { + analyzer->appendAggregation(chain, query_block.aggregation->aggregation(), res.aggregation_keys, res.aggregate_descriptions); + res.need_aggregate = true; + res.before_aggregation = chain.getLastActions(); + + chain.finalize(); + chain.clear(); + + // add cast if type is not match + analyzer->appendAggSelect( + chain, query_block.aggregation->aggregation(), keep_session_timezone_info || !query_block.isRootQueryBlock()); + if (query_block.isRootQueryBlock()) + { + // todo for root query block, use output offsets to reconstruct the final project + for (auto & element : analyzer->getCurrentInputColumns()) + { + final_project.emplace_back(element.name, ""); + } + } + else + { + for (auto & element : analyzer->getCurrentInputColumns()) + { + final_project.emplace_back(element.name, query_block.qb_column_prefix + element.name); + } + } + } + // Or TopN, not both. + if (query_block.limitOrTopN && query_block.limitOrTopN->tp() == tipb::ExecType::TypeTopN) + { + res.has_order_by = true; + analyzer->appendOrderBy(chain, query_block.limitOrTopN->topn(), res.order_columns); + } + // Append final project results if needed. + analyzer->appendFinalProject(chain, final_project); + res.before_order_and_select = chain.getLastActions(); + chain.finalize(); + chain.clear(); + //todo need call prependProjectInput?? 
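For reference on the join-key handling in getJoinKeyTypes above: a minimal standalone sketch of the type-resolution rule, assuming a toy type model (Tp, KeyType, leastSupertype and resolveJoinKeyType are illustrative names, not part of this patch). It shows the Decimal(20, 0) fallback for UInt64 joined with a signed integer, and how nullability of either side propagates to the common key type.

#include <iostream>
#include <optional>
#include <stdexcept>

// Minimal stand-in for the real type system: just enough to show the rule.
enum class Tp { Int64, UInt64, Float64, Decimal20_0 };

struct KeyType
{
    Tp tp;
    bool nullable;
};

// Hypothetical supertype lookup; the real code calls getLeastSupertype and
// handles the ErrorCodes::NO_COMMON_TYPE exception it may throw.
std::optional<Tp> leastSupertype(Tp l, Tp r)
{
    if (l == r)
        return l;
    if ((l == Tp::Int64 && r == Tp::Float64) || (l == Tp::Float64 && r == Tp::Int64))
        return Tp::Float64;
    return std::nullopt; // e.g. UInt64 vs Int64 has no lossless common integer type
}

KeyType resolveJoinKeyType(KeyType left, KeyType right)
{
    bool nullable = left.nullable || right.nullable;
    if (auto common = leastSupertype(left.tp, right.tp))
        return {*common, nullable};
    // Special case mirrored from the interpreter: UInt64 joined with a signed
    // integer is widened to Decimal(20, 0) instead of failing the query.
    bool u64_vs_signed = (left.tp == Tp::UInt64 && right.tp == Tp::Int64)
        || (right.tp == Tp::UInt64 && left.tp == Tp::Int64);
    if (u64_vs_signed)
        return {Tp::Decimal20_0, nullable};
    throw std::runtime_error("join keys have no common type");
}

int main()
{
    KeyType k = resolveJoinKeyType({Tp::UInt64, false}, {Tp::Int64, true});
    std::cout << "decimal fallback: " << (k.tp == Tp::Decimal20_0) << ", nullable: " << k.nullable << '\n';
}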
+ return res; +} + +void DAGQueryBlockInterpreter::executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expr, String & filter_column) +{ + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, expr, filter_column); }); +} + +void DAGQueryBlockInterpreter::executeAggregation( + Pipeline & pipeline, const ExpressionActionsPtr & expr, Names & key_names, AggregateDescriptions & aggregates) +{ + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, expr); }); + + Block header = pipeline.firstStream()->getHeader(); + ColumnNumbers keys; + for (const auto & name : key_names) + { + keys.push_back(header.getPositionByName(name)); + } + for (auto & descr : aggregates) + { + if (descr.arguments.empty()) + { + for (const auto & name : descr.argument_names) + { + descr.arguments.push_back(header.getPositionByName(name)); + } + } + } + + const Settings & settings = context.getSettingsRef(); + + /** Two-level aggregation is useful in two cases: + * 1. Parallel aggregation is done, and the results should be merged in parallel. + * 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way. + */ + bool allow_to_use_two_level_group_by = pipeline.streams.size() > 1 || settings.max_bytes_before_external_group_by != 0; + + Aggregator::Params params(header, keys, aggregates, false, settings.max_rows_to_group_by, settings.group_by_overflow_mode, + settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile, + allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), + allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0), + settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set, context.getTemporaryPath()); + + /// If there are several sources, then we perform parallel aggregation + if (pipeline.streams.size() > 1) + { + pipeline.firstStream() = std::make_shared(pipeline.streams, nullptr, params, true, max_streams, + settings.aggregation_memory_efficient_merge_threads ? static_cast(settings.aggregation_memory_efficient_merge_threads) + : static_cast(settings.max_threads)); + + pipeline.streams.resize(1); + } + else + { + pipeline.firstStream() = std::make_shared(pipeline.firstStream(), params, true); + } + // add cast +} + +void DAGQueryBlockInterpreter::executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr) +{ + if (!expressionActionsPtr->getActions().empty()) + { + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, expressionActionsPtr); }); + } +} + +void DAGQueryBlockInterpreter::getAndLockStorageWithSchemaVersion(TableID table_id, Int64 query_schema_version) +{ + /// Get current schema version in schema syncer for a chance to shortcut. + auto global_schema_version = context.getTMTContext().getSchemaSyncer()->getCurrentVersion(); + + /// Lambda for get storage, then align schema version under the read lock. + auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple { + /// Get storage in case it's dropped then re-created. + // If schema synced, call getTable without try, leading to exception on table not existing. 
+ auto storage_ = context.getTMTContext().getStorages().get(table_id); + if (!storage_) + { + if (schema_synced) + throw Exception("Table " + std::to_string(table_id) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE); + else + return std::make_tuple(nullptr, nullptr, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, false); + } + + if (storage_->engineType() != ::TiDB::StorageEngine::TMT && storage_->engineType() != ::TiDB::StorageEngine::DT) + { + throw Exception("Specifying schema_version for non-managed storage: " + storage_->getName() + + ", table: " + storage_->getTableName() + ", id: " + DB::toString(table_id) + " is not allowed", + ErrorCodes::LOGICAL_ERROR); + } + + /// Lock storage. + auto lock = storage_->lockStructure(false, __PRETTY_FUNCTION__); + + /// Check schema version, requiring TiDB/TiSpark and TiFlash both use exactly the same schema. + // We have three schema versions, two in TiFlash: + // 1. Storage: the version that this TiFlash table (storage) was last altered. + // 2. Global: the version that TiFlash global schema is at. + // And one from TiDB/TiSpark: + // 3. Query: the version that TiDB/TiSpark used for this query. + auto storage_schema_version = storage_->getTableInfo().schema_version; + // Not allow storage > query in any case, one example is time travel queries. + if (storage_schema_version > query_schema_version) + throw Exception("Table " + std::to_string(table_id) + " schema version " + std::to_string(storage_schema_version) + + " newer than query schema version " + std::to_string(query_schema_version), + ErrorCodes::SCHEMA_VERSION_ERROR); + // From now on we have storage <= query. + // If schema was synced, it implies that global >= query, as mentioned above we have storage <= query, we are OK to serve. + if (schema_synced) + return std::make_tuple(storage_, lock, storage_schema_version, true); + // From now on the schema was not synced. + // 1. storage == query, TiDB/TiSpark is using exactly the same schema that altered this table, we are just OK to serve. + // 2. global >= query, TiDB/TiSpark is using a schema older than TiFlash global, but as mentioned above we have storage <= query, + // meaning that the query schema is still newer than the time when this table was last altered, so we still OK to serve. + if (storage_schema_version == query_schema_version || global_schema_version >= query_schema_version) + return std::make_tuple(storage_, lock, storage_schema_version, true); + // From now on we have global < query. + // Return false for outer to sync and retry. + return std::make_tuple(nullptr, nullptr, storage_schema_version, false); + }; + + /// Try get storage and lock once. + ManageableStoragePtr storage_; + TableStructureReadLockPtr lock; + Int64 storage_schema_version; + auto log_schema_version = [&](const String & result) { + LOG_DEBUG(log, + __PRETTY_FUNCTION__ << " Table " << table_id << " schema " << result << " Schema version [storage, global, query]: " + << "[" << storage_schema_version << ", " << global_schema_version << ", " << query_schema_version << "]."); + }; + bool ok; + { + std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(false); + if (ok) + { + log_schema_version("OK, no syncing required."); + storage = storage_; + table_lock = lock; + return; + } + } + + /// If first try failed, sync schema and try again. 
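A minimal standalone sketch of the serve-or-sync decision spelled out in the comments above, under assumed names (SchemaCheck and checkSchemaVersion are illustrative, not this patch's API). The code that follows performs exactly one schema sync and re-check when the outcome is SyncAndRetry.

#include <cstdint>
#include <iostream>
#include <stdexcept>

enum class SchemaCheck { Serve, SyncAndRetry };

// Distilled version of the check inside get_and_lock_storage: storage/global are
// the TiFlash-side versions, query is what TiDB/TiSpark compiled the request against.
SchemaCheck checkSchemaVersion(int64_t storage, int64_t global, int64_t query, bool schema_synced)
{
    if (storage > query)                      // e.g. a time-travel query older than the last ALTER
        throw std::runtime_error("storage schema newer than query schema");
    if (schema_synced)                        // after a sync, global >= query, and storage <= query already holds
        return SchemaCheck::Serve;
    if (storage == query || global >= query)  // query matches the last ALTER, or global already caught up
        return SchemaCheck::Serve;
    return SchemaCheck::SyncAndRetry;         // global < query: sync schemas and check once more
}

int main()
{
    std::cout << (checkSchemaVersion(5, 6, 7, false) == SchemaCheck::SyncAndRetry) << '\n';
    std::cout << (checkSchemaVersion(5, 7, 7, false) == SchemaCheck::Serve) << '\n';
}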
+ { + log_schema_version("not OK, syncing schemas."); + auto start_time = Clock::now(); + context.getTMTContext().getSchemaSyncer()->syncSchemas(context); + auto schema_sync_cost = std::chrono::duration_cast(Clock::now() - start_time).count(); + LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << table_id << " schema sync cost " << schema_sync_cost << "ms."); + + std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(true); + if (ok) + { + log_schema_version("OK after syncing."); + storage = storage_; + table_lock = lock; + return; + } + + throw Exception("Shouldn't reach here", ErrorCodes::UNKNOWN_EXCEPTION); + } +} + +SortDescription DAGQueryBlockInterpreter::getSortDescription(std::vector & order_columns) +{ + // construct SortDescription + SortDescription order_descr; + const tipb::TopN & topn = query_block.limitOrTopN->topn(); + order_descr.reserve(topn.order_by_size()); + for (int i = 0; i < topn.order_by_size(); i++) + { + String name = order_columns[i].name; + int direction = topn.order_by(i).desc() ? -1 : 1; + // MySQL/TiDB treats NULL as "minimum". + int nulls_direction = -1; + std::shared_ptr collator = nullptr; + if (removeNullable(order_columns[i].type)->isString()) + collator = getCollatorFromExpr(topn.order_by(i).expr()); + + order_descr.emplace_back(name, direction, nulls_direction, collator); + } + return order_descr; +} + +void DAGQueryBlockInterpreter::executeUnion(Pipeline & pipeline) +{ + if (pipeline.hasMoreThanOneStream()) + { + pipeline.firstStream() = std::make_shared>(pipeline.streams, nullptr, max_streams); + pipeline.streams.resize(1); + } +} + +void DAGQueryBlockInterpreter::executeOrder(Pipeline & pipeline, std::vector & order_columns) +{ + SortDescription order_descr = getSortDescription(order_columns); + const Settings & settings = context.getSettingsRef(); + Int64 limit = query_block.limitOrTopN->topn().limit(); + + pipeline.transform([&](auto & stream) { + auto sorting_stream = std::make_shared(stream, order_descr, limit); + + /// Limits on sorting + IProfilingBlockInputStream::LocalLimits limits; + limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; + limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode); + sorting_stream->setLimits(limits); + + stream = sorting_stream; + }); + + /// If there are several streams, we merge them into one + executeUnion(pipeline); + + /// Merge the sorted blocks. 
+ pipeline.firstStream() = std::make_shared(pipeline.firstStream(), order_descr, settings.max_block_size, + limit, settings.max_bytes_before_external_sort, context.getTemporaryPath()); +} + +void DAGQueryBlockInterpreter::recordProfileStreams(Pipeline & pipeline, const String & key) +{ + dag.getDAGContext().profile_streams_map[key].qb_id = query_block.id; + for (auto & stream : pipeline.streams) + { + dag.getDAGContext().profile_streams_map[key].input_streams.push_back(stream); + } +} + +void copyExecutorTreeWithLocalTableScan( + tipb::DAGRequest & dag_req, const tipb::Executor * root, tipb::EncodeType encode_type, const tipb::DAGRequest & org_req) +{ + const tipb::Executor * current = root; + auto * exec = dag_req.mutable_root_executor(); + int exec_id = 0; + while (current->tp() != tipb::ExecType::TypeTableScan) + { + if (current->tp() == tipb::ExecType::TypeSelection) + { + exec->set_tp(tipb::ExecType::TypeSelection); + exec->set_executor_id("selection_" + std::to_string(exec_id)); + auto * sel = exec->mutable_selection(); + for (auto const & condition : current->selection().conditions()) + { + auto * tmp = sel->add_conditions(); + tmp->CopyFrom(condition); + } + exec = sel->mutable_child(); + current = ¤t->selection().child(); + } + else if (current->tp() == tipb::ExecType::TypeAggregation || current->tp() == tipb::ExecType::TypeStreamAgg) + { + exec->set_tp(current->tp()); + exec->set_executor_id("aggregation_" + std::to_string(exec_id)); + auto * agg = exec->mutable_aggregation(); + for (auto const & expr : current->aggregation().agg_func()) + { + auto * tmp = agg->add_agg_func(); + tmp->CopyFrom(expr); + } + for (auto const & expr : current->aggregation().group_by()) + { + auto * tmp = agg->add_group_by(); + tmp->CopyFrom(expr); + } + agg->set_streamed(current->aggregation().streamed()); + exec = agg->mutable_child(); + current = ¤t->aggregation().child(); + } + else if (current->tp() == tipb::ExecType::TypeLimit) + { + exec->set_tp(current->tp()); + exec->set_executor_id("limit_" + std::to_string(exec_id)); + auto * limit = exec->mutable_limit(); + limit->set_limit(current->limit().limit()); + exec = limit->mutable_child(); + current = ¤t->limit().child(); + } + else if (current->tp() == tipb::ExecType::TypeTopN) + { + exec->set_tp(current->tp()); + exec->set_executor_id("topN_" + std::to_string(exec_id)); + auto * topn = exec->mutable_topn(); + topn->set_limit(current->topn().limit()); + for (auto const & expr : current->topn().order_by()) + { + auto * tmp = topn->add_order_by(); + tmp->CopyFrom(expr); + } + exec = topn->mutable_child(); + current = ¤t->topn().child(); + } + else + { + throw Exception("Not supported yet"); + } + exec_id++; + } + + if (current->tp() != tipb::ExecType::TypeTableScan) + throw Exception("Only support copy from table scan sourced query block"); + exec->set_tp(tipb::ExecType::TypeTableScan); + exec->set_executor_id("tablescan_" + std::to_string(exec_id)); + auto * new_ts = new tipb::TableScan(current->tbl_scan()); + new_ts->set_next_read_engine(tipb::EngineType::Local); + exec->set_allocated_tbl_scan(new_ts); + + dag_req.set_encode_type(encode_type); + if (org_req.has_time_zone_name() && org_req.time_zone_name().length() > 0) + dag_req.set_time_zone_name(org_req.time_zone_name()); + else if (org_req.has_time_zone_offset()) + dag_req.set_time_zone_offset(org_req.time_zone_offset()); +} + +void DAGQueryBlockInterpreter::executeRemoteQuery(Pipeline & pipeline) +{ + // remote query containing agg/limit/topN can not running + // in parellel, but current 
remote query is running in + // parellel, so just disable this corner case. + if (query_block.aggregation || query_block.limitOrTopN) + throw Exception("Remote query containing agg or limit or topN is not supported", ErrorCodes::COP_BAD_DAG_REQUEST); + const auto & ts = query_block.source->tbl_scan(); + std::vector> key_ranges; + for (auto & range : ts.ranges()) + { + std::string start_key(range.low()); + DecodedTiKVKey start(std::move(start_key)); + std::string end_key(range.high()); + DecodedTiKVKey end(std::move(end_key)); + key_ranges.emplace_back(std::make_pair(std::move(start), std::move(end))); + } + std::vector cop_key_ranges; + cop_key_ranges.reserve(key_ranges.size()); + for (const auto & key_range : key_ranges) + { + cop_key_ranges.emplace_back(static_cast(key_range.first), static_cast(key_range.second)); + } + sort(cop_key_ranges.begin(), cop_key_ranges.end()); + + ::tipb::DAGRequest dag_req; + + tipb::EncodeType encode_type; + if (!isUnsupportedEncodeType(query_block.output_field_types, tipb::EncodeType::TypeCHBlock)) + encode_type = tipb::EncodeType::TypeCHBlock; + else if (!isUnsupportedEncodeType(query_block.output_field_types, tipb::EncodeType::TypeChunk)) + encode_type = tipb::EncodeType::TypeChunk; + else + encode_type = tipb::EncodeType::TypeDefault; + + copyExecutorTreeWithLocalTableScan(dag_req, query_block.root, encode_type, rqst); + DAGSchema schema; + ColumnsWithTypeAndName columns; + std::vector is_ts_column; + std::vector source_columns; + for (int i = 0; i < (int)query_block.output_field_types.size(); i++) + { + dag_req.add_output_offsets(i); + ColumnInfo info = fieldTypeToColumnInfo(query_block.output_field_types[i]); + String col_name = query_block.qb_column_prefix + "col_" + std::to_string(i); + schema.push_back(std::make_pair(col_name, info)); + is_ts_column.push_back(query_block.output_field_types[i].tp() == TiDB::TypeTimestamp); + source_columns.emplace_back(col_name, getDataTypeByFieldType(query_block.output_field_types[i])); + final_project.emplace_back(col_name, ""); + } + + executeRemoteQueryImpl(pipeline, cop_key_ranges, dag_req, schema); + + analyzer = std::make_unique(std::move(source_columns), context); + bool need_append_final_project = false; + if (encode_type == tipb::EncodeType::TypeDefault) + { + /// if the encode type is default, the timestamp column in dag response is UTC based + /// so need to cast the timezone + if (addTimeZoneCastAfterTS(is_ts_column, pipeline)) + { + for (size_t i = 0; i < final_project.size(); i++) + { + if (is_ts_column[i]) + final_project[i].first = analyzer->getCurrentInputColumns()[i].name; + } + need_append_final_project = true; + } + } + + /// For simplicity, remote subquery only record the CopBlockInputStream time, for example, + /// if the remote subquery is TS, then it's execute time is the execute time of CopBlockInputStream, + /// if the remote subquery is TS SEL, then both TS and SEL's execute time is the same as the CopBlockInputStream + recordProfileStreams(pipeline, query_block.source_name); + if (query_block.selection) + recordProfileStreams(pipeline, query_block.selection_name); + + if (need_append_final_project) + executeFinalProject(pipeline); +} + +void DAGQueryBlockInterpreter::executeRemoteQueryImpl(Pipeline & pipeline, + const std::vector & cop_key_ranges, ::tipb::DAGRequest & dag_req, const DAGSchema & schema) +{ + + pingcap::coprocessor::RequestPtr req = std::make_shared(); + dag_req.SerializeToString(&(req->data)); + req->tp = pingcap::coprocessor::ReqType::DAG; + req->start_ts = 
context.getSettingsRef().read_tso; + + pingcap::kv::Cluster * cluster = context.getTMTContext().getKVCluster(); + pingcap::kv::Backoffer bo(pingcap::kv::copBuildTaskMaxBackoff); + pingcap::kv::StoreType store_type = pingcap::kv::TiFlash; + auto all_tasks = pingcap::coprocessor::buildCopTasks(bo, cluster, cop_key_ranges, req, store_type, &Logger::get("pingcap/coprocessor")); + + size_t concurrent_num = std::min(context.getSettingsRef().max_threads, all_tasks.size()); + size_t task_per_thread = all_tasks.size() / concurrent_num; + size_t rest_task = all_tasks.size() % concurrent_num; + for (size_t i = 0, task_start = 0; i < concurrent_num; i++) + { + size_t task_end = task_start + task_per_thread; + if (i < rest_task) + task_end++; + if (task_end == task_start) + continue; + std::vector tasks(all_tasks.begin() + task_start, all_tasks.begin() + task_end); + + BlockInputStreamPtr input = std::make_shared(cluster, tasks, schema, 1); + pipeline.streams.push_back(input); + task_start = task_end; + } +} + +void DAGQueryBlockInterpreter::executeImpl(Pipeline & pipeline) +{ + if (query_block.isRemoteQuery()) + { + executeRemoteQuery(pipeline); + return; + } + SubqueryForSet right_query; + if (query_block.source->tp() == tipb::ExecType::TypeJoin) + { + executeJoin(query_block.source->join(), pipeline, right_query); + // todo the join execution time is not accurate because only probe time is + // recorded here + recordProfileStreams(pipeline, query_block.source_name); + } + else + { + executeTS(query_block.source->tbl_scan(), pipeline); + recordProfileStreams(pipeline, query_block.source_name); + } + + auto res = analyzeExpressions(); + // execute selection + if (res.has_where) + { + executeWhere(pipeline, res.before_where, res.filter_column_name); + recordProfileStreams(pipeline, query_block.selection_name); + } + LOG_INFO(log, + "execution stream size for query block(before aggregation) " << query_block.qb_column_prefix << " is " << pipeline.streams.size()); + if (res.need_aggregate) + { + // execute aggregation + executeAggregation(pipeline, res.before_aggregation, res.aggregation_keys, res.aggregate_descriptions); + recordProfileStreams(pipeline, query_block.aggregation_name); + } + if (res.before_order_and_select) + { + executeExpression(pipeline, res.before_order_and_select); + } + + if (res.has_order_by) + { + // execute topN + executeOrder(pipeline, res.order_columns); + recordProfileStreams(pipeline, query_block.limitOrTopN_name); + } + + // execute projection + executeFinalProject(pipeline); + + // execute limit + if (query_block.limitOrTopN != nullptr && query_block.limitOrTopN->tp() == tipb::TypeLimit) + { + executeLimit(pipeline); + recordProfileStreams(pipeline, query_block.limitOrTopN_name); + } + + if (query_block.source->tp() == tipb::ExecType::TypeJoin) + { + SubqueriesForSets subquries; + subquries[query_block.qb_join_subquery_alias] = right_query; + subqueriesForSets.emplace_back(subquries); + } +} + +void DAGQueryBlockInterpreter::executeFinalProject(Pipeline & pipeline) +{ + auto columns = pipeline.firstStream()->getHeader(); + NamesAndTypesList input_column; + for (auto & column : columns.getColumnsWithTypeAndName()) + { + input_column.emplace_back(column.name, column.type); + } + ExpressionActionsPtr project = std::make_shared(input_column, context.getSettingsRef()); + project->add(ExpressionAction::project(final_project)); + // add final project + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, project); }); +} + +void 
DAGQueryBlockInterpreter::executeLimit(Pipeline & pipeline) +{ + size_t limit = 0; + if (query_block.limitOrTopN->tp() == tipb::TypeLimit) + limit = query_block.limitOrTopN->limit().limit(); + else + limit = query_block.limitOrTopN->topn().limit(); + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, limit, 0, false); }); + if (pipeline.hasMoreThanOneStream()) + { + executeUnion(pipeline); + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, limit, 0, false); }); + } +} + +BlockInputStreams DAGQueryBlockInterpreter::execute() +{ + Pipeline pipeline; + executeImpl(pipeline); + + return pipeline.streams; +} +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h new file mode 100644 index 00000000000..1ebf1c9850d --- /dev/null +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h @@ -0,0 +1,127 @@ +#pragma once + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#include +#include +#pragma GCC diagnostic pop + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class Context; +class Region; +using RegionPtr = std::shared_ptr; + +struct Pipeline +{ + BlockInputStreams streams; + + BlockInputStreamPtr & firstStream() { return streams.at(0); } + + template + void transform(Transform && transform) + { + for (auto & stream : streams) + transform(stream); + } + + bool hasMoreThanOneStream() const { return streams.size() > 1; } +}; + +struct AnalysisResult +{ + bool has_where = false; + bool need_aggregate = false; + bool has_order_by = false; + + ExpressionActionsPtr before_where; + ExpressionActionsPtr before_aggregation; + ExpressionActionsPtr before_order_and_select; + ExpressionActionsPtr final_projection; + + String filter_column_name; + std::vector order_columns; + /// Columns from the SELECT list, before renaming them to aliases. 
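As a side note on executeLimit above: the limit is applied per stream, the streams are unioned, and the limit is applied once more so the global bound still holds. A small sketch of that shape over plain vectors (limitedUnion is an illustrative name, not part of this patch):

#include <cstddef>
#include <iostream>
#include <vector>

using Stream = std::vector<int>;

// Mirror of the executeLimit shape: cap each stream first (cheap, keeps the
// union small), then cap the merged result so the global limit still holds.
Stream limitedUnion(std::vector<Stream> streams, size_t limit)
{
    for (auto & s : streams)
        if (s.size() > limit)
            s.resize(limit);

    Stream merged;
    for (auto & s : streams)
        merged.insert(merged.end(), s.begin(), s.end());

    if (merged.size() > limit)
        merged.resize(limit);
    return merged;
}

int main()
{
    auto out = limitedUnion({{1, 2, 3}, {4, 5}, {6}}, 4);
    std::cout << out.size() << '\n'; // 4, not 6
}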
+ Names selected_columns; + + Names aggregation_keys; + AggregateDescriptions aggregate_descriptions; +}; +/** build ch plan from dag request: dag executors -> ch plan + */ +class DAGQueryBlockInterpreter +{ +public: + DAGQueryBlockInterpreter(Context & context_, const std::vector & input_streams_vec_, + const DAGQueryBlock & query_block_, bool keep_session_timezone_info_, const tipb::DAGRequest & rqst, ASTPtr dummp_query, + const DAGQuerySource & dag_, std::vector & subqueriesForSets_); + + ~DAGQueryBlockInterpreter() = default; + + BlockInputStreams execute(); + +private: + void executeRemoteQuery(Pipeline & pipeline); + void executeImpl(Pipeline & pipeline); + void executeTS(const tipb::TableScan & ts, Pipeline & pipeline); + void executeJoin(const tipb::Join & join, Pipeline & pipeline, SubqueryForSet & right_query); + void prepareJoinKeys(const tipb::Join & join, const DataTypes & key_types, Pipeline & pipeline, Names & key_names, bool tiflash_left); + void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column); + void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr); + void executeOrder(Pipeline & pipeline, std::vector & order_columns); + void executeUnion(Pipeline & pipeline); + void executeLimit(Pipeline & pipeline); + void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, Names & aggregation_keys, + AggregateDescriptions & aggregate_descriptions); + void executeFinalProject(Pipeline & pipeline); + void getAndLockStorageWithSchemaVersion(TableID table_id, Int64 schema_version); + SortDescription getSortDescription(std::vector & order_columns); + AnalysisResult analyzeExpressions(); + void recordProfileStreams(Pipeline & pipeline, const String & key); + bool addTimeZoneCastAfterTS(std::vector & is_ts_column, Pipeline & pipeline); + +private: + void executeRemoteQueryImpl(Pipeline & pipeline, const std::vector & cop_key_ranges, + ::tipb::DAGRequest & dag_req, const DAGSchema & schema); + + Context & context; + std::vector input_streams_vec; + const DAGQueryBlock & query_block; + const bool keep_session_timezone_info; + const tipb::DAGRequest & rqst; + ASTPtr dummy_query; + + NamesWithAliases final_project; + + /// How many streams we ask for storage to produce, and in how many threads we will do further processing. + size_t max_streams = 1; + + /// Table from where to read data, if not subquery. 
+ ManageableStoragePtr storage; + TableStructureReadLockPtr table_lock; + + std::unique_ptr analyzer; + + + std::vector conditions; + const DAGQuerySource & dag; + std::vector & subqueriesForSets; + + Poco::Logger * log; +}; +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp index b9465af6354..292fa0f3fe2 100644 --- a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp @@ -14,66 +14,39 @@ namespace ErrorCodes extern const int COP_BAD_DAG_REQUEST; } // namespace ErrorCodes -const String DAGQuerySource::TS_NAME("tablescan"); -const String DAGQuerySource::SEL_NAME("selection"); -const String DAGQuerySource::AGG_NAME("aggregation"); -const String DAGQuerySource::TOPN_NAME("topN"); -const String DAGQuerySource::LIMIT_NAME("limit"); - -static void assignOrThrowException(Int32 & index, Int32 value, const String & name) -{ - if (index != -1) - { - throw Exception("Duplicated " + name + " in DAG request"); - } - index = value; -} - -DAGQuerySource::DAGQuerySource(Context & context_, - DAGContext & dag_context_, - const std::unordered_map & regions_, - const tipb::DAGRequest & dag_request_, - const bool is_batch_cop_) - : context(context_), +DAGQuerySource::DAGQuerySource(Context & context_, DAGContext & dag_context_, const std::unordered_map & regions_, + const tipb::DAGRequest & dag_request_, ::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_, const bool is_batch_cop_) + : writer(std::make_shared(writer_)), + context(context_), dag_context(dag_context_), regions(regions_), dag_request(dag_request_), metrics(context.getTiFlashMetrics()), is_batch_cop(is_batch_cop_) { - for (int i = 0; i < dag_request.executors_size(); i++) + if (dag_request.has_root_executor()) + { + root_query_block = std::make_shared(1, dag_request.root_executor()); + } + else { - switch (dag_request.executors(i).tp()) + root_query_block = std::make_shared(1, dag_request.executors()); + } + root_query_block->collectAllPossibleChildrenJoinSubqueryAlias(dag_context.qb_id_to_join_alias_map); + for (Int32 i : dag_request.output_offsets()) + root_query_block->output_offsets.push_back(i); + if (root_query_block->aggregation != nullptr) + { + for (auto & field_type : root_query_block->output_field_types) + result_field_types.push_back(field_type); + } + else + { + for (UInt32 i : dag_request.output_offsets()) { - case tipb::ExecType::TypeTableScan: - GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_ts).Increment(); - assignOrThrowException(ts_index, i, TS_NAME); - break; - case tipb::ExecType::TypeSelection: - GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_sel).Increment(); - assignOrThrowException(sel_index, i, SEL_NAME); - break; - case tipb::ExecType::TypeStreamAgg: - case tipb::ExecType::TypeAggregation: - GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_agg).Increment(); - assignOrThrowException(agg_index, i, AGG_NAME); - break; - case tipb::ExecType::TypeTopN: - GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_topn).Increment(); - assignOrThrowException(order_index, i, TOPN_NAME); - assignOrThrowException(limit_index, i, TOPN_NAME); - break; - case tipb::ExecType::TypeLimit: - GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_limit).Increment(); - assignOrThrowException(limit_index, i, LIMIT_NAME); - break; - default: - throw Exception( - "Unsupported executor in DAG request: " + dag_request.executors(i).DebugString(), 
ErrorCodes::NOT_IMPLEMENTED); + result_field_types.push_back(root_query_block->output_field_types[i]); } } - - analyzeResultFieldTypes(); analyzeDAGEncodeType(); } @@ -108,69 +81,4 @@ std::unique_ptr DAGQuerySource::interpreter(Context &, QueryProces return std::make_unique(context, *this); } -bool fillExecutorOutputFieldTypes(const tipb::Executor & executor, std::vector & output_field_types) -{ - tipb::FieldType field_type; - switch (executor.tp()) - { - case tipb::ExecType::TypeTableScan: - for (auto & ci : executor.tbl_scan().columns()) - { - field_type.set_tp(ci.tp()); - field_type.set_flag(ci.flag()); - field_type.set_flen(ci.columnlen()); - field_type.set_decimal(ci.decimal()); - output_field_types.push_back(field_type); - } - return true; - case tipb::ExecType::TypeStreamAgg: - case tipb::ExecType::TypeAggregation: - for (auto & expr : executor.aggregation().agg_func()) - { - if (!exprHasValidFieldType(expr)) - { - throw Exception("Agg expression without valid field type", ErrorCodes::COP_BAD_DAG_REQUEST); - } - output_field_types.push_back(expr.field_type()); - } - for (auto & expr : executor.aggregation().group_by()) - { - if (!exprHasValidFieldType(expr)) - { - throw Exception("Group by expression without valid field type", ErrorCodes::COP_BAD_DAG_REQUEST); - } - output_field_types.push_back(expr.field_type()); - } - return true; - default: - return false; - } -} - -void DAGQuerySource::analyzeResultFieldTypes() -{ - std::vector executor_output; - for (int i = dag_request.executors_size() - 1; i >= 0; i--) - { - if (fillExecutorOutputFieldTypes(dag_request.executors(i), executor_output)) - break; - } - if (executor_output.empty()) - { - throw Exception("Do not found result field type for current dag request", ErrorCodes::COP_BAD_DAG_REQUEST); - } - // tispark assumes that if there is a agg, the output offset is - // ignored and the request output is the same as the agg's output. - // todo should always use output offset to re-construct the output field types - if (hasAggregation()) - { - result_field_types = std::move(executor_output); - } - else - { - for (UInt32 i : dag_request.output_offsets()) - result_field_types.push_back(executor_output[i]); - } -} - } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQuerySource.h b/dbms/src/Flash/Coprocessor/DAGQuerySource.h index 9c424cee083..9e935fc7798 100644 --- a/dbms/src/Flash/Coprocessor/DAGQuerySource.h +++ b/dbms/src/Flash/Coprocessor/DAGQuerySource.h @@ -7,11 +7,13 @@ #include #include +#include #include #include #include #include + namespace DB { @@ -19,84 +21,53 @@ class Context; class TiFlashMetrics; using TiFlashMetricsPtr = std::shared_ptr; +struct StreamWriter +{ + ::grpc::ServerWriter<::coprocessor::BatchResponse> * writer; + std::mutex write_mutex; + + StreamWriter(::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_) : writer(writer_) {} + + void write(const ::coprocessor::BatchResponse & data) + { + std::lock_guard lk(write_mutex); + writer->Write(data); + } +}; + +using StreamWriterPtr = std::shared_ptr; + /// Query source of a DAG request via gRPC. /// This is also an IR of a DAG. 
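The StreamWriter above funnels every response stream through one mutex-guarded Write on the shared gRPC writer. A standalone sketch of that pattern, with FakeServerWriter standing in for the real grpc::ServerWriter (both names here are illustrative):

#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Stand-in for grpc::ServerWriter: a single, non-thread-safe sink.
struct FakeServerWriter
{
    void Write(const std::string & data) { std::cout << (data + "\n"); }
};

// Same shape as StreamWriter: every producer goes through one mutex, so chunks
// from concurrent streams are never interleaved inside a single Write call.
struct LockedWriter
{
    FakeServerWriter * writer;
    std::mutex write_mutex;

    explicit LockedWriter(FakeServerWriter * writer_) : writer(writer_) {}

    void write(const std::string & data)
    {
        std::lock_guard<std::mutex> lk(write_mutex);
        writer->Write(data);
    }
};

int main()
{
    FakeServerWriter grpc_writer;
    LockedWriter shared(&grpc_writer);

    std::vector<std::thread> producers;
    for (int i = 0; i < 4; ++i)
        producers.emplace_back([&shared, i] { shared.write("chunk from stream " + std::to_string(i)); });
    for (auto & t : producers)
        t.join();
}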
class DAGQuerySource : public IQuerySource { public: - static const String TS_NAME; - static const String SEL_NAME; - static const String AGG_NAME; - static const String TOPN_NAME; - static const String LIMIT_NAME; - - DAGQuerySource(Context & context_, DAGContext & dag_context_, const std::unordered_map & regions, - const tipb::DAGRequest & dag_request_, const bool is_batch_cop_ = false); + DAGQuerySource(Context & context_, DAGContext & dag_context_, const std::unordered_map & regions_, + const tipb::DAGRequest & dag_request_, ::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_ = nullptr, + const bool is_batch_cop_ = false); std::tuple parse(size_t max_query_size) override; String str(size_t max_query_size) override; std::unique_ptr interpreter(Context & context, QueryProcessingStage::Enum stage) override; - DAGContext & getDAGContext() const { return dag_context; }; - - bool hasSelection() const { return sel_index != -1; }; - bool hasAggregation() const { return agg_index != -1; }; - bool hasTopN() const { return order_index != -1; }; - bool hasLimit() const { return order_index == -1 && limit_index != -1; }; - - Int32 getTSIndex() const { return ts_index; }; - Int32 getSelectionIndex() const { return sel_index; }; - Int32 getAggregationIndex() const { return agg_index; }; - Int32 getTopNIndex() const { return order_index; }; - Int32 getLimitIndex() const { return limit_index; }; - - const tipb::TableScan & getTS() const - { - assertValid(ts_index, TS_NAME); - return dag_request.executors(ts_index).tbl_scan(); - }; - const tipb::Selection & getSelection() const - { - assertValid(sel_index, SEL_NAME); - return dag_request.executors(sel_index).selection(); - }; - const tipb::Aggregation & getAggregation() const - { - assertValid(agg_index, AGG_NAME); - return dag_request.executors(agg_index).aggregation(); - }; - const tipb::TopN & getTopN() const - { - assertValid(order_index, TOPN_NAME); - return dag_request.executors(order_index).topn(); - }; - const tipb::Limit & getLimit() const - { - assertValid(limit_index, LIMIT_NAME); - return dag_request.executors(limit_index).limit(); - }; const tipb::DAGRequest & getDAGRequest() const { return dag_request; }; - const std::vector & getResultFieldTypes() const { return result_field_types; }; + std::vector getResultFieldTypes() const { return result_field_types; }; ASTPtr getAST() const { return ast; }; tipb::EncodeType getEncodeType() const { return encode_type; } + std::shared_ptr getQueryBlock() const { return root_query_block; } const std::unordered_map & getRegions() const { return regions; } bool isBatchCop() const { return is_batch_cop; } -protected: - void assertValid(Int32 index, const String & name) const - { - if (index < 0 || index > dag_request.executors_size()) - { - throw Exception("Access invalid executor: " + name); - } - } + DAGContext & getDAGContext() const { return dag_context; } + + StreamWriterPtr writer; - void analyzeResultFieldTypes(); +protected: void analyzeDAGEncodeType(); protected: @@ -109,15 +80,9 @@ class DAGQuerySource : public IQuerySource TiFlashMetricsPtr metrics; - Int32 ts_index = -1; - Int32 sel_index = -1; - Int32 agg_index = -1; - Int32 order_index = -1; - Int32 limit_index = -1; - std::vector result_field_types; tipb::EncodeType encode_type; - + std::shared_ptr root_query_block; ASTPtr ast; const bool is_batch_cop; diff --git a/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp new file mode 100644 index 00000000000..d71f53b7a92 --- /dev/null +++ 
b/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp @@ -0,0 +1,170 @@ +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int UNSUPPORTED_PARAMETER; +extern const int LOGICAL_ERROR; +} // namespace ErrorCodes + +template +DAGResponseWriter::DAGResponseWriter(tipb::SelectResponse * dag_response_, StreamWriterPtr writer_, Int64 records_per_chunk_, + tipb::EncodeType encode_type_, std::vector result_field_types_, DAGContext & dag_context_, + bool collect_execute_summary_, bool return_executor_id_) + : dag_response(dag_response_), + writer(std::move(writer_)), + result_field_types(std::move(result_field_types_)), + records_per_chunk(records_per_chunk_), + encode_type(encode_type_), + current_records_num(0), + dag_context(dag_context_), + collect_execute_summary(collect_execute_summary_), + return_executor_id(return_executor_id_) +{ + previous_execute_stats.resize(dag_context.profile_streams_map.size(), std::make_tuple(0, 0, 0)); + if (encode_type == tipb::EncodeType::TypeDefault) + { + chunk_codec_stream = std::make_unique()->newCodecStream(result_field_types); + } + else if (encode_type == tipb::EncodeType::TypeChunk) + { + chunk_codec_stream = std::make_unique()->newCodecStream(result_field_types); + } + else if (encode_type == tipb::EncodeType::TypeCHBlock) + { + chunk_codec_stream = std::make_unique()->newCodecStream(result_field_types); + records_per_chunk = -1; + } + else + { + throw Exception("Only Default and Arrow encode type is supported in DAGBlockOutputStream.", ErrorCodes::UNSUPPORTED_PARAMETER); + } + if (dag_response) + dag_response->set_encode_type(encode_type); +} + +template +void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse * response) +{ + if (!collect_execute_summary) + return; + // add ExecutorExecutionSummary info + for (auto & p : dag_context.profile_streams_map) + { + auto * executeSummary = response->add_execution_summaries(); + UInt64 time_processed_ns = 0; + UInt64 num_produced_rows = 0; + UInt64 num_iterations = 0; + for (auto & streamPtr : p.second.input_streams) + { + if (auto * p_stream = dynamic_cast(streamPtr.get())) + { + time_processed_ns = std::max(time_processed_ns, p_stream->getProfileInfo().execution_time); + num_produced_rows += p_stream->getProfileInfo().rows; + num_iterations += p_stream->getProfileInfo().blocks; + } + } + for (auto & join_alias : dag_context.qb_id_to_join_alias_map[p.second.qb_id]) + { + if (dag_context.profile_streams_map_for_join_build_side.find(join_alias) + != dag_context.profile_streams_map_for_join_build_side.end()) + { + UInt64 process_time_for_build = 0; + for (auto & join_stream : dag_context.profile_streams_map_for_join_build_side[join_alias]) + { + if (auto * p_stream = dynamic_cast(join_stream.get())) + process_time_for_build = std::max(process_time_for_build, p_stream->getProfileInfo().execution_time); + } + time_processed_ns += process_time_for_build; + } + } + executeSummary->set_time_processed_ns(time_processed_ns); + executeSummary->set_num_produced_rows(num_produced_rows); + executeSummary->set_num_iterations(num_iterations); + if (return_executor_id) + executeSummary->set_executor_id(p.first); + } +} + +template <> +void DAGResponseWriter::encodeChunkToDAGResponse() +{ + auto dag_chunk = dag_response->add_chunks(); + dag_chunk->set_rows_data(chunk_codec_stream->getString()); + chunk_codec_stream->clear(); + current_records_num = 0; +} + +template <> +void DAGResponseWriter::encodeChunkToDAGResponse() +{ + ::coprocessor::BatchResponse resp; + + 
tipb::SelectResponse stream_dag_response; + stream_dag_response.set_encode_type(encode_type); + auto dag_chunk = stream_dag_response.add_chunks(); + dag_chunk->set_rows_data(chunk_codec_stream->getString()); + chunk_codec_stream->clear(); + current_records_num = 0; + addExecuteSummaries(&stream_dag_response); + std::string dag_data; + stream_dag_response.SerializeToString(&dag_data); + resp.set_data(dag_data); + + writer->write(resp); +} + +template +void DAGResponseWriter::finishWrite() +{ + if (current_records_num > 0) + { + encodeChunkToDAGResponse(); + } + if constexpr (!streaming) + { + addExecuteSummaries(dag_response); + } +} + +template +void DAGResponseWriter::write(const Block & block) +{ + if (block.columns() != result_field_types.size()) + throw Exception("Output column size mismatch with field type size", ErrorCodes::LOGICAL_ERROR); + if (records_per_chunk == -1) + { + current_records_num = 0; + if (block.rows() > 0) + { + chunk_codec_stream->encode(block, 0, block.rows()); + encodeChunkToDAGResponse(); + } + } + else + { + size_t rows = block.rows(); + for (size_t row_index = 0; row_index < rows;) + { + if (current_records_num >= records_per_chunk) + { + encodeChunkToDAGResponse(); + } + const size_t upper = std::min(row_index + (records_per_chunk - current_records_num), rows); + chunk_codec_stream->encode(block, row_index, upper); + current_records_num += (upper - row_index); + row_index = upper; + } + } +} + +template class DAGResponseWriter; +template class DAGResponseWriter; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGResponseWriter.h b/dbms/src/Flash/Coprocessor/DAGResponseWriter.h new file mode 100644 index 00000000000..30022aa368f --- /dev/null +++ b/dbms/src/Flash/Coprocessor/DAGResponseWriter.h @@ -0,0 +1,44 @@ +#pragma once + +#include +#include +#include +#include +#include +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#include +#pragma GCC diagnostic pop + +namespace DB +{ + +/// Serializes the stream of blocks in TiDB DAG response format. 
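A standalone sketch of the records_per_chunk slicing done by write() in the .cpp above, assuming plain row counts instead of Blocks (splitIntoChunks is an illustrative name, not part of this patch); the final flush of a partially filled chunk corresponds to finishWrite().

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Distils the write() loop: rows of each block are appended to the current
// chunk; whenever the chunk reaches records_per_chunk it is flushed, and the
// remaining rows of the block start filling the next chunk.
std::vector<int64_t> splitIntoChunks(const std::vector<int64_t> & block_rows, int64_t records_per_chunk)
{
    std::vector<int64_t> flushed_chunk_sizes;
    int64_t current_records_num = 0;
    for (int64_t rows : block_rows)
    {
        int64_t row_index = 0;
        while (row_index < rows)
        {
            if (current_records_num >= records_per_chunk)
            {
                flushed_chunk_sizes.push_back(current_records_num);
                current_records_num = 0;
            }
            int64_t upper = std::min(row_index + (records_per_chunk - current_records_num), rows);
            current_records_num += upper - row_index;
            row_index = upper;
        }
    }
    if (current_records_num > 0)
        flushed_chunk_sizes.push_back(current_records_num); // the finishWrite() flush
    return flushed_chunk_sizes;
}

int main()
{
    for (auto n : splitIntoChunks({5, 7, 3}, 6))
        std::cout << n << ' '; // 6 6 3
    std::cout << '\n';
}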
+template +class DAGResponseWriter +{ +public: + DAGResponseWriter(tipb::SelectResponse * response_, StreamWriterPtr writer_, Int64 records_per_chunk_, tipb::EncodeType encodeType_, + std::vector result_field_types, DAGContext & dag_context_, bool collect_execute_summary_, + bool return_executor_id_); + + void write(const Block & block); + void finishWrite(); + void encodeChunkToDAGResponse(); + void addExecuteSummaries(tipb::SelectResponse * dag_response); + +private: + tipb::SelectResponse * dag_response; + StreamWriterPtr writer; + std::vector result_field_types; + Int64 records_per_chunk; + tipb::EncodeType encode_type; + std::unique_ptr chunk_codec_stream; + Int64 current_records_num; + DAGContext & dag_context; + bool collect_execute_summary; + bool return_executor_id; + std::vector> previous_execute_stats; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGStringConverter.cpp b/dbms/src/Flash/Coprocessor/DAGStringConverter.cpp index 69e19ddd957..48c91964120 100644 --- a/dbms/src/Flash/Coprocessor/DAGStringConverter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStringConverter.cpp @@ -139,6 +139,7 @@ void DAGStringConverter::buildString(const tipb::Executor & executor, std::strin { case tipb::ExecType::TypeTableScan: return buildTSString(executor.tbl_scan(), ss); + case tipb::ExecType::TypeJoin: case tipb::ExecType::TypeIndexScan: // index scan not supported throw Exception("IndexScan is not supported", ErrorCodes::NOT_IMPLEMENTED); diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index 591a2647bdf..322789ae82c 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -1,39 +1,15 @@ -#include -#include #include -#include -#include -#include -#include -#include -#include -#include -#include +#include #include -#include -#include +#include #include #include -#include #include +#include #include -#include -#include -#include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include -#include namespace DB { @@ -54,678 +30,75 @@ InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_) dag.getEncodeType() == tipb::EncodeType::TypeChunk || dag.getEncodeType() == tipb::EncodeType::TypeCHBlock), log(&Logger::get("InterpreterDAG")) { - if (dag.hasSelection()) - { - for (auto & condition : dag.getSelection().conditions()) - conditions.push_back(&condition); - } -} - -// the flow is the same as executeFetchcolumns -void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) -{ - auto & tmt = context.getTMTContext(); - if (!ts.has_table_id()) - { - // do not have table id - throw Exception("Table id not specified in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST); - } - TableID table_id = ts.table_id(); - const Settings & settings = context.getSettingsRef(); - - if (settings.schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION) - { - storage = tmt.getStorages().get(table_id); - if (storage == nullptr) - { - throw Exception("Table " + std::to_string(table_id) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE); - } - table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); - } - else - { - getAndLockStorageWithSchemaVersion(table_id, settings.schema_version); - } - - - Names required_columns; - std::vector source_columns; - std::vector is_ts_column; - String handle_column_name = MutableSupport::tidb_pk_column_name; - if (auto pk_handle_col = 
storage->getTableInfo().getPKHandleColumn()) - handle_column_name = pk_handle_col->get().name; - - - for (Int32 i = 0; i < ts.columns().size(); i++) - { - auto const & ci = ts.columns(i); - ColumnID cid = ci.column_id(); - - if (cid == DB::TiDBPkColumnID) - { - // Column ID -1 return the handle column - required_columns.push_back(handle_column_name); - auto pair = storage->getColumns().getPhysical(handle_column_name); - source_columns.emplace_back(std::move(pair)); - is_ts_column.push_back(false); - continue; - } - - String name = storage->getTableInfo().getColumnName(cid); - required_columns.push_back(name); - auto pair = storage->getColumns().getPhysical(name); - source_columns.emplace_back(std::move(pair)); - is_ts_column.push_back(ci.tp() == TiDB::TypeTimestamp); - } - - analyzer = std::make_unique(std::move(source_columns), context); - - if (!dag.hasAggregation()) - { - // if the dag request does not contain agg, then the final output is - // based on the output of table scan - for (auto i : dag.getDAGRequest().output_offsets()) - { - if (i >= required_columns.size()) - { - // array index out of bound - throw Exception("Output offset index is out of bound", ErrorCodes::COP_BAD_DAG_REQUEST); - } - // do not have alias - final_project.emplace_back(required_columns[i], ""); - } - } - // todo handle alias column - if (settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read) - { - throw Exception("Limit for number of columns to read exceeded. " - "Requested: " - + toString(required_columns.size()) + ", maximum: " + settings.max_columns_to_read.toString(), - ErrorCodes::TOO_MANY_COLUMNS); - } - - size_t max_block_size = settings.max_block_size; max_streams = settings.max_threads; - QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; if (max_streams > 1) { max_streams *= settings.max_streams_to_max_threads_ratio; } - - if (dag.hasSelection()) - { - for (auto & condition : dag.getSelection().conditions()) - { - analyzer->makeExplicitSetForIndex(condition, storage); - } - } - SelectQueryInfo query_info; - // set query to avoid unexpected NPE - query_info.query = dag.getAST(); - query_info.dag_query = std::make_unique(conditions, analyzer->getPreparedSets(), analyzer->getCurrentInputColumns()); - query_info.mvcc_query_info = std::make_unique(); - query_info.mvcc_query_info->resolve_locks = true; - query_info.mvcc_query_info->read_tso = settings.read_tso; - if (dag.getRegions().empty()) - { - throw Exception("Dag Request does not have region to read. 
", ErrorCodes::COP_BAD_DAG_REQUEST); - } - - if (!dag.isBatchCop()) - { - if (auto [info_retry, status] = MakeRegionQueryInfos(dag.getRegions(), {}, tmt, *query_info.mvcc_query_info, table_id); info_retry) - throw RegionException({(*info_retry).begin()->first}, status); - - try - { - pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams); - } - catch (DB::Exception & e) - { - e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName() - + "`, table_id: " + DB::toString(table_id) + ")"); - throw; - } - } - else - { - std::unordered_map region_retry; - std::unordered_set force_retry; - for (;;) - { - try - { - region_retry.clear(); - auto [retry, status] = MakeRegionQueryInfos(dag.getRegions(), force_retry, tmt, *query_info.mvcc_query_info, table_id); - std::ignore = status; - if (retry) - { - region_retry = std::move(*retry); - for (auto & r : region_retry) - force_retry.emplace(r.first); - } - if (query_info.mvcc_query_info->regions_query_info.empty()) - break; - pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams); - break; - } - catch (const LockException & e) - { - // We can also use current thread to resolve lock, but it will block next process. - // So, force this region retry in another thread in CoprocessorBlockInputStream. - force_retry.emplace(e.region_id); - } - catch (const RegionException & e) - { - if (tmt.getTerminated()) - throw Exception("TiFlash server is terminating", ErrorCodes::LOGICAL_ERROR); - // By now, RegionException will contain all region id of MvccQueryInfo, which is needed by CHSpark. - // When meeting RegionException, we can let MakeRegionQueryInfos to check in next loop. 
-            }
-            catch (DB::Exception & e)
-            {
-                e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName()
-                    + "`, table_id: " + DB::toString(table_id) + ")");
-                throw;
-            }
-        }
-
-        if (region_retry.size())
-        {
-            LOG_DEBUG(log, ({
-                std::stringstream ss;
-                ss << "Start to retry " << region_retry.size() << " regions (";
-                for (auto & r : region_retry)
-                    ss << r.first << ",";
-                ss << ")";
-                ss.str();
-            }));
-
-            DAGSchema schema;
-            ::tipb::DAGRequest dag_req;
-
-            {
-                const auto & table_info = storage->getTableInfo();
-                tipb::Executor * ts_exec = dag_req.add_executors();
-                ts_exec->set_tp(tipb::ExecType::TypeTableScan);
-                *(ts_exec->mutable_tbl_scan()) = ts;
-
-                for (int i = 0; i < ts.columns().size(); ++i)
-                {
-                    const auto & col = ts.columns(i);
-                    auto col_id = col.column_id();
-
-                    if (col_id == DB::TiDBPkColumnID)
-                    {
-                        ColumnInfo ci;
-                        ci.tp = TiDB::TypeLongLong;
-                        ci.setPriKeyFlag();
-                        ci.setNotNullFlag();
-                        schema.emplace_back(std::make_pair(handle_column_name, std::move(ci)));
-                    }
-                    else
-                    {
-                        auto & col_info = table_info.getColumnInfo(col_id);
-                        schema.emplace_back(std::make_pair(col_info.name, col_info));
-                    }
-                    dag_req.add_output_offsets(i);
-                }
-                dag_req.set_encode_type(tipb::EncodeType::TypeCHBlock);
-            }
-
-            std::vector ranges;
-            for (auto & info : region_retry)
-            {
-                for (auto & range : info.second.key_ranges)
-                    ranges.emplace_back(range.first, range.second);
-            }
-
-            pingcap::coprocessor::Request req{.tp = pingcap::coprocessor::ReqType::DAG,
-                .start_ts = settings.read_tso,
-                .data = dag_req.SerializeAsString(),
-                .ranges = std::move(ranges),
-                .schema_version = settings.schema_version};
-            pingcap::kv::StoreType store_type = pingcap::kv::TiFlash;
-            BlockInputStreamPtr input
-                = std::make_shared(tmt.getCluster().get(), req, schema, max_streams, store_type);
-            pipeline.streams.emplace_back(input);
-        }
-    }
-
-    if (pipeline.streams.empty())
-    {
-        pipeline.streams.emplace_back(std::make_shared(storage->getSampleBlockForColumns(required_columns)));
-    }
-
-    pipeline.transform([&](auto & stream) { stream->addTableLock(table_lock); });
-
-    /// Set the limits and quota for reading data, the speed and time of the query.
-    {
-        IProfilingBlockInputStream::LocalLimits limits;
-        limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL;
-        limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode);
-        limits.max_execution_time = settings.max_execution_time;
-        limits.timeout_overflow_mode = settings.timeout_overflow_mode;
-
-        /** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers,
-          * because the initiating server has a summary of the execution of the request on all servers.
-          *
-          * But limits on data size to read and maximum execution time are reasonable to check both on initiator and
-          * additionally on each remote server, because these limits are checked per block of data processed,
-          * and remote servers may process way more blocks of data than are received by initiator.
-          */
-        limits.min_execution_speed = settings.min_execution_speed;
-        limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed;
-
-        QuotaForIntervals & quota = context.getQuota();
-
-        pipeline.transform([&](auto & stream) {
-            if (IProfilingBlockInputStream * p_stream = dynamic_cast<IProfilingBlockInputStream *>(stream.get()))
-            {
-                p_stream->setLimits(limits);
-                p_stream->setQuota(quota);
-            }
-        });
-    }
-
-    if (addTimeZoneCastAfterTS(is_ts_column, pipeline))
-    {
-        // for arrow encode, the final select of timestamp column should be column with session timezone
-        if (keep_session_timezone_info && !dag.hasAggregation())
-        {
-            for (auto i : dag.getDAGRequest().output_offsets())
-            {
-                if (is_ts_column[i])
-                {
-                    final_project[i].first = analyzer->getCurrentInputColumns()[i].name;
-                }
-            }
-        }
-    }
-}
-
-// add timezone cast for timestamp type, this is used to support session level timezone
-bool InterpreterDAG::addTimeZoneCastAfterTS(std::vector & is_ts_column, Pipeline & pipeline)
-{
-    bool hasTSColumn = false;
-    for (auto b : is_ts_column)
-        hasTSColumn |= b;
-    if (!hasTSColumn)
-        return false;
-
-    ExpressionActionsChain chain;
-    if (analyzer->appendTimeZoneCastsAfterTS(chain, is_ts_column))
-    {
-        pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, chain.getLastActions()); });
-        return true;
-    }
-    else
-        return false;
-}
-
-InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
-{
-    AnalysisResult res;
-    ExpressionActionsChain chain;
-    if (!conditions.empty())
-    {
-        analyzer->appendWhere(chain, conditions, res.filter_column_name);
-        res.has_where = true;
-        res.before_where = chain.getLastActions();
-        chain.addStep();
-    }
-    // There will be either Agg...
-    if (dag.hasAggregation())
-    {
-        analyzer->appendAggregation(chain, dag.getAggregation(), res.aggregation_keys, res.aggregate_descriptions);
-        res.need_aggregate = true;
-        res.before_aggregation = chain.getLastActions();
-
-        chain.finalize();
-        chain.clear();
-
-        // add cast if type is not match
-        analyzer->appendAggSelect(chain, dag.getAggregation(), keep_session_timezone_info);
-        //todo use output_offset to reconstruct the final project columns
-        for (auto & element : analyzer->getCurrentInputColumns())
-        {
-            final_project.emplace_back(element.name, "");
-        }
-    }
-    // Or TopN, not both.
-    if (dag.hasTopN())
-    {
-        res.has_order_by = true;
-        analyzer->appendOrderBy(chain, dag.getTopN(), res.order_columns);
-    }
-    // Append final project results if needed.
-    analyzer->appendFinalProject(chain, final_project);
-    res.before_order_and_select = chain.getLastActions();
-    chain.finalize();
-    chain.clear();
-    //todo need call prependProjectInput??
-    return res;
-}
-
-void InterpreterDAG::executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expr, String & filter_column)
-{
-    pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, expr, filter_column); });
 }
-void InterpreterDAG::executeAggregation(
-    Pipeline & pipeline, const ExpressionActionsPtr & expr, Names & key_names, AggregateDescriptions & aggregates)
+void InterpreterDAG::executeUnion(Pipeline & pipeline)
 {
-    pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, expr); });
-
-    Block header = pipeline.firstStream()->getHeader();
-    ColumnNumbers keys;
-    for (const auto & name : key_names)
-    {
-        keys.push_back(header.getPositionByName(name));
-    }
-    for (auto & descr : aggregates)
-    {
-        if (descr.arguments.empty())
-        {
-            for (const auto & name : descr.argument_names)
-            {
-                descr.arguments.push_back(header.getPositionByName(name));
-            }
-        }
-    }
-
-    const Settings & settings = context.getSettingsRef();
-
-    /** Two-level aggregation is useful in two cases:
-      * 1. Parallel aggregation is done, and the results should be merged in parallel.
-      * 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way.
-      */
-    bool allow_to_use_two_level_group_by = pipeline.streams.size() > 1 || settings.max_bytes_before_external_group_by != 0;
-
-    Aggregator::Params params(header, keys, aggregates, false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
-        settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile,
-        allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
-        allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
-        settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set, context.getTemporaryPath());
-
-    /// If there are several sources, then we perform parallel aggregation
-    if (pipeline.streams.size() > 1)
+    if (pipeline.hasMoreThanOneStream())
     {
-        pipeline.firstStream() = std::make_shared(pipeline.streams, nullptr, params, true, max_streams,
-            settings.aggregation_memory_efficient_merge_threads ? static_cast(settings.aggregation_memory_efficient_merge_threads)
-                                                                : static_cast(settings.max_threads));
-
+        pipeline.firstStream() = std::make_shared>(pipeline.streams, nullptr, max_streams);
         pipeline.streams.resize(1);
     }
-    else
-    {
-        pipeline.firstStream() = std::make_shared(pipeline.firstStream(), params, true);
-    }
-    // add cast
 }
-void InterpreterDAG::executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr)
+BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block, std::vector & subqueriesForSets)
 {
-    if (!expressionActionsPtr->getActions().empty())
+    if (!query_block.children.empty())
     {
-        pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, expressionActionsPtr); });
-    }
-}
-
-void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64 query_schema_version)
-{
-    /// Get current schema version in schema syncer for a chance to shortcut.
-    auto global_schema_version = context.getTMTContext().getSchemaSyncer()->getCurrentVersion();
-
-    /// Lambda for get storage, then align schema version under the read lock.
-    auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple {
-        /// Get storage in case it's dropped then re-created.
-        // If schema synced, call getTable without try, leading to exception on table not existing.
-        auto storage_ = context.getTMTContext().getStorages().get(table_id);
-        if (!storage_)
+        std::vector input_streams_vec;
+        for (auto & child : query_block.children)
         {
-            if (schema_synced)
-                throw Exception("Table " + std::to_string(table_id) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
-            else
-                return std::make_tuple(nullptr, nullptr, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, false);
-        }
-
-        if (storage_->engineType() != ::TiDB::StorageEngine::TMT && storage_->engineType() != ::TiDB::StorageEngine::DT)
-        {
-            throw Exception("Specifying schema_version for non-managed storage: " + storage_->getName()
-                    + ", table: " + storage_->getTableName() + ", id: " + DB::toString(table_id) + " is not allowed",
-                ErrorCodes::LOGICAL_ERROR);
-        }
-
-        /// Lock storage.
-        auto lock = storage_->lockStructure(false, __PRETTY_FUNCTION__);
-
-        /// Check schema version, requiring TiDB/TiSpark and TiFlash both use exactly the same schema.
-        // We have three schema versions, two in TiFlash:
-        // 1. Storage: the version that this TiFlash table (storage) was last altered.
-        // 2. Global: the version that TiFlash global schema is at.
-        // And one from TiDB/TiSpark:
-        // 3. Query: the version that TiDB/TiSpark used for this query.
-        auto storage_schema_version = storage_->getTableInfo().schema_version;
-        // Not allow storage > query in any case, one example is time travel queries.
-        if (storage_schema_version > query_schema_version)
-            throw Exception("Table " + std::to_string(table_id) + " schema version " + std::to_string(storage_schema_version)
-                    + " newer than query schema version " + std::to_string(query_schema_version),
-                ErrorCodes::SCHEMA_VERSION_ERROR);
-        // From now on we have storage <= query.
-        // If schema was synced, it implies that global >= query, as mentioned above we have storage <= query, we are OK to serve.
-        if (schema_synced)
-            return std::make_tuple(storage_, lock, storage_schema_version, true);
-        // From now on the schema was not synced.
-        // 1. storage == query, TiDB/TiSpark is using exactly the same schema that altered this table, we are just OK to serve.
-        // 2. global >= query, TiDB/TiSpark is using a schema older than TiFlash global, but as mentioned above we have storage <= query,
-        // meaning that the query schema is still newer than the time when this table was last altered, so we still OK to serve.
-        if (storage_schema_version == query_schema_version || global_schema_version >= query_schema_version)
-            return std::make_tuple(storage_, lock, storage_schema_version, true);
-        // From now on we have global < query.
-        // Return false for outer to sync and retry.
-        return std::make_tuple(nullptr, nullptr, storage_schema_version, false);
-    };
-
-    /// Try get storage and lock once.
-    ManageableStoragePtr storage_;
-    TableStructureReadLockPtr lock;
-    Int64 storage_schema_version;
-    auto log_schema_version = [&](const String & result) {
-        LOG_DEBUG(log,
-            __PRETTY_FUNCTION__ << " Table " << table_id << " schema " << result << " Schema version [storage, global, query]: "
-                                << "[" << storage_schema_version << ", " << global_schema_version << ", " << query_schema_version << "].");
-    };
-    bool ok;
-    {
-        std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(false);
-        if (ok)
-        {
-            log_schema_version("OK, no syncing required.");
-            storage = storage_;
-            table_lock = lock;
-            return;
+            BlockInputStreams child_streams = executeQueryBlock(*child, subqueriesForSets);
+            input_streams_vec.push_back(child_streams);
         }
+        DAGQueryBlockInterpreter query_block_interpreter(
+            context, input_streams_vec, query_block, keep_session_timezone_info, dag.getDAGRequest(), dag.getAST(), dag, subqueriesForSets);
+        return query_block_interpreter.execute();
     }
-
-    /// If first try failed, sync schema and try again.
+    else
     {
-        log_schema_version("not OK, syncing schemas.");
-        auto start_time = Clock::now();
-        context.getTMTContext().getSchemaSyncer()->syncSchemas(context);
-        auto schema_sync_cost = std::chrono::duration_cast(Clock::now() - start_time).count();
-        LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << table_id << " schema sync cost " << schema_sync_cost << "ms.");
-
-        std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(true);
-        if (ok)
-        {
-            log_schema_version("OK after syncing.");
-            storage = storage_;
-            table_lock = lock;
-            return;
-        }
-
-        throw Exception("Shouldn't reach here", ErrorCodes::UNKNOWN_EXCEPTION);
+        DAGQueryBlockInterpreter query_block_interpreter(
+            context, {}, query_block, keep_session_timezone_info, dag.getDAGRequest(), dag.getAST(), dag, subqueriesForSets);
+        return query_block_interpreter.execute();
     }
 }
-SortDescription InterpreterDAG::getSortDescription(std::vector & order_columns)
+BlockIO InterpreterDAG::execute()
 {
-    // construct SortDescription
-    SortDescription order_descr;
-    const tipb::TopN & topn = dag.getTopN();
-    order_descr.reserve(topn.order_by_size());
-    for (int i = 0; i < topn.order_by_size(); i++)
-    {
-        String name = order_columns[i].name;
-        int direction = topn.order_by(i).desc() ? -1 : 1;
-        // MySQL/TiDB treats NULL as "minimum".
-        int nulls_direction = -1;
-        std::shared_ptr collator = nullptr;
-        if (removeNullable(order_columns[i].type)->isString())
-            collator = getCollatorFromExpr(topn.order_by(i).expr());
+    /// region_info should based on the source executor, however
+    /// tidb does not support multi-table dag request yet, so
+    /// it is ok to use the same region_info for the whole dag request
+    std::vector subqueriesForSets;
+    BlockInputStreams streams = executeQueryBlock(*dag.getQueryBlock(), subqueriesForSets);
-        order_descr.emplace_back(name, direction, nulls_direction, collator);
-    }
-    return order_descr;
-}
+    Pipeline pipeline;
+    pipeline.streams = streams;
-void InterpreterDAG::executeUnion(Pipeline & pipeline)
-{
-    if (pipeline.hasMoreThanOneStream())
+    if (dag.writer->writer != nullptr)
     {
-        pipeline.firstStream() = std::make_shared>(pipeline.streams, nullptr, max_streams);
-        pipeline.streams.resize(1);
+        bool collect_exec_summary
+            = dag.getDAGRequest().has_collect_execution_summaries() && dag.getDAGRequest().collect_execution_summaries();
+        for (auto & stream : pipeline.streams)
+            stream = std::make_shared(stream, dag.writer, context.getSettings().dag_records_per_chunk,
+                dag.getEncodeType(), dag.getResultFieldTypes(), stream->getHeader(), dag.getDAGContext(), collect_exec_summary,
+                dag.getDAGRequest().has_root_executor());
     }
-}
-
-void InterpreterDAG::executeOrder(Pipeline & pipeline, std::vector & order_columns)
-{
-    SortDescription order_descr = getSortDescription(order_columns);
-    const Settings & settings = context.getSettingsRef();
-    Int64 limit = dag.getTopN().limit();
-
-    pipeline.transform([&](auto & stream) {
-        auto sorting_stream = std::make_shared(stream, order_descr, limit);
-
-        /// Limits on sorting
-        IProfilingBlockInputStream::LocalLimits limits;
-        limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL;
-        limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
-        sorting_stream->setLimits(limits);
-
-        stream = sorting_stream;
-    });
-
-    /// If there are several streams, we merge them into one
-    executeUnion(pipeline);
-
-    /// Merge the sorted blocks.
-    pipeline.firstStream() = std::make_shared(pipeline.firstStream(), order_descr, settings.max_block_size,
-        limit, settings.max_bytes_before_external_sort, context.getTemporaryPath());
-}
-
-void InterpreterDAG::recordProfileStreams(Pipeline & pipeline, Int32 index)
-{
-    for (auto & stream : pipeline.streams)
-    {
-        dag.getDAGContext().profile_streams_list[index].push_back(stream);
-    }
-}
-
-void InterpreterDAG::executeImpl(Pipeline & pipeline)
-{
-    executeTS(dag.getTS(), pipeline);
-    recordProfileStreams(pipeline, dag.getTSIndex());
-
-    auto res = analyzeExpressions();
-    // execute selection
-    if (res.has_where)
-    {
-        executeWhere(pipeline, res.before_where, res.filter_column_name);
-        if (dag.hasSelection())
-            recordProfileStreams(pipeline, dag.getSelectionIndex());
-    }
-    if (res.need_aggregate)
-    {
-        // execute aggregation
-        executeAggregation(pipeline, res.before_aggregation, res.aggregation_keys, res.aggregate_descriptions);
-        recordProfileStreams(pipeline, dag.getAggregationIndex());
-    }
-    if (res.before_order_and_select)
+    if (!subqueriesForSets.empty())
     {
-        executeExpression(pipeline, res.before_order_and_select);
+        const Settings & settings = context.getSettingsRef();
+        pipeline.firstStream() = std::make_shared(pipeline.firstStream(), std::move(subqueriesForSets),
+            SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode));
     }
-    if (res.has_order_by)
-    {
-        // execute topN
-        executeOrder(pipeline, res.order_columns);
-        recordProfileStreams(pipeline, dag.getTopNIndex());
-    }
-
-    // execute projection
-    executeFinalProject(pipeline);
-
-    // execute limit
-    if (dag.hasLimit() && !dag.hasTopN())
-    {
-        executeLimit(pipeline);
-        recordProfileStreams(pipeline, dag.getLimitIndex());
-    }
-}
-
-void InterpreterDAG::executeFinalProject(Pipeline & pipeline)
-{
-    auto columns = pipeline.firstStream()->getHeader();
-    NamesAndTypesList input_column;
-    for (auto & column : columns.getColumnsWithTypeAndName())
-    {
-        input_column.emplace_back(column.name, column.type);
-    }
-    ExpressionActionsPtr project = std::make_shared(input_column, context.getSettingsRef());
-    project->add(ExpressionAction::project(final_project));
-    // add final project
-    pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, project); });
-}
-
-void InterpreterDAG::executeLimit(Pipeline & pipeline)
-{
-    pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, dag.getLimit().limit(), 0, false); });
-    if (pipeline.hasMoreThanOneStream())
-    {
-        executeUnion(pipeline);
-        pipeline.transform(
-            [&](auto & stream) { stream = std::make_shared(stream, dag.getLimit().limit(), 0, false); });
-    }
-}
-
-BlockIO InterpreterDAG::execute()
-{
-    Pipeline pipeline;
-    executeImpl(pipeline);
-    executeUnion(pipeline);
-    BlockIO res;
     res.in = pipeline.firstStream();
-
-    LOG_DEBUG(
-        log, __PRETTY_FUNCTION__ << " Convert DAG request to BlockIO, adding " << analyzer->getImplicitCastCount() << " implicit cast");
-    if (log->debug())
-    {
-        try
-        {
-            DAGStringConverter converter(context, dag.getDAGRequest());
-            auto sql_text = converter.buildSqlString();
-            LOG_DEBUG(log, __PRETTY_FUNCTION__ << " SQL in DAG request is " << sql_text);
-        }
-        catch (...)
-        {
-            // catch all the exceptions so the convert error will not affect the query execution
-            LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Failed to convert DAG request to sql text");
-        }
-    }
     return res;
 }
 
 } // namespace DB
diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.h b/dbms/src/Flash/Coprocessor/InterpreterDAG.h
index c58f60bb3a9..bf175b98c6a 100644
--- a/dbms/src/Flash/Coprocessor/InterpreterDAG.h
+++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.h
@@ -8,6 +8,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -36,65 +37,15 @@ class InterpreterDAG : public IInterpreter
     BlockIO execute();
 
 private:
-    struct Pipeline
-    {
-        BlockInputStreams streams;
-
-        BlockInputStreamPtr & firstStream() { return streams.at(0); }
-
-        template
-        void transform(Transform && transform)
-        {
-            for (auto & stream : streams)
-                transform(stream);
-        }
-
-        bool hasMoreThanOneStream() const { return streams.size() > 1; }
-    };
-
-    struct AnalysisResult
-    {
-        bool has_where = false;
-        bool need_aggregate = false;
-        bool has_order_by = false;
-
-        ExpressionActionsPtr before_where;
-        ExpressionActionsPtr before_aggregation;
-        ExpressionActionsPtr before_order_and_select;
-        ExpressionActionsPtr final_projection;
-
-        String filter_column_name;
-        std::vector order_columns;
-        /// Columns from the SELECT list, before renaming them to aliases.
-        Names selected_columns;
-
-        Names aggregation_keys;
-        AggregateDescriptions aggregate_descriptions;
-    };
-
-    void executeImpl(Pipeline & pipeline);
-    void executeTS(const tipb::TableScan & ts, Pipeline & pipeline);
-    void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column);
-    void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr);
-    void executeOrder(Pipeline & pipeline, std::vector & order_columns);
+    BlockInputStreams executeQueryBlock(DAGQueryBlock & query_block, std::vector & subqueriesForSets);
     void executeUnion(Pipeline & pipeline);
-    void executeLimit(Pipeline & pipeline);
-    void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, Names & aggregation_keys,
-        AggregateDescriptions & aggregate_descriptions);
-    void executeFinalProject(Pipeline & pipeline);
-    void getAndLockStorageWithSchemaVersion(TableID table_id, Int64 schema_version);
-    SortDescription getSortDescription(std::vector & order_columns);
-    AnalysisResult analyzeExpressions();
-    void recordProfileStreams(Pipeline & pipeline, Int32 index);
-    bool addTimeZoneCastAfterTS(std::vector & is_ts_column, Pipeline & pipeline);
+    RegionException::RegionReadStatus getRegionReadStatus(const RegionPtr & current_region);
 
 private:
     Context & context;
     const DAGQuerySource & dag;
-    NamesWithAliases final_project;
-    /// How many streams we ask for storage to produce, and in how many threads we will do further processing.
     size_t max_streams = 1;
@@ -106,8 +57,6 @@ class InterpreterDAG : public IInterpreter
     const bool keep_session_timezone_info;
 
-    std::vector conditions;
-
     Poco::Logger * log;
 };
 } // namespace DB
diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGBlockInputStream.cpp b/dbms/src/Flash/Coprocessor/StreamingDAGBlockInputStream.cpp
new file mode 100644
index 00000000000..71710bec07c
--- /dev/null
+++ b/dbms/src/Flash/Coprocessor/StreamingDAGBlockInputStream.cpp
@@ -0,0 +1,45 @@
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+StreamingDAGBlockInputStream::StreamingDAGBlockInputStream(BlockInputStreamPtr input, StreamWriterPtr writer, Int64 records_per_chunk,
+    tipb::EncodeType encode_type, std::vector && result_field_types, Block && header_, DAGContext & dag_context,
+    bool collect_execute_summary, bool return_executor_id)
+    : finished(false),
+      header(std::move(header_)),
+      response_writer(
+          nullptr, writer, records_per_chunk, encode_type, result_field_types, dag_context, collect_execute_summary, return_executor_id)
+{
+    children.push_back(input);
+}
+
+void StreamingDAGBlockInputStream::readPrefix() { children.back()->readPrefix(); }
+
+void StreamingDAGBlockInputStream::readSuffix()
+{
+    // todo error handle
+    response_writer.finishWrite();
+    children.back()->readSuffix();
+}
+
+Block StreamingDAGBlockInputStream::readImpl()
+{
+    if (finished)
+        return {};
+    while (Block block = children.back()->read())
+    {
+        if (!block)
+        {
+            finished = true;
+            return {};
+        }
+        response_writer.write(block);
+    }
+    return {};
+}
+
+} // namespace DB
diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGBlockInputStream.h b/dbms/src/Flash/Coprocessor/StreamingDAGBlockInputStream.h
new file mode 100644
index 00000000000..b893fd8cdcd
--- /dev/null
+++ b/dbms/src/Flash/Coprocessor/StreamingDAGBlockInputStream.h
@@ -0,0 +1,39 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wunused-parameter"
+#include
+#include
+
+#pragma GCC diagnostic pop
+
+namespace DB
+{
+
+/// Serializes the stream of blocks in TiDB DAG response format.
+/// TODO: May consider using some parallelism.
+/// TODO: Consider using output schema in DAG request, do some conversion or checking between DAG schema and block schema.
+class StreamingDAGBlockInputStream : public IProfilingBlockInputStream
+{
+public:
+    StreamingDAGBlockInputStream(BlockInputStreamPtr input_, StreamWriterPtr writer_, Int64 records_per_chunk_,
+        tipb::EncodeType encodeType_, std::vector && result_field_types, Block && header_, DAGContext & dag_context_,
+        bool collect_execute_summary_, bool return_executor_id);
+
+    Block getHeader() const override { return header; }
+    Block readImpl() override;
+    String getName() const override { return "StreamingWriter"; }
+    void readPrefix() override;
+    void readSuffix() override;
+
+private:
+    bool finished;
+    Block header;
+    DAGResponseWriter response_writer;
+};
+
+} // namespace DB
diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h
index 014019fdcea..7223aa95d00 100644
--- a/dbms/src/Storages/IManageableStorage.h
+++ b/dbms/src/Storages/IManageableStorage.h
@@ -7,7 +7,6 @@
 #include
 #include
 #include
-
 namespace TiDB
 {
 struct TableInfo;
diff --git a/dbms/src/Storages/Transaction/TMTContext.h b/dbms/src/Storages/Transaction/TMTContext.h
index b7522a3261e..a631eaf1393 100644
--- a/dbms/src/Storages/Transaction/TMTContext.h
+++ b/dbms/src/Storages/Transaction/TMTContext.h
@@ -50,6 +50,8 @@ class TMTContext : private boost::noncopyable
     pingcap::pd::ClientPtr getPDClient() const;
 
+    pingcap::kv::Cluster * getKVCluster() {return cluster.get();}
+
     IndexReaderPtr createIndexReader() const;
 
     void restore();
diff --git a/tests/fullstack-test/ddl/alter_datetime_default_value.test b/tests/fullstack-test/ddl/alter_datetime_default_value.test
index 3d5d94e9082..af945b57520 100644
--- a/tests/fullstack-test/ddl/alter_datetime_default_value.test
+++ b/tests/fullstack-test/ddl/alter_datetime_default_value.test
@@ -87,4 +87,3 @@ mysql> set session tidb_isolation_read_engines='tiflash'; select * from test.t;
 +------+--------+--------+--------+--------+--------+--------+
 | 1    | 1901   | 2155   | 0000   | 2001   | 1970   | 2000   |
 +------+--------+--------+--------+--------+--------+--------+
-