Skip to content

Commit

Permalink
Pipeline: use notify instead of polling for ExchangeSender (#9072)
Browse files Browse the repository at this point in the history
ref #8869

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
SeaRise and ti-chi-bot[bot] authored Sep 2, 2024
1 parent 897a9da commit 0a80ae0
Show file tree
Hide file tree
Showing 30 changed files with 173 additions and 51 deletions.
6 changes: 6 additions & 0 deletions dbms/src/Common/GRPCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ class GRPCSendQueue

bool isWritable() const { return send_queue.isWritable(); }

void registerPipeReadTask(TaskPtr && task) { send_queue.registerPipeReadTask(std::move(task)); }
void registerPipeWriteTask(TaskPtr && task) { send_queue.registerPipeWriteTask(std::move(task)); }

private:
friend class tests::TestGRPCSendQueue;

Expand Down Expand Up @@ -297,6 +300,9 @@ class GRPCRecvQueue

bool isWritable() const { return recv_queue.isWritable(); }

void registerPipeReadTask(TaskPtr && task) { recv_queue.registerPipeReadTask(std::move(task)); }
void registerPipeWriteTask(TaskPtr && task) { recv_queue.registerPipeWriteTask(std::move(task)); }

private:
friend class tests::TestGRPCRecvQueue;

Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <Core/Block.h>
#include <Flash/Coprocessor/WaitResult.h>
#include <common/types.h>
#include <tipb/select.pb.h>

Expand All @@ -30,12 +31,13 @@ class DAGResponseWriter
virtual void prepare(const Block &){};
virtual void write(const Block & block) = 0;

// For async writer, `isWritable` need to be called before calling `write`.
// For async writer, `waitForWritable` need to be called before calling `write`.
// ```
// while (!isWritable()) {}
// auto res = waitForWritable();
// switch (res) case...
// write(block);
// ```
virtual bool isWritable() const { throw Exception("Unsupport"); }
virtual WaitResult waitForWritable() const { throw Exception("Unsupport"); }

/// flush cached blocks for batch writer
virtual void flush() = 0;
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Coprocessor/StreamWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <Common/Exception.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Coprocessor/WaitResult.h>
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
Expand Down Expand Up @@ -57,7 +58,7 @@ struct CopStreamWriter
if (!writer->Write(resp))
throw Exception("Failed to write resp");
}
bool isWritable() const { throw Exception("Unsupport async write"); }
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }
};

struct BatchCopStreamWriter
Expand All @@ -81,7 +82,7 @@ struct BatchCopStreamWriter
if (!writer->Write(resp))
throw Exception("Failed to write resp");
}
bool isWritable() const { throw Exception("Unsupport async write"); }
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }
};

using CopStreamWriterPtr = std::shared_ptr<CopStreamWriter>;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ void StreamingDAGResponseWriter<StreamWriterPtr>::flush()
}

template <class StreamWriterPtr>
bool StreamingDAGResponseWriter<StreamWriterPtr>::isWritable() const
WaitResult StreamingDAGResponseWriter<StreamWriterPtr>::waitForWritable() const
{
return writer->isWritable();
return writer->waitForWritable();
}

template <class StreamWriterPtr>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class StreamingDAGResponseWriter : public DAGResponseWriter
Int64 batch_send_min_limit_,
DAGContext & dag_context_);
void write(const Block & block) override;
bool isWritable() const override;
WaitResult waitForWritable() const override;
void flush() override;

private:
Expand Down
25 changes: 25 additions & 0 deletions dbms/src/Flash/Coprocessor/WaitResult.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

namespace DB
{
enum class WaitResult
{
Ready,
WaitForPolling,
WaitForNotify
};
} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ struct MockStreamWriter
{}

void write(tipb::SelectResponse & response) { checker(response); }
static bool isWritable() { throw Exception("Unsupport async write"); }
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }

private:
MockStreamWriterChecker checker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ struct MockWriter
queue->push(tracked_packet);
}
static uint16_t getPartitionNum() { return 1; }
static bool isWritable() { throw Exception("Unsupport async write"); }
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }

