Skip to content

Commit

Permalink
Fix TiFlash hang issue after #9072 (#9424)
Browse files Browse the repository at this point in the history
close #9413

Signed-off-by: xufei <[email protected]>
  • Loading branch information
windtalker authored Sep 10, 2024
1 parent 0ea1cd9 commit 1be6569
Show file tree
Hide file tree
Showing 27 changed files with 331 additions and 55 deletions.
2 changes: 2 additions & 0 deletions dbms/src/Common/GRPCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class GRPCSendQueue
}

bool isWritable() const { return send_queue.isWritable(); }
void notifyNextPipelineWriter() { send_queue.notifyNextPipelineWriter(); }

void registerPipeReadTask(TaskPtr && task) { send_queue.registerPipeReadTask(std::move(task)); }
void registerPipeWriteTask(TaskPtr && task) { send_queue.registerPipeWriteTask(std::move(task)); }
Expand Down Expand Up @@ -299,6 +300,7 @@ class GRPCRecvQueue
}

bool isWritable() const { return recv_queue.isWritable(); }
void notifyNextPipelineWriter() { return recv_queue.notifyNextPipelineWriter(); }

void registerPipeReadTask(TaskPtr && task) { recv_queue.registerPipeReadTask(std::move(task)); }
void registerPipeWriteTask(TaskPtr && task) { recv_queue.registerPipeWriteTask(std::move(task)); }
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Common/LooseBoundedMPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ class LooseBoundedMPMCQueue
return !isFullWithoutLock();
}

void notifyNextPipelineWriter() { pipe_writer_cv.notifyOne(); }

MPMCQueueStatus getStatus() const
{
std::lock_guard lock(mu);
Expand Down
23 changes: 21 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ class DAGResponseWriter
DAGResponseWriter(Int64 records_per_chunk_, DAGContext & dag_context_);
/// prepared with sample block
virtual void prepare(const Block &){};
virtual void write(const Block & block) = 0;
void write(const Block & block)
{
if (!doWrite(block))
{
notifyNextPipelineWriter();
}
}

// For async writer, `waitForWritable` need to be called before calling `write`.
// ```
Expand All @@ -40,10 +46,23 @@ class DAGResponseWriter
virtual WaitResult waitForWritable() const { throw Exception("Unsupport"); }

/// flush cached blocks for batch writer
virtual void flush() = 0;
void flush()
{
if (!doFlush())
{
notifyNextPipelineWriter();
}
}

virtual ~DAGResponseWriter() = default;

protected:
// return true if write is actually write the data
virtual bool doWrite(const Block & block) = 0;
// return true if flush is actually flush data
virtual bool doFlush() = 0;
virtual void notifyNextPipelineWriter() = 0;

Int64 records_per_chunk;
DAGContext & dag_context;
};
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/StreamWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ struct CopStreamWriter
throw Exception("Failed to write resp");
}
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }
static void notifyNextPipelineWriter() {}
};

struct BatchCopStreamWriter
Expand All @@ -83,6 +84,7 @@ struct BatchCopStreamWriter
throw Exception("Failed to write resp");
}
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }
static void notifyNextPipelineWriter() {}
};

using CopStreamWriterPtr = std::shared_ptr<CopStreamWriter>;
Expand Down
21 changes: 17 additions & 4 deletions dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,14 @@ StreamingDAGResponseWriter<StreamWriterPtr>::StreamingDAGResponseWriter(
}

template <class StreamWriterPtr>
void StreamingDAGResponseWriter<StreamWriterPtr>::flush()
bool StreamingDAGResponseWriter<StreamWriterPtr>::doFlush()
{
if (rows_in_blocks > 0)
{
encodeThenWriteBlocks();
return true;
}
return false;
}

template <class StreamWriterPtr>
Expand All @@ -74,7 +78,13 @@ WaitResult StreamingDAGResponseWriter<StreamWriterPtr>::waitForWritable() const
}

template <class StreamWriterPtr>
void StreamingDAGResponseWriter<StreamWriterPtr>::write(const Block & block)
void StreamingDAGResponseWriter<StreamWriterPtr>::notifyNextPipelineWriter()
{
return writer->notifyNextPipelineWriter();
}

