Skip to content

Commit

Permalink
[native] Introduce lastCoordinatorHeartbeatMs to PrestoTask.
Browse files Browse the repository at this point in the history
It is used to determine if the Coordinator has abandoned the Task.
  • Loading branch information
spershin committed May 14, 2024
1 parent 9b9a516 commit b15f0c5
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 5 deletions.
17 changes: 17 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,15 @@ void PrestoTask::updateHeartbeatLocked() {
info.lastHeartbeat = util::toISOTimestamp(lastHeartbeatMs);
}

void PrestoTask::updateCoordinatorHeartbeat() {
std::lock_guard<std::mutex> l(mutex);
updateCoordinatorHeartbeatLocked();
}

/// Stamps 'lastCoordinatorHeartbeatMs' with the current time in milliseconds.
/// Performs no locking of its own; it is meant to be called while 'mutex' is
/// already held.
void PrestoTask::updateCoordinatorHeartbeatLocked() {
  const auto nowMs = velox::getCurrentTimeMs();
  lastCoordinatorHeartbeatMs = nowMs;
}

uint64_t PrestoTask::timeSinceLastHeartbeatMs() const {
std::lock_guard<std::mutex> l(mutex);
if (lastHeartbeatMs == 0UL) {
Expand All @@ -290,6 +299,14 @@ uint64_t PrestoTask::timeSinceLastHeartbeatMs() const {
return getCurrentTimeMs() - lastHeartbeatMs;
}

/// Returns the number of milliseconds elapsed since the Coordinator heartbeat
/// stored in 'lastCoordinatorHeartbeatMs'.
/// Returns zero if no Coordinator heartbeat has been recorded yet, or if the
/// current clock reading is not later than the recorded heartbeat.
uint64_t PrestoTask::timeSinceLastCoordinatorHeartbeatMs() const {
  std::lock_guard<std::mutex> l(mutex);
  if (lastCoordinatorHeartbeatMs == 0UL) {
    return 0UL;
  }
  const uint64_t nowMs = getCurrentTimeMs();
  // The subtraction below is unsigned: if 'nowMs' were smaller than the stored
  // heartbeat it would wrap around to a huge value and the task would look
  // abandoned. Clamp to zero instead.
  // NOTE(review): presumably getCurrentTimeMs() reads a wall clock that can be
  // adjusted backwards - confirm against its implementation.
  if (nowMs <= lastCoordinatorHeartbeatMs) {
    return 0UL;
  }
  return nowMs - lastCoordinatorHeartbeatMs;
}

void PrestoTask::recordProcessCpuTime() {
if (processCpuTime_ > 0) {
return;
Expand Down
18 changes: 18 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,15 @@ struct PrestoTask {
/// has not been started, until the actual 'create task' message comes.
bool taskStarted{false};

/// Time point (in ms) when the last message (any) came for this task.
// TODO (spershin): Deprecate it, use only the 'lastCoordinatorHeartbeatMs'.
uint64_t lastHeartbeatMs{0};
/// Time point (in ms) when the last message came for this task from the
/// Coordinator. Used to determine if the Task has been abandoned.
uint64_t lastCoordinatorHeartbeatMs{0};
/// Time point (in ms) when we last updated Task stats.
uint64_t lastTaskStatsUpdateMs = {0};

uint64_t lastMemoryReservation = {0};
uint64_t createTimeMs{0};
uint64_t firstSplitStartTimeMs{0};
Expand Down Expand Up @@ -132,10 +139,21 @@ struct PrestoTask {
/// Updates when this task was touched last time.
void updateHeartbeatLocked();

/// Updates time point (ms) when this task was touched last time by a message
/// from the Coordinator. Acquires 'mutex' internally, so the caller must not
/// already hold it.
void updateCoordinatorHeartbeat();
/// Same update as updateCoordinatorHeartbeat(), but takes no lock itself: the
/// caller is expected to already hold 'mutex'.
void updateCoordinatorHeartbeatLocked();

/// Returns time (ms) since the task was touched last time (last heartbeat).
/// Returns zero, if never (shouldn't happen).
uint64_t timeSinceLastHeartbeatMs() const;

/// Returns time (ms) since the task was touched last time by a message from
/// the Coordinator.
/// Returns zero if no Coordinator message has been recorded for this task
/// yet. Acquires 'mutex' internally.
uint64_t timeSinceLastCoordinatorHeartbeatMs() const;

protocol::TaskStatus updateStatus() {
std::lock_guard<std::mutex> l(mutex);
return updateStatusLocked();
Expand Down
15 changes: 11 additions & 4 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void cancelAbandonedTasksInternal(const TaskMap& taskMap, int32_t abandonedMs) {
for (const auto& [id, prestoTask] : taskMap) {
if (prestoTask->task != nullptr) {
if (prestoTask->task->isRunning()) {
if (prestoTask->timeSinceLastHeartbeatMs() >= abandonedMs) {
if (prestoTask->timeSinceLastCoordinatorHeartbeatMs() >= abandonedMs) {
LOG(INFO) << "Cancelling abandoned task '" << id << "'.";
prestoTask->task->requestCancel();
}
Expand Down Expand Up @@ -364,6 +364,7 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateErrorTask(
auto prestoTask = findOrCreateTask(taskId, startProcessCpuTime);
{
std::lock_guard<std::mutex> l(prestoTask->mutex);
prestoTask->updateCoordinatorHeartbeatLocked();
prestoTask->updateHeartbeatLocked();
if (prestoTask->error == nullptr) {
prestoTask->error = exception;
Expand Down Expand Up @@ -428,7 +429,7 @@ std::unique_ptr<protocol::TaskInfo> TaskManager::createOrUpdateTask(
const velox::core::PlanFragment& planFragment,
std::shared_ptr<velox::core::QueryCtx> queryCtx,
long startProcessCpuTime) {
return createOrUpdateTask(
return createOrUpdateTaskImpl(
taskId,
planFragment,
updateRequest.sources,
Expand All @@ -447,7 +448,7 @@ std::unique_ptr<protocol::TaskInfo> TaskManager::createOrUpdateBatchTask(

checkSplitsForBatchTask(planFragment.planNode, updateRequest.sources);

return createOrUpdateTask(
return createOrUpdateTaskImpl(
taskId,
planFragment,
updateRequest.sources,
Expand All @@ -456,7 +457,7 @@ std::unique_ptr<protocol::TaskInfo> TaskManager::createOrUpdateBatchTask(
startProcessCpuTime);
}

std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTask(
std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTaskImpl(
const TaskId& taskId,
const velox::core::PlanFragment& planFragment,
const std::vector<protocol::TaskSource>& sources,
Expand All @@ -468,6 +469,7 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTask(
auto prestoTask = findOrCreateTask(taskId, startProcessCpuTime);
{
std::lock_guard<std::mutex> l(prestoTask->mutex);
prestoTask->updateCoordinatorHeartbeatLocked();
if (not prestoTask->task && planFragment.planNode) {
// If the task is aborted, no need to do anything else.
// This takes care of DELETE task message coming before CREATE task.
Expand Down Expand Up @@ -620,6 +622,7 @@ std::unique_ptr<TaskInfo> TaskManager::deleteTask(

std::lock_guard<std::mutex> l(prestoTask->mutex);
prestoTask->updateHeartbeatLocked();
prestoTask->updateCoordinatorHeartbeatLocked();
auto execTask = prestoTask->task;
if (execTask) {
auto state = execTask->state();
Expand Down Expand Up @@ -771,6 +774,7 @@ folly::Future<std::unique_ptr<protocol::TaskInfo>> TaskManager::getTaskInfo(
// Return current TaskInfo without waiting.
promise.setValue(
std::make_unique<protocol::TaskInfo>(prestoTask->updateInfo()));
prestoTask->updateCoordinatorHeartbeat();
return std::move(future).via(httpSrvCpuExecutor_);
}

Expand All @@ -780,6 +784,7 @@ folly::Future<std::unique_ptr<protocol::TaskInfo>> TaskManager::getTaskInfo(
{
std::lock_guard<std::mutex> l(prestoTask->mutex);
prestoTask->updateHeartbeatLocked();
prestoTask->updateCoordinatorHeartbeatLocked();
if (!prestoTask->task) {
auto promiseHolder =
std::make_shared<PromiseHolder<std::unique_ptr<protocol::TaskInfo>>>(
Expand Down Expand Up @@ -937,6 +942,7 @@ folly::Future<std::unique_ptr<protocol::TaskStatus>> TaskManager::getTaskStatus(

if (!currentState || !maxWait) {
// Return task's status immediately without waiting.
prestoTask->updateCoordinatorHeartbeat();
return std::make_unique<protocol::TaskStatus>(prestoTask->updateStatus());
}

Expand All @@ -946,6 +952,7 @@ folly::Future<std::unique_ptr<protocol::TaskStatus>> TaskManager::getTaskStatus(
protocol::TaskStatus status;
{
std::lock_guard<std::mutex> l(prestoTask->mutex);
prestoTask->updateCoordinatorHeartbeatLocked();
if (!prestoTask->task) {
auto promiseHolder = std::make_shared<
PromiseHolder<std::unique_ptr<protocol::TaskStatus>>>(
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/presto_cpp/main/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class TaskManager {
// coordinator for a considerable time.
void cancelAbandonedTasks();

std::unique_ptr<protocol::TaskInfo> createOrUpdateTask(
std::unique_ptr<protocol::TaskInfo> createOrUpdateTaskImpl(
const protocol::TaskId& taskId,
const velox::core::PlanFragment& planFragment,
const std::vector<protocol::TaskSource>& sources,
Expand Down
12 changes: 12 additions & 0 deletions presto-native-execution/presto_cpp/main/tests/PrestoTaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "presto_cpp/main/PrestoTask.h"
#include <gtest/gtest.h>
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/time/Timer.h"

DECLARE_bool(velox_memory_leak_check_enabled);

Expand Down Expand Up @@ -64,3 +65,14 @@ TEST_F(PrestoTaskTest, runtimeMetricConversion) {
EXPECT_EQ(veloxMetric.max, prestoMetric.max);
EXPECT_EQ(veloxMetric.min, prestoMetric.min);
}

// Exercises the Coordinator heartbeat bookkeeping of a freshly built
// PrestoTask.
TEST_F(PrestoTaskTest, basic) {
  constexpr int kSleepMs = 100;
  PrestoTask task{"20201107_130540_00011_wrpkw.1.2.3.4", "node2", 0};

  // Before any Coordinator heartbeat is recorded the elapsed time is zero.
  EXPECT_EQ(task.timeSinceLastCoordinatorHeartbeatMs(), 0);

  // Once a heartbeat is recorded, the elapsed time has to cover at least the
  // interval we slept for.
  task.updateCoordinatorHeartbeat();
  /* sleep override */
  std::this_thread::sleep_for(std::chrono::milliseconds(kSleepMs));
  EXPECT_GE(task.timeSinceLastCoordinatorHeartbeatMs(), kSleepMs);
}

0 comments on commit b15f0c5

Please sign in to comment.