Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature Branch: merge master to planner_refactory branch #5353

Merged
merged 105 commits into from
Jul 12, 2022
Merged
Changes from 1 commit
Commits
Show all changes
105 commits
Select commit Hold shift + click to select a range
973de13
Refine function test framework (#4861)
windtalker May 26, 2022
f10b6d2
Add mutex to protect exchange receiver's async client (#5008)
yibin87 May 26, 2022
d84f273
Optimize blobstore gc when current stat total size is 0 (#5015)
jiaqizho May 27, 2022
ca3e1c6
Fix an invalid default value cause bootstrap failed (#4916)
Lloyd-Pottiger May 27, 2022
d1ecebd
PageStorage: Report the file usage of BlobStore (#4999)
JaySon-Huang May 27, 2022
7dff463
Fix unstable ut in the master branch (#5024)
hehechen May 27, 2022
c8624a0
refine get type from decimal literal (#5014)
windtalker May 30, 2022
516fa15
Refactor: add new module schema (#4932)
hzh0425 May 30, 2022
335b124
docs: Add rfc for compact table via sql (#4929)
breezewish May 30, 2022
580821a
Fix broken unit test for `--gtest_filter='*StorageDeltaMergeTest*:*Re…
JaySon-Huang May 30, 2022
6afdd74
Remove useless code (#5004)
hongyunyan May 31, 2022
a0ed1a6
Add mix mode UT (#5012)
hehechen May 31, 2022
800715c
support mix_mode in dt_workload (#5011)
lidezhu Jun 1, 2022
380f6cd
enable new function test framework by default (#5029)
windtalker Jun 1, 2022
187a591
Tiflash pagectl support encrypted data (#5003)
hehechen Jun 1, 2022
226dff6
update tiflash proxy (#5043)
Lloyd-Pottiger Jun 2, 2022
2ce9529
Fix potential data inconsistency under heavy ddl operation (#5044)
lidezhu Jun 2, 2022
20d2198
*: fix invalid fmt format string in CreatingSetsBlockInputStream.cpp …
ywqzzy Jun 2, 2022
677ad75
increase bg gc check interval (#5056)
lidezhu Jun 3, 2022
b49a787
PageStorage: Fix pages are not deleted under some cases (#5069)
JaySon-Huang Jun 3, 2022
a66c082
Fix unstable drop table unit test (#5059)
lidezhu Jun 3, 2022
e3a4412
Fix broken ut SegmentDeletionRelevantPlaceTest (#4607)
JaySon-Huang Jun 6, 2022
5847f1c
Add `Precompiled header` for modules & Refine by PImpl to accelerate …
solotzg Jun 8, 2022
fdab3f5
Test: Mock Input columns for operator tests (#5041)
ywqzzy Jun 8, 2022
5b61ae7
Improve the performance of partition table in extreme case (#4988)
bestwoody Jun 8, 2022
167d39f
Fix some fail cases when enable TASN (#5086)
jiaqizho Jun 9, 2022
ba725cc
PageStorage: Fix entry.tag after full gc && add more debug message (#…
JaySon-Huang Jun 9, 2022
a9b322a
feat: add page_stress_testing as a subcommand of tiflash (#5038)
Lloyd-Pottiger Jun 10, 2022
1e3207d
feat: add hardware information of server as an attribute (#5090)
Lloyd-Pottiger Jun 10, 2022
8916afe
move `tunnel_map` to MPPTunnelSet (#5123)
windtalker Jun 10, 2022
8a81342
add some notes about `getMemoryAmount()` and `getNumberOfPhysicalCPUC…
Lloyd-Pottiger Jun 10, 2022
10bcb06
*: remove TableFunctionFile (#5098)
ywqzzy Jun 13, 2022
d54fcc7
README.md: update URL of TiDB Cloud free trial (#5133)
qiancai Jun 13, 2022
9c8a588
*: remove CatBoost (#5131)
ywqzzy Jun 13, 2022
f4c2e01
Test: Mock window function, refactor window function tests (#5021)
ywqzzy Jun 13, 2022
123440c
Remove the log with high frequency and not useful enough (#5141)
hongyunyan Jun 14, 2022
4b3a2ce
modify cached_gc_safe_point to atomic to prevent more request to PD (…
shuke987 Jun 14, 2022
ad6b831
Removed a unused proxy status api named test-show (#5136)
jiaqizho Jun 14, 2022
94aa029
Fix blobstore truncate size may not right (#5127)
jiaqizho Jun 14, 2022
bcb837b
enhencement: supplement the comment for SchemaActionType (#5139)
hongyunyan Jun 14, 2022
864cfe9
Some refinements of `mpp_exchange_receiver_map` and `MPPTunnelSet` (#…
windtalker Jun 14, 2022
a79ad91
Revise default background threads size (#4723)
Lloyd-Pottiger Jun 15, 2022
617fe54
add microbenchmark for exchange and window function (#5137)
guo-shaoge Jun 16, 2022
a9a32b9
Fix the rename_table_across_databases.test to make it can run success…
hongyunyan Jun 17, 2022
ecd615f
Add more test in mix mode (#5017)
jiaqizho Jun 17, 2022
164eda5
*: Add some comments about decoding (#5158)
JaySon-Huang Jun 18, 2022
604b0de
MinMax Index Supports Nullable DataType (#5153)
hehechen Jun 20, 2022
40baeca
Reduce some unnecessary prometheus metrics. (#5006)
mengxin9014 Jun 20, 2022
ebb27d1
fix pageworklaod (#5165)
hehechen Jun 20, 2022
f3f37ae
Enhancement: Add how to run integration tests and microbenchmark test…
hongyunyan Jun 22, 2022
649462a
Enhancement: add a integrated test on DDL module (#5130)
hongyunyan Jun 22, 2022
45bc5a4
Revert "Revise default background threads size" (#5176)
Lloyd-Pottiger Jun 22, 2022
bfceb28
chore: remove extra dyn cast (#5186)
SchrodingerZhu Jun 22, 2022
e14c677
Add MPPReceiverSet, which includes ExchangeReceiver and CoprocessorRe…
windtalker Jun 22, 2022
18325f9
DDL: Use Column Name Instead of Offset to Find the common handle clus…
hongyunyan Jun 22, 2022
8a5dc29
Add random failpoint in critical paths (#4876)
yibin87 Jun 22, 2022
7c19a37
Segment test framework (#5150)
hehechen Jun 22, 2022
640c103
optimize ps v3 restore (#5163)
hehechen Jun 22, 2022
69cbfdf
Fix build failed (#5196)
wshwsh12 Jun 23, 2022
dab31a5
feat: delta tree dispatching (#5199)
SchrodingerZhu Jun 24, 2022
73e708c
feat: introduce specialized API to write fixed length data rapidly (#…
SchrodingerZhu Jun 24, 2022
f84d7e3
Add gtest for Limit, TopN, Projection (#5187) (#5188)
xzhangxian1008 Jun 28, 2022
7a20339
add `MPPTask::handleError()` (#5202)
windtalker Jun 29, 2022
31a9611
Check result of starting grpc server (#5257)
hehechen Jun 29, 2022
1ff3b38
feat: add optimized routines for aarch64 (#5231)
SchrodingerZhu Jun 29, 2022
cbe6ab5
fix: aarch64-quick-fix (#5259)
SchrodingerZhu Jun 30, 2022
045d24f
Update client-c to support ipv6 (#5270)
solotzg Jul 1, 2022
38b37e0
upgrade prometheus-cpp to v1.0.1 (#5279)
YangKeao Jul 3, 2022
4ce641b
Fix README type error (#5273)
hongyunyan Jul 3, 2022
19dfdd7
fix(cmake): make sure libc++ is utilized by tiflash-proxy (#5281)
SchrodingerZhu Jul 4, 2022
09402e3
fix the wrong order of execution summary for list based executors (#5…
SeaRise Jul 4, 2022
a89222a
Schema: allow loading empty schema diff when the version grows up. (#…
jiaqizho Jul 4, 2022
6da631c
Optimize apply speed under heavy write pressure (#4883)
lidezhu Jul 4, 2022
a0ecce0
update proxy to raftstore-proxy-6.2 (#5287)
CalvinNeo Jul 5, 2022
a228704
Flush segment cache when doing the compaction (#5284)
breezewish Jul 6, 2022
029926d
metrics: Fix incorrect metrics for delta_merge tasks (#5061)
breezewish Jul 6, 2022
4f7e24a
dep: upgrade jemalloc (#5197)
SchrodingerZhu Jul 6, 2022
7574b4b
*: TiFlash pagectl/dttool use only-decryption mode (#5271)
hehechen Jul 6, 2022
b2d8d50
suppresion false positive report from tsan (#5303)
bestwoody Jul 6, 2022
57d001c
Refine test framework code and tests (#5261)
Willendless Jul 6, 2022
fe2b539
feat: add logical cpu cores and memory into grafana (#5124)
Lloyd-Pottiger Jul 7, 2022
d9b7086
Implement TimeToSec function push down (#5235)
hey-kong Jul 7, 2022
cb69d5c
feat: implement shiftRight function push down (#5156)
Willendless Jul 7, 2022
597f8b8
schema : make update to partition tables when 'set tiflash replica' (…
hongyunyan Jul 7, 2022
bf4764b
Replace initializer_list with vector for planner test framework (#5307)
xzhangxian1008 Jul 7, 2022
0aec04a
KVStore: decouple flush region and CompactLog with a new FFI fn_try_f…
CalvinNeo Jul 7, 2022
3e8df4b
refine error message in mpptask (#5304)
windtalker Jul 7, 2022
5295223
Implement ReverseUTF8/Reverse function push down (#5233)
lizhenhuan Jul 7, 2022
97342db
Optimize comparision for collation `UTF8_BIN` and `UTF8MB4_BIN` (#5299)
solotzg Jul 7, 2022
47657d3
feat : support set tiflash mode ddl action (#5256)
hongyunyan Jul 7, 2022
cbc6a95
Add non-blocking functions for MPMCQueue (#5311)
gengliqi Jul 7, 2022
e58a007
add random segment test for CI weekly (#5300)
hehechen Jul 8, 2022
b822e99
*: tidy FunctionString.cpp (#5312)
ywqzzy Jul 8, 2022
be8ea6a
ci: fix check-license github action (#5318)
gengliqi Jul 8, 2022
d7fdbd4
update proxy to raftstore-proxy-6.2 (#5316)
CalvinNeo Jul 8, 2022
b62dc6a
Change one `additional_input_at_end` to many streams in `ParallelInpu…
gengliqi Jul 8, 2022
649919d
support fine grained shuffle for window function (#5048)
guo-shaoge Jul 11, 2022
707fc6d
feat: pushdown get_format into TiFlash (#5269)
wirybeaver Jul 11, 2022
2af78c5
fix: format throw data truncated error (#5272)
xzhangxian1008 Jul 11, 2022
cf7aa5e
Print content of columns for gtest (#5243)
xzhangxian1008 Jul 11, 2022
5ddee14
*: also enable O3 for aarch64 (#5338)
SchrodingerZhu Jul 11, 2022
7a717b5
Add debug image build target for CentOS7 (#5344)
yibin87 Jul 11, 2022
4619605
*: mini refactor (#5326)
SeaRise Jul 11, 2022
5c33049
merge master
SeaRise Jul 11, 2022
b8a6178
f
SeaRise Jul 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/tipb
10 changes: 8 additions & 2 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -58,6 +58,11 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream

uint64_t total_rows;

// For fine grained shuffle, sender will partition data into muiltiple streams by hashing.
// ExchangeReceiverBlockInputStream only need to read its own stream, i.e., streams[stream_id].
// CoprocessorBlockInputStream doesn't take care of this.
size_t stream_id;

void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index)
{
for (const auto & execution_summary : resp.execution_summaries())
@@ -120,7 +125,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream

bool fetchRemoteResult()
{
auto result = remote_reader->nextResult(block_queue, sample_block);
auto result = remote_reader->nextResult(block_queue, sample_block, stream_id);
if (result.meet_error)
{
LOG_FMT_WARNING(log, "remote reader meets error: {}", result.error_msg);
@@ -168,13 +173,14 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
}

public:
TiRemoteBlockInputStream(std::shared_ptr<RemoteReader> remote_reader_, const String & req_id, const String & executor_id)
TiRemoteBlockInputStream(std::shared_ptr<RemoteReader> remote_reader_, const String & req_id, const String & executor_id, size_t stream_id_)
: remote_reader(remote_reader_)
, source_num(remote_reader->getSourceNum())
, name(fmt::format("TiRemoteBlockInputStream({})", RemoteReader::name))
, execution_summaries_inited(source_num)
, log(Logger::get(name, req_id, executor_id))
, total_rows(0)
, stream_id(stream_id_)
{
// generate sample block
ColumnsWithTypeAndName columns;
18 changes: 11 additions & 7 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
@@ -851,6 +851,7 @@ bool ExchangeReceiver::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t c
{
tipb_executor->set_tp(tipb::ExecType::TypeExchangeReceiver);
tipb_executor->set_executor_id(name);
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);
tipb::ExchangeReceiver * exchange_receiver = tipb_executor->mutable_exchange_receiver();
for (auto & field : output_schema)
{
@@ -1354,6 +1355,7 @@ bool Window::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id
{
tipb_executor->set_tp(tipb::ExecType::TypeWindow);
tipb_executor->set_executor_id(name);
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);
tipb::Window * window = tipb_executor->mutable_window();
auto & input_schema = children[0]->output_schema;
for (const auto & expr : func_descs)
@@ -1430,6 +1432,7 @@ bool Sort::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id,
{
tipb_executor->set_tp(tipb::ExecType::TypeSort);
tipb_executor->set_executor_id(name);
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);
tipb::Sort * sort = tipb_executor->mutable_sort();
sort->set_ispartialsort(is_partial_sort);

@@ -1665,13 +1668,13 @@ ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, ti
return exchange_sender;
}

ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema)
ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema, uint64_t fine_grained_shuffle_stream_count)
{
ExecutorPtr exchange_receiver = std::make_shared<mock::ExchangeReceiver>(executor_index, schema);
ExecutorPtr exchange_receiver = std::make_shared<mock::ExchangeReceiver>(executor_index, schema, fine_grained_shuffle_stream_count);
return exchange_receiver;
}

ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame)
ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame, uint64_t fine_grained_shuffle_stream_count)
{
std::vector<ASTPtr> partition_columns;
if (partition_by_expr_list != nullptr)
@@ -1739,12 +1742,13 @@ ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr fun
window_exprs,
std::move(partition_columns),
std::move(order_columns),
frame);
frame,
fine_grained_shuffle_stream_count);
window->children.push_back(input);
return window;
}

ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort)
ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort, uint64_t fine_grained_shuffle_stream_count)
{
std::vector<ASTPtr> order_columns;
if (order_by_expr_list != nullptr)
@@ -1758,8 +1762,8 @@ ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order
compileExpr(input->output_schema, elem->children[0]);
}
}
ExecutorPtr sort = std::make_shared<mock::Sort>(executor_index, input->output_schema, std::move(order_columns), is_partial_sort);
ExecutorPtr sort = std::make_shared<mock::Sort>(executor_index, input->output_schema, std::move(order_columns), is_partial_sort, fine_grained_shuffle_stream_count);
sort->children.push_back(input);
return sort;
}
} // namespace DB
} // namespace DB
19 changes: 13 additions & 6 deletions dbms/src/Debug/astToExecutor.h
Original file line number Diff line number Diff line change
@@ -139,8 +139,11 @@ struct ExchangeSender : Executor
struct ExchangeReceiver : Executor
{
TaskMetas task_metas;
ExchangeReceiver(size_t & index, const DAGSchema & output)
uint64_t fine_grained_shuffle_stream_count;

ExchangeReceiver(size_t & index, const DAGSchema & output, uint64_t fine_grained_shuffle_stream_count_ = 0)
: Executor(index, "exchange_receiver_" + std::to_string(index), output)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{}
void columnPrune(std::unordered_set<String> &) override { throw Exception("Should not reach here"); }
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context &) override;
@@ -292,13 +295,15 @@ struct Window : Executor
std::vector<ASTPtr> partition_by_exprs;
std::vector<ASTPtr> order_by_exprs;
MockWindowFrame frame;
uint64_t fine_grained_shuffle_stream_count;

Window(size_t & index_, const DAGSchema & output_schema_, std::vector<ASTPtr> func_descs_, std::vector<ASTPtr> partition_by_exprs_, std::vector<ASTPtr> order_by_exprs_, MockWindowFrame frame_)
Window(size_t & index_, const DAGSchema & output_schema_, std::vector<ASTPtr> func_descs_, std::vector<ASTPtr> partition_by_exprs_, std::vector<ASTPtr> order_by_exprs_, MockWindowFrame frame_, uint64_t fine_grained_shuffle_stream_count_ = 0)
: Executor(index_, "window_" + std::to_string(index_), output_schema_)
, func_descs(std::move(func_descs_))
, partition_by_exprs(std::move(partition_by_exprs_))
, order_by_exprs(order_by_exprs_)
, frame(frame_)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{
}
// Currently only use Window Executor in Unit Test which don't call columnPrume.
@@ -311,11 +316,13 @@ struct Sort : Executor
{
std::vector<ASTPtr> by_exprs;
bool is_partial_sort;
uint64_t fine_grained_shuffle_stream_count;

Sort(size_t & index_, const DAGSchema & output_schema_, std::vector<ASTPtr> by_exprs_, bool is_partial_sort_)
Sort(size_t & index_, const DAGSchema & output_schema_, std::vector<ASTPtr> by_exprs_, bool is_partial_sort_, uint64_t fine_grained_shuffle_stream_count_ = 0)
: Executor(index_, "sort_" + std::to_string(index_), output_schema_)
, by_exprs(by_exprs_)
, is_partial_sort(is_partial_sort_)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{
}
// Currently only use Sort Executor in Unit Test which don't call columnPrume.
@@ -343,11 +350,11 @@ ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr r

ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, tipb::ExchangeType exchange_type);

ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema);
ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema, uint64_t fine_grained_shuffle_stream_count = 0);

ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame);
ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame, uint64_t fine_grained_shuffle_stream_count = 0);

ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort);
ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort, uint64_t fine_grained_shuffle_stream_count = 0);

void literalFieldToTiPBExpr(const ColumnInfo & ci, const Field & field, tipb::Expr * expr, Int32 collator_id);
} // namespace DB
5 changes: 3 additions & 2 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
@@ -290,8 +290,9 @@ BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DA
tipb_exchange_receiver.encoded_task_meta_size(),
10,
/*req_id=*/"",
/*executor_id=*/"");
BlockInputStreamPtr ret = std::make_shared<ExchangeReceiverInputStream>(exchange_receiver, /*req_id=*/"", /*executor_id=*/"");
/*executor_id=*/"",
/*fine_grained_shuffle_stream_count=*/0);
BlockInputStreamPtr ret = std::make_shared<ExchangeReceiverInputStream>(exchange_receiver, /*req_id=*/"", /*executor_id=*/"", /*stream_id*/ 0);
return ret;
}
else
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/CoprocessorReader.h
Original file line number Diff line number Diff line change
@@ -139,7 +139,8 @@ class CoprocessorReader
return detail;
}

CoprocessorReaderResult nextResult(std::queue<Block> & block_queue, const Block & header)
// stream_id is only meaningful for ExchagneReceiver.
CoprocessorReaderResult nextResult(std::queue<Block> & block_queue, const Block & header, size_t /*stream_id*/)
{
auto && [result, has_next] = resp_iter.next();
if (!result.error.empty())
5 changes: 5 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
@@ -116,6 +116,11 @@ constexpr UInt64 NO_ENGINE_SUBSTITUTION = 1ul << 30ul;
constexpr UInt64 ALLOW_INVALID_DATES = 1ul << 32ul;
} // namespace TiDBSQLMode

inline bool enableFineGrainedShuffle(uint64_t stream_count)
{
return stream_count > 0;
}

/// A context used to track the information that needs to be passed around during DAG planning.
class DAGContext
{
7 changes: 5 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
@@ -72,6 +72,7 @@ DAGDriver<true>::DAGDriver(
::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_,
bool internal_)
: context(context_)
, dag_response(nullptr)
, writer(writer_)
, internal(internal_)
, log(&Poco::Logger::get("DAGDriver"))
@@ -129,15 +130,17 @@ try
auto streaming_writer = std::make_shared<StreamWriter>(writer);
TiDB::TiDBCollators collators;

std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<StreamWriterPtr>>(
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<StreamWriterPtr, false>>(
streaming_writer,
std::vector<Int64>(),
collators,
tipb::ExchangeType::PassThrough,
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
true,
dag_context);
dag_context,
/*fine_grained_shuffle_stream_count=*/0,
/*fine_grained_shuffle_batch_size=*/0);
dag_output_stream = std::make_shared<DAGBlockOutputStream>(streams.in->getHeader(), std::move(response_writer));
copyData(*streams.in, *dag_output_stream);
}
Loading