template <class StreamWriterPtr>
bool StreamingDAGResponseWriter<StreamWriterPtr>::doWrite(const Block & block)
{
RUNTIME_CHECK_MSG(
block.columns() == dag_context.result_field_types.size(),
Expand All @@ -87,14 +97,17 @@ void StreamingDAGResponseWriter<StreamWriterPtr>::write(const Block & block)
}

if (static_cast<Int64>(rows_in_blocks) > batch_send_min_limit)
{
encodeThenWriteBlocks();
return true;
}
return false;
}

template <class StreamWriterPtr>
void StreamingDAGResponseWriter<StreamWriterPtr>::encodeThenWriteBlocks()
{
if (unlikely(blocks.empty()))
return;
assert(!blocks.empty());

TrackedSelectResp response;
response.setEncodeType(dag_context.encode_type);
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ class StreamingDAGResponseWriter : public DAGResponseWriter
Int64 records_per_chunk_,
Int64 batch_send_min_limit_,
DAGContext & dag_context_);
void write(const Block & block) override;
WaitResult waitForWritable() const override;
void flush() override;

protected:
bool doWrite(const Block & block) override;
bool doFlush() override;
void notifyNextPipelineWriter() override;

private:
void encodeThenWriteBlocks();
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void UnaryDAGResponseWriter::appendWarningsToDAGResponse()
dag_response->set_warning_count(dag_context.getWarningCount());
}

void UnaryDAGResponseWriter::flush()
bool UnaryDAGResponseWriter::doFlush()
{
if (current_records_num > 0)
{
Expand All @@ -86,9 +86,10 @@ void UnaryDAGResponseWriter::flush()
throw TiFlashException(
"DAG response is too big, please check config about region size or region merge scheduler",
Errors::Coprocessor::Internal);
return true;
}

void UnaryDAGResponseWriter::write(const Block & block)
bool UnaryDAGResponseWriter::doWrite(const Block & block)
{
if (block.columns() != dag_context.result_field_types.size())
throw TiFlashException("Output column size mismatch with field type size", Errors::Coprocessor::Internal);
Expand Down Expand Up @@ -116,5 +117,6 @@ void UnaryDAGResponseWriter::write(const Block & block)
row_index = upper;
}
}
return true;
}
} // namespace DB
7 changes: 5 additions & 2 deletions dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ class UnaryDAGResponseWriter : public DAGResponseWriter
public:
UnaryDAGResponseWriter(tipb::SelectResponse * response_, Int64 records_per_chunk_, DAGContext & dag_context_);

void write(const Block & block) override;
void flush() override;
void encodeChunkToDAGResponse();
void appendWarningsToDAGResponse();

protected:
bool doWrite(const Block & block) override;
bool doFlush() override;
void notifyNextPipelineWriter() override{};

private:
tipb::SelectResponse * dag_response;
std::unique_ptr<ChunkCodecStream> chunk_codec_stream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ struct MockStreamWriter

void write(tipb::SelectResponse & response) { checker(response); }
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }
static void notifyNextPipelineWriter() {}

private:
MockStreamWriterChecker checker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ struct MockWriter
}
static uint16_t getPartitionNum() { return 1; }
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }
static void notifyNextPipelineWriter() {}

std::vector<tipb::FieldType> result_field_types;