std::vector<tipb::FieldType> result_field_types;

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Executor/PipelineExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ PipelineExecutor::PipelineExecutor(
/*query_id=*/context.getDAGContext()->isMPPTask() ? context.getDAGContext()->getMPPTaskId().toString() : "",
req_id,
memory_tracker_,
context.getDAGContext(),
auto_spill_trigger,
register_operator_spill_context,
context.getDAGContext()->getResourceGroupName())
Expand Down
27 changes: 27 additions & 0 deletions dbms/src/Flash/Executor/PipelineExecutorContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Executor/PipelineExecutorContext.h>
#include <Flash/Executor/ResultQueue.h>
#include <Flash/Mpp/MPPTunnelSet.h>
#include <Flash/Mpp/Utils.h>
#include <Flash/Pipeline/Schedule/TaskScheduler.h>
#include <Operators/SharedQueue.h>

Expand Down Expand Up @@ -52,6 +55,24 @@ String PipelineExecutorContext::getExceptionMsg()
}
}

String PipelineExecutorContext::getTrimmedErrMsg()
{
try
{
auto cur_exception_ptr = getExceptionPtr();
if (!cur_exception_ptr)
return "";
std::rethrow_exception(cur_exception_ptr);
}
catch (...)
{
auto err_msg = getCurrentExceptionMessage(true, true);
if (likely(!err_msg.empty()))
trimStackTrace(err_msg);
return err_msg;
}
}

void PipelineExecutorContext::onErrorOccurred(const String & err_msg)
{
DB::Exception e(err_msg);
Expand Down Expand Up @@ -155,6 +176,12 @@ void PipelineExecutorContext::cancel()
if (is_cancelled.compare_exchange_strong(origin_value, true, std::memory_order_release))
{
cancelSharedQueues();
if (likely(dag_context))
{
// Cancel the tunnel_set here to prevent pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified.
if (dag_context->tunnel_set)
dag_context->tunnel_set->close(getTrimmedErrMsg(), false);
}
cancelResultQueueIfNeed();
if likely (TaskScheduler::instance && !query_id.empty())
TaskScheduler::instance->cancel(query_id, resource_group_name);
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Flash/Executor/PipelineExecutorContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ using RegisterOperatorSpillContext = std::function<void(const std::shared_ptr<Op
class SharedQueue;
using SharedQueuePtr = std::shared_ptr<SharedQueue>;

class DAGContext;

class PipelineExecutorContext : private boost::noncopyable
{
public:
Expand All @@ -51,12 +53,14 @@ class PipelineExecutorContext : private boost::noncopyable
const String & query_id_,
const String & req_id,
const MemoryTrackerPtr & mem_tracker_,
DAGContext * dag_context_ = nullptr,
AutoSpillTrigger * auto_spill_trigger_ = nullptr,
const RegisterOperatorSpillContext & register_operator_spill_context_ = nullptr,
const String & resource_group_name_ = "")
: query_id(query_id_)
, log(Logger::get(req_id))
, mem_tracker(mem_tracker_)
, dag_context(dag_context_)
, auto_spill_trigger(auto_spill_trigger_)
, register_operator_spill_context(register_operator_spill_context_)
, resource_group_name(resource_group_name_)
Expand Down Expand Up @@ -134,6 +138,8 @@ class PipelineExecutorContext : private boost::noncopyable
private:
bool setExceptionPtr(const std::exception_ptr & exception_ptr_);

String getTrimmedErrMsg();

// Need to be called under lock.
bool isWaitMode();

Expand All @@ -149,6 +155,8 @@ class PipelineExecutorContext : private boost::noncopyable

MemoryTrackerPtr mem_tracker;

DAGContext * dag_context{nullptr};

std::mutex mu;
std::condition_variable cv;
std::exception_ptr exception_ptr;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::flush()
}

template <class ExchangeWriterPtr>
bool BroadcastOrPassThroughWriter<ExchangeWriterPtr>::isWritable() const
WaitResult BroadcastOrPassThroughWriter<ExchangeWriterPtr>::waitForWritable() const
{
return writer->isWritable();
return writer->waitForWritable();
}

template <class ExchangeWriterPtr>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter
tipb::CompressionMode compression_mode_,
tipb::ExchangeType exchange_type_);
void write(const Block & block) override;
bool isWritable() const override;
WaitResult waitForWritable() const override;
void flush() override;

private:
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ void FineGrainedShuffleWriter<ExchangeWriterPtr>::flush()
}

