Skip to content

Commit

Permalink
Merge branch 'master' into fix_ks3
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang authored May 17, 2023
2 parents 18ee424 + 0d09036 commit 47b120a
Show file tree
Hide file tree
Showing 68 changed files with 780 additions and 444 deletions.
37 changes: 28 additions & 9 deletions dbms/src/DataStreams/WindowBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,22 +647,41 @@ void WindowBlockInputStream::appendInfo(FmtBuffer & buffer) const
action.appendInfo(buffer);
}

void WindowTransformAction::advanceRowNumber(RowNumber & x) const
void WindowTransformAction::advanceRowNumber(RowNumber & row_num) const
{
assert(x.block >= first_block_number);
assert(x.block - first_block_number < window_blocks.size());
assert(row_num.block >= first_block_number);
assert(row_num.block - first_block_number < window_blocks.size());

const auto block_rows = blockAt(x).rows;
assert(x.row < block_rows);
const auto block_rows = blockAt(row_num).rows;
assert(row_num.row < block_rows);

++x.row;
if (x.row < block_rows)
++row_num.row;
if (row_num.row < block_rows)
{
return;
}

x.row = 0;
++x.block;
row_num.row = 0;
++row_num.block;
}

RowNumber WindowTransformAction::getPreviousRowNumber(const RowNumber & row_num) const
{
assert(row_num.block >= first_block_number);
assert(!(row_num.block == 0 && row_num.row == 0));

RowNumber prev_row_num = row_num;
if (row_num.row > 0)
{
--prev_row_num.row;
return prev_row_num;
}

--prev_row_num.block;
assert(prev_row_num.block - first_block_number < window_blocks.size());
const auto new_block_rows = blockAt(prev_row_num).rows;
prev_row_num.row = new_block_rows - 1;
return prev_row_num;
}

bool WindowTransformAction::lead(RowNumber & x, size_t offset) const
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/DataStreams/WindowBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ struct WindowTransformAction
return window_blocks[x.block - first_block_number].output_columns;
}

void advanceRowNumber(RowNumber & x) const;
void advanceRowNumber(RowNumber & row_num) const;

RowNumber getPreviousRowNumber(const RowNumber & row_num) const;

