Skip to content

Commit

Permalink
refine tiflash shutdown logic (#4291)
Browse files Browse the repository at this point in the history
* 1.add metrics of calldata&mpptunnel 2.refine shutdown logic

Signed-off-by: bestwoody <[email protected]>

* update

* Apply suggestions from code review

Co-authored-by: Fu Zhe <[email protected]>

* Update dbms/src/Flash/EstablishCall.cpp

Co-authored-by: Fu Zhe <[email protected]>

* add harm limit to wait

Signed-off-by: bestwoody <[email protected]>

* fix

Signed-off-by: bestwoody <[email protected]>

Co-authored-by: Fu Zhe <[email protected]>
  • Loading branch information
bestwoody and fuzhe1989 authored Mar 16, 2022
1 parent 921dd4d commit dfed057
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 9 deletions.
3 changes: 3 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ namespace DB
F(type_decode, {{"type", "decode"}}, ExpBuckets{0.0005, 2, 20}), F(type_write, {{"type", "write"}}, ExpBuckets{0.0005, 2, 20})) \
M(tiflash_server_info, "Indicate the tiflash server info, and the value is the start timestamp (s).", Gauge, \
F(start_time, {"version", TiFlashBuildInfo::getReleaseVersion()}, {"hash", TiFlashBuildInfo::getGitHash()})) \
M(tiflash_object_count, "Number of objects", Gauge, \
F(type_count_of_establish_calldata, {"type", "count_of_establish_calldata"}), \
F(type_count_of_mpptunnel, {"type", "count_of_mpptunnel"})) \
M(tiflash_thread_count, "Number of threads", Gauge, \
F(type_max_threads_of_thdpool, {"type", "thread_pool_total_max"}), \
F(type_active_threads_of_thdpool, {"type", "thread_pool_active"}), \
Expand Down
25 changes: 22 additions & 3 deletions dbms/src/Flash/EstablishCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashMetrics.h>
#include <Flash/EstablishCall.h>
#include <Flash/FlashService.h>
#include <Flash/Mpp/Utils.h>
Expand All @@ -26,12 +27,18 @@ EstablishCallData::EstablishCallData(AsyncFlashService * service, grpc::ServerCo
, responder(&ctx)
, state(NEW_REQUEST)
{
GET_METRIC(tiflash_object_count, type_count_of_establish_calldata).Increment();
// As part of the initial CREATE state, we *request* that the system
// start processing requests. In this request, "this" acts are
// the tag uniquely identifying the request.
service->RequestEstablishMPPConnection(&ctx, &request, &responder, cq, notify_cq, this);
}

EstablishCallData::~EstablishCallData()
{
GET_METRIC(tiflash_object_count, type_count_of_establish_calldata).Decrement();
}

EstablishCallData * EstablishCallData::spawn(AsyncFlashService * service, grpc::ServerCompletionQueue * cq, grpc::ServerCompletionQueue * notify_cq, const std::shared_ptr<std::atomic<bool>> & is_shutdown)
{
return new EstablishCallData(service, cq, notify_cq, is_shutdown);
Expand All @@ -53,7 +60,9 @@ void EstablishCallData::tryFlushOne()

void EstablishCallData::responderFinish(const grpc::Status & status)
{
if (!(*is_shutdown))
if (*is_shutdown)
finishTunnelAndResponder();
else
responder.Finish(status, this);
}

Expand All @@ -79,7 +88,10 @@ void EstablishCallData::initRpc()
bool EstablishCallData::write(const mpp::MPPDataPacket & packet)
{
if (*is_shutdown)
return false;
{
finishTunnelAndResponder();
return true;
}
responder.Write(packet, this);
return true;
}
Expand Down Expand Up @@ -116,11 +128,18 @@ void EstablishCallData::cancel()
delete this;
return;
}
finishTunnelAndResponder();
}

void EstablishCallData::finishTunnelAndResponder()
{
state = FINISH;
if (mpp_tunnel)
{
mpp_tunnel->consumerFinish("grpc writes failed.", true); //trigger mpp tunnel finish work
}
grpc::Status status(static_cast<grpc::StatusCode>(GRPC_STATUS_UNKNOWN), "Consumer exits unexpected, grpc writes failed.");
responderFinish(status);
responder.Finish(status, this);
}

void EstablishCallData::proceed()
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/EstablishCall.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class EstablishCallData : public PacketWriter
grpc::ServerCompletionQueue * notify_cq,
const std::shared_ptr<std::atomic<bool>> & is_shutdown);

~EstablishCallData();

bool write(const mpp::MPPDataPacket & packet) override;

void tryFlushOne() override;
Expand Down Expand Up @@ -79,6 +81,8 @@ class EstablishCallData : public PacketWriter

void initRpc();

void finishTunnelAndResponder();

void responderFinish(const grpc::Status & status);

std::mutex mu;
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,15 @@ MPPTunnelBase<Writer>::MPPTunnelBase(
, log(getMPPTaskLog(log_, tunnel_id))
{
assert(!(is_local && is_async));
GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Increment();
}

template <typename Writer>
MPPTunnelBase<Writer>::~MPPTunnelBase()
{
SCOPE_EXIT({
GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Decrement();
});
try
{
{
Expand Down Expand Up @@ -308,6 +312,9 @@ void MPPTunnelBase<Writer>::consumerFinish(const String & err_msg, bool need_loc
send_queue.finish();

auto rest_work = [this, &err_msg] {
// it's safe to call it multiple times
if (finished && consumer_state.errHasSet())
return;
finished = true;
// must call setError in the critical area to keep consistent with `finished` from outside.
consumer_state.setError(err_msg);
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,18 @@ class MPPTunnelBase : private boost::noncopyable
void setError(const String & err_msg)
{
promise.set_value(err_msg);
err_has_set = true;
}

bool errHasSet() const
{
return err_has_set.load();
}

private:
std::promise<String> promise;
std::shared_future<String> future;
std::atomic<bool> err_has_set{false};
};
ConsumerState consumer_state;

Expand Down
15 changes: 9 additions & 6 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -642,14 +642,17 @@ class Server::FlashGrpcServerHolder

~FlashGrpcServerHolder()
{
*is_shutdown = true;
const int wait_calldata_after_shutdown_interval_ms = 500;
std::this_thread::sleep_for(std::chrono::milliseconds(wait_calldata_after_shutdown_interval_ms)); // sleep 500ms to let operations of calldata called by MPPTunnel done.
/// Shut down grpc server.
// wait 5 seconds for pending rpcs to gracefully stop
gpr_timespec deadline{5, 0, GPR_TIMESPAN};
LOG_FMT_INFO(log, "Begin to shut down flash grpc server");
flash_grpc_server->Shutdown(deadline);
flash_grpc_server->Shutdown();
*is_shutdown = true;
// Wait all existed MPPTunnels done to prevent crash.
// If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done.
const int max_wait_cnt = 300;
int wait_cnt = 0;
while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt))
std::this_thread::sleep_for(std::chrono::seconds(1));

for (auto & cq : cqs)
cq->Shutdown();
for (auto & cq : notify_cqs)
Expand Down

0 comments on commit dfed057

Please sign in to comment.