template <class ExchangeWriterPtr>
bool FineGrainedShuffleWriter<ExchangeWriterPtr>::isWritable() const
WaitResult FineGrainedShuffleWriter<ExchangeWriterPtr>::waitForWritable() const
{
return writer->isWritable();
return writer->waitForWritable();
}

template <class ExchangeWriterPtr>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class FineGrainedShuffleWriter : public DAGResponseWriter
tipb::CompressionMode compression_mode_);
void prepare(const Block & sample_block) override;
void write(const Block & block) override;
bool isWritable() const override;
WaitResult waitForWritable() const override;
void flush() override;

private:
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/HashPartitionWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ void HashPartitionWriter<ExchangeWriterPtr>::flush()
}

template <class ExchangeWriterPtr>
bool HashPartitionWriter<ExchangeWriterPtr>::isWritable() const
WaitResult HashPartitionWriter<ExchangeWriterPtr>::waitForWritable() const
{
return writer->isWritable();
return writer->waitForWritable();
}

template <class ExchangeWriterPtr>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/HashPartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class HashPartitionWriter : public DAGResponseWriter
MPPDataPacketVersion data_codec_version_,
tipb::CompressionMode compression_mode_);
void write(const Block & block) override;
bool isWritable() const override;
WaitResult waitForWritable() const override;
void flush() override;

private:
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Mpp/LocalRequestHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ struct LocalRequestHandler

bool isWritable() const { return msg_queue->isWritable(); }

void registerPipeReadTask(TaskPtr && task) const { msg_queue->registerPipeReadTask(std::move(task)); }
void registerPipeWriteTask(TaskPtr && task) const { msg_queue->registerPipeWriteTask(std::move(task)); }

void writeDone(bool meet_error, const String & local_err_msg) const
{
notify_write_done(meet_error, local_err_msg);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -759,13 +759,13 @@ void MPPTask::abort(const String & message, AbortType abort_type)
}
else if (previous_status == RUNNING && switchStatus(RUNNING, next_task_status))
{
/// abort the components from top to bottom because if bottom components are aborted
/// first, the top components may see an error caused by the abort, which is not
/// abort mpptunnels first because if others components are aborted
/// first, the mpptunnels may see an error caused by the abort, which is not
/// the original error
setErrString(message);
abortTunnels(message, false);
abortQueryExecutor();
abortReceivers();
abortQueryExecutor();
scheduleThisTask(ScheduleState::FAILED);
/// runImpl is running, leave remaining work to runImpl
LOG_WARNING(log, "Finish abort task from running");
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ void MPPTunnel::waitUntilConnectedOrFinished(std::unique_lock<std::mutex> & lk)
throw Exception(fmt::format("MPPTunnel {} can not be connected because MPPTask is cancelled", tunnel_id));
}

bool MPPTunnel::isWritable() const
WaitResult MPPTunnel::waitForWritable() const
{
std::unique_lock lk(mu);
switch (status)
Expand All @@ -396,12 +396,17 @@ bool MPPTunnel::isWritable() const
if (unlikely(timeout_stopwatch->elapsed() > timeout_nanoseconds))
throw Exception(fmt::format("{} is timeout", tunnel_id));
}
return false;
return WaitResult::WaitForPolling;
}
case TunnelStatus::Connected:
case TunnelStatus::WaitingForSenderFinish:
RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id);
return tunnel_sender->isWritable();
if (!tunnel_sender->isWritable())
{
setNotifyFuture(tunnel_sender);
return WaitResult::WaitForNotify;
}
return WaitResult::Ready;
case TunnelStatus::Finished:
RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id);
throw Exception(fmt::format(
Expand Down
Loading

0 comments on commit 0a80ae0

Please sign in to comment.