Expand Down
25 changes: 20 additions & 5 deletions dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ BroadcastOrPassThroughWriter<ExchangeWriterPtr>::BroadcastOrPassThroughWriter(
switch (data_codec_version)
{
case MPPDataPacketV0:
if (batch_send_min_limit <= 0)
batch_send_min_limit = 1;
break;
case MPPDataPacketV1:
default:
Expand All @@ -64,10 +66,14 @@ BroadcastOrPassThroughWriter<ExchangeWriterPtr>::BroadcastOrPassThroughWriter(
}

template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::flush()
bool BroadcastOrPassThroughWriter<ExchangeWriterPtr>::doFlush()
{
if (rows_in_blocks > 0)
{
writeBlocks();
return true;
}
return false;
}

template <class ExchangeWriterPtr>
Expand All @@ -77,7 +83,13 @@ WaitResult BroadcastOrPassThroughWriter<ExchangeWriterPtr>::waitForWritable() co
}

template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::write(const Block & block)
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::notifyNextPipelineWriter()
{
writer->notifyNextPipelineWriter();
}

template <class ExchangeWriterPtr>
bool BroadcastOrPassThroughWriter<ExchangeWriterPtr>::doWrite(const Block & block)
{
RUNTIME_CHECK(!block.info.selective);
RUNTIME_CHECK_MSG(
Expand All @@ -90,15 +102,18 @@ void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::write(const Block & block)
blocks.push_back(block);
}

if (static_cast<Int64>(rows_in_blocks) > batch_send_min_limit)
if (static_cast<Int64>(rows_in_blocks) >= batch_send_min_limit)
{
writeBlocks();
return true;
}
return false;
}

template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::writeBlocks()
{
if unlikely (blocks.empty())
return;
assert(!blocks.empty());

// check schema
if (!expected_types.empty())
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter
MPPDataPacketVersion data_codec_version_,
tipb::CompressionMode compression_mode_,
tipb::ExchangeType exchange_type_);
void write(const Block & block) override;
WaitResult waitForWritable() const override;
void flush() override;

protected:
bool doWrite(const Block & block) override;
bool doFlush() override;
void notifyNextPipelineWriter() override;

private:
void writeBlocks();
Expand Down
21 changes: 17 additions & 4 deletions dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,20 @@ void FineGrainedShuffleWriter<ExchangeWriterPtr>::prepare(const Block & sample_b
}

template <class ExchangeWriterPtr>
void FineGrainedShuffleWriter<ExchangeWriterPtr>::flush()
bool FineGrainedShuffleWriter<ExchangeWriterPtr>::doFlush()
{
if (rows_in_blocks > 0)
{
batchWriteFineGrainedShuffle();
return true;
}
return false;
}

template <class ExchangeWriterPtr>
void FineGrainedShuffleWriter<ExchangeWriterPtr>::notifyNextPipelineWriter()
{
writer->notifyNextPipelineWriter();
}

template <class ExchangeWriterPtr>
Expand All @@ -103,7 +113,7 @@ WaitResult FineGrainedShuffleWriter<ExchangeWriterPtr>::waitForWritable() const
}

template <class ExchangeWriterPtr>
void FineGrainedShuffleWriter<ExchangeWriterPtr>::write(const Block & block)
bool FineGrainedShuffleWriter<ExchangeWriterPtr>::doWrite(const Block & block)
{
RUNTIME_CHECK_MSG(prepared, "FineGrainedShuffleWriter should be prepared before writing.");
RUNTIME_CHECK_MSG(
Expand All @@ -124,7 +134,11 @@ void FineGrainedShuffleWriter<ExchangeWriterPtr>::write(const Block & block)

if (blocks.size() == fine_grained_shuffle_stream_count
|| static_cast<UInt64>(rows_in_blocks) >= batch_send_row_limit)
{
batchWriteFineGrainedShuffle();
return true;
}
return false;
}

template <class ExchangeWriterPtr>
Expand All @@ -148,8 +162,7 @@ template <class ExchangeWriterPtr>
template <MPPDataPacketVersion version>
void FineGrainedShuffleWriter<ExchangeWriterPtr>::batchWriteFineGrainedShuffleImpl()
{
if (blocks.empty())
return;
assert(!blocks.empty());

{
assert(rows_in_blocks > 0);
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ class FineGrainedShuffleWriter : public DAGResponseWriter
MPPDataPacketVersion data_codec_version_,
tipb::CompressionMode compression_mode_);
void prepare(const Block & sample_block) override;
void write(const Block & block) override;
WaitResult waitForWritable() const override;
void flush() override;

protected:
bool doWrite(const Block & block) override;
bool doFlush() override;
void notifyNextPipelineWriter() override;

private:
void batchWriteFineGrainedShuffle();
Expand Down
Loading

0 comments on commit 1be6569

Please sign in to comment.