bool lead(RowNumber & x, size_t offset) const;

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Debug/MockExecutor/FuncSigMap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,6 @@ std::unordered_map<String, tipb::ExprType> window_func_name_to_sig({
{"Lead", tipb::ExprType::Lead},
{"Lag", tipb::ExprType::Lag},
{"FirstValue", tipb::ExprType::FirstValue},
{"LastValue", tipb::ExprType::LastValue},
});
} // namespace DB::tests
2 changes: 2 additions & 0 deletions dbms/src/Debug/MockExecutor/WindowBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ bool WindowBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collat
break;
}
case tipb::ExprType::FirstValue:
case tipb::ExprType::LastValue:
{
assert(window_expr->children_size() == 1);
const auto arg_type = window_expr->children(0).field_type();
Expand Down Expand Up @@ -212,6 +213,7 @@ ExecutorBinderPtr compileWindow(ExecutorBinderPtr input, size_t & executor_index
break;
}
case tipb::ExprType::FirstValue:
case tipb::ExprType::LastValue:
{
ci = children_ci[0];
break;
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,9 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
else if (query_block.isTableScanSource())
{
TiDBTableScan table_scan(query_block.source, query_block.source_name, dagContext());
if (!table_scan.getPushedDownFilters().empty() && unlikely(!context.getSettingsRef().dt_enable_read_thread))
throw Exception("Enable late materialization but disable read thread pool, please set the config `dt_enable_read_thread` of TiFlash to true,"
"or disable late materialization by set tidb variable `tidb_opt_enable_late_materialization` to false.");
if (unlikely(context.isTest()))
{
handleMockTableScan(table_scan, pipeline);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ class DAGResponseWriter
virtual void prepare(const Block &){};
virtual void write(const Block & block) = 0;

// For async writer, `isReadyForWrite` need to be called before calling `write`.
// For async writer, `isWritable` need to be called before calling `write`.
// ```
// while (!isReadyForWrite()) {}
// while (!isWritable()) {}
// write(block);
// ```
virtual bool isReadyForWrite() const { throw Exception("Unsupport"); }
virtual bool isWritable() const { throw Exception("Unsupport"); }

/// flush cached blocks for batch writer
virtual void flush() = 0;
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1064,13 +1064,16 @@ SourceOps DAGStorageInterpreter::buildLocalSourceOps(
return {};
const auto table_query_infos = generateSelectQueryInfos();

/// TODO: support multiple partitions
// TODO Improve the performance of partition table in extreme case.
// ref https://github.com/pingcap/tiflash/issues/4474
SourceOps source_ops;
for (const auto & table_query_info : table_query_infos)
{
const TableID table_id = table_query_info.first;
const SelectQueryInfo & query_info = table_query_info.second;
source_ops = buildLocalSourceOpsForPhysicalTable(exec_status, table_id, query_info, max_block_size);

auto table_source_ops = buildLocalSourceOpsForPhysicalTable(exec_status, table_id, query_info, max_block_size);
source_ops.insert(source_ops.end(), std::make_move_iterator(table_source_ops.begin()), std::make_move_iterator(table_source_ops.end()));
}

LOG_DEBUG(
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const std::unordered_map<tipb::ExprType, String> window_func_map({
{tipb::ExprType::Lead, "lead"},
{tipb::ExprType::Lag, "lag"},
{tipb::ExprType::FirstValue, "first_value"},
{tipb::ExprType::LastValue, "last_value"},
});

const std::unordered_map<tipb::ExprType, String> agg_func_map({
Expand Down Expand Up @@ -1032,10 +1033,10 @@ bool isWindowFunctionExpr(const tipb::Expr & expr)
case tipb::ExprType::Lead:
case tipb::ExprType::Lag:
case tipb::ExprType::FirstValue:
case tipb::ExprType::LastValue:
// case tipb::ExprType::CumeDist:
// case tipb::ExprType::PercentRank:
// case tipb::ExprType::Ntile:
// case tipb::ExprType::LastValue:
// case tipb::ExprType::NthValue:
return true;
default:
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/StreamWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ struct StreamWriter
if (!writer->Write(resp))
throw Exception("Failed to write resp");
}
bool isReadyForWrite() const { throw Exception("Unsupport async write"); }
bool isWritable() const { throw Exception("Unsupport async write"); }
};

using StreamWriterPtr = std::shared_ptr<StreamWriter>;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ void StreamingDAGResponseWriter<StreamWriterPtr>::flush()
}

template <class StreamWriterPtr>
bool StreamingDAGResponseWriter<StreamWriterPtr>::isReadyForWrite() const
bool StreamingDAGResponseWriter<StreamWriterPtr>::isWritable() const
{
return writer->isReadyForWrite();
return writer->isWritable();
}

template <class StreamWriterPtr>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class StreamingDAGResponseWriter : public DAGResponseWriter
Int64 batch_send_min_limit_,
DAGContext & dag_context_);
void write(const Block & block) override;
bool isReadyForWrite() const override;
bool isWritable() const override;
void flush() override;

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ struct MockStreamWriter
{}

void write(tipb::SelectResponse & response) { checker(response); }
bool isReadyForWrite() const { throw Exception("Unsupport async write"); }
bool isWritable() const { throw Exception("Unsupport async write"); }

private:
MockStreamWriterChecker checker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ struct MockWriter
queue->push(tracked_packet);
}
static uint16_t getPartitionNum() { return 1; }
static bool isReadyForWrite() { throw Exception("Unsupport async write"); }
static bool isWritable() { throw Exception("Unsupport async write"); }

std::vector<tipb::FieldType> result_field_types;

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::flush()
}

template <class ExchangeWriterPtr>
bool BroadcastOrPassThroughWriter<ExchangeWriterPtr>::isReadyForWrite() const
bool BroadcastOrPassThroughWriter<ExchangeWriterPtr>::isWritable() const
{
return writer->isReadyForWrite();
return writer->isWritable();
}

template <class ExchangeWriterPtr>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter
tipb::CompressionMode compression_mode_,
tipb::ExchangeType exchange_type_);
void write(const Block & block) override;
bool isReadyForWrite() const override;
bool isWritable() const override;
void flush() override;

private:
Expand Down
49 changes: 24 additions & 25 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -824,39 +824,20 @@ DecodeDetail ExchangeReceiverBase<RPCContext>::decodeChunks(
}

template <typename RPCContext>
ReceiveResult ExchangeReceiverBase<RPCContext>::receive(size_t stream_id)
{
return receive(
stream_id,
[&](size_t stream_id, RecvMsgPtr & recv_msg) {
return grpc_recv_queue[stream_id].pop(recv_msg);
});
}

template <typename RPCContext>
ReceiveResult ExchangeReceiverBase<RPCContext>::nonBlockingReceive(size_t stream_id)
{
return receive(
stream_id,
[&](size_t stream_id, RecvMsgPtr & recv_msg) {
return grpc_recv_queue[stream_id].tryPop(recv_msg);
});
}

template <typename RPCContext>
ReceiveResult ExchangeReceiverBase<RPCContext>::receive(
size_t stream_id,
std::function<MPMCQueueResult(size_t, RecvMsgPtr &)> recv_func)
void ExchangeReceiverBase<RPCContext>::verifyStreamId(size_t stream_id) const
{
if (unlikely(stream_id >= grpc_recv_queue.size()))
{
auto err_msg = fmt::format("stream_id out of range, stream_id: {}, total_channel_count: {}", stream_id, grpc_recv_queue.size());
LOG_ERROR(exc_log, err_msg);
throw Exception(err_msg);
}
}

RecvMsgPtr recv_msg;
switch (recv_func(stream_id, recv_msg))
template <typename RPCContext>
ReceiveResult ExchangeReceiverBase<RPCContext>::toReceiveResult(MPMCQueueResult result, RecvMsgPtr && recv_msg)
{
switch (result)
{
case MPMCQueueResult::OK:
assert(recv_msg);
Expand All @@ -868,6 +849,24 @@ ReceiveResult ExchangeReceiverBase<RPCContext>::receive(
}
}

template <typename RPCContext>
ReceiveResult ExchangeReceiverBase<RPCContext>::receive(size_t stream_id)
{
verifyStreamId(stream_id);
RecvMsgPtr recv_msg;
auto res = grpc_recv_queue[stream_id].pop(recv_msg);
return toReceiveResult(res, std::move(recv_msg));
}

template <typename RPCContext>
ReceiveResult ExchangeReceiverBase<RPCContext>::tryReceive(size_t stream_id)
{
// verifyStreamId has been called in `ExchangeReceiverSourceOp`.
RecvMsgPtr recv_msg;
auto res = grpc_recv_queue[stream_id].tryPop(recv_msg);
return toReceiveResult(res, std::move(recv_msg));
}

template <typename RPCContext>
ExchangeReceiverResult ExchangeReceiverBase<RPCContext>::toExchangeReceiveResult(
ReceiveResult & recv_result,
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Flash/Mpp/ExchangeReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class ExchangeReceiverBase
void close();

ReceiveResult receive(size_t stream_id);
ReceiveResult nonBlockingReceive(size_t stream_id);
ReceiveResult tryReceive(size_t stream_id);

ExchangeReceiverResult toExchangeReceiveResult(
ReceiveResult & recv_result,
Expand All @@ -145,6 +145,8 @@ class ExchangeReceiverBase
MemoryTracker * getMemoryTracker() const { return mem_tracker.get(); }
std::atomic<Int64> * getDataSizeInQueue() { return &data_size_in_queue; }

void verifyStreamId(size_t stream_id) const;

private:
std::shared_ptr<MemoryTracker> mem_tracker;
using Request = typename RPCContext::Request;
Expand Down Expand Up @@ -187,11 +189,8 @@ class ExchangeReceiverBase
const RecvMsgPtr & recv_msg,
std::unique_ptr<CHBlockChunkDecodeAndSquash> & decoder_ptr);

ReceiveResult receive(
size_t stream_id,
std::function<MPMCQueueResult(size_t, RecvMsgPtr &)> recv_func);
inline ReceiveResult toReceiveResult(MPMCQueueResult result, RecvMsgPtr && recv_msg);

private:
void prepareMsgChannels();
void prepareGRPCReceiveQueue();
void addLocalConnectionNum();
Expand All @@ -213,6 +212,7 @@ class ExchangeReceiverBase
return !disaggregated_dispatch_reqs.empty();
}

private:
std::shared_ptr<RPCContext> rpc_context;

const tipb::ExchangeReceiver pb_exchange_receiver;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ void FineGrainedShuffleWriter<ExchangeWriterPtr>::flush()
}

template <class ExchangeWriterPtr>
bool FineGrainedShuffleWriter<ExchangeWriterPtr>::isReadyForWrite() const
bool FineGrainedShuffleWriter<ExchangeWriterPtr>::isWritable() const
{
return writer->isReadyForWrite();
return writer->isWritable();
}

template <class ExchangeWriterPtr>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class FineGrainedShuffleWriter : public DAGResponseWriter
tipb::CompressionMode compression_mode_);
void prepare(const Block & sample_block) override;
void write(const Block & block) override;
bool isReadyForWrite() const override;
bool isWritable() const override;
void flush() override;

private:
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/HashPartitionWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ void HashPartitionWriter<ExchangeWriterPtr>::flush()
}

template <class ExchangeWriterPtr>
bool HashPartitionWriter<ExchangeWriterPtr>::isReadyForWrite() const
bool HashPartitionWriter<ExchangeWriterPtr>::isWritable() const
{
return writer->isReadyForWrite();
return writer->isWritable();
}

template <class ExchangeWriterPtr>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/HashPartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class HashPartitionWriter : public DAGResponseWriter
MPPDataPacketVersion data_codec_version_,
tipb::CompressionMode compression_mode_);
void write(const Block & block) override;
bool isReadyForWrite() const override;
bool isWritable() const override;
void flush() override;

private:
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/LocalRequestHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ struct LocalRequestHandler
return channel_writer.write<enable_fine_grained_shuffle, is_force>(source_index, tracked_packet);
}

bool isReadyForWrite() const
bool isWritable() const
{
return channel_writer.isReadyForWrite();
return channel_writer.isWritable();
}

void writeDone(bool meet_error, const String & local_err_msg) const
Expand Down
Loading

0 comments on commit 47b120a

Please sign in to comment.