Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[native] SystemConnector to query system.runtime.tasks table #21416

Merged
merged 1 commit into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ add_library(
QueryContextManager.cpp
ServerOperation.cpp
SignalHandler.cpp
SystemConnector.cpp
TaskManager.cpp
TaskResource.cpp
PeriodicHeartbeatManager.cpp
Expand Down
23 changes: 23 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "presto_cpp/main/Announcer.h"
#include "presto_cpp/main/PeriodicTaskManager.h"
#include "presto_cpp/main/SignalHandler.h"
#include "presto_cpp/main/SystemConnector.h"
#include "presto_cpp/main/TaskResource.h"
#include "presto_cpp/main/common/ConfigReader.h"
#include "presto_cpp/main/common/Counters.h"
Expand Down Expand Up @@ -232,6 +233,17 @@ void PrestoServer::run() {
std::make_unique<IcebergPrestoToVeloxConnector>("iceberg"));
registerPrestoToVeloxConnector(
std::make_unique<TpchPrestoToVeloxConnector>("tpch"));
// Presto server uses system catalog or system schema in other catalogs
// in different places in the code. All these resolve to the SystemConnector.
// Depending on where the operator or column is used, different prefixes can
// be used in the naming. So the protocol class is mapped
// to all the different prefixes for System tables/columns.
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system"));
majetideepak marked this conversation as resolved.
Show resolved Hide resolved
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("system"));
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system@system"));

initializeVeloxMemory();
initializeThreadPools();
Expand Down Expand Up @@ -438,6 +450,7 @@ void PrestoServer::run() {
}
prestoServerOperations_ =
std::make_unique<PrestoServerOperations>(taskManager_.get(), this);
registerSystemConnector();

// The endpoint used by operation in production.
httpServer_->registerGet(
Expand Down Expand Up @@ -939,6 +952,15 @@ std::vector<std::string> PrestoServer::registerConnectors(
return catalogNames;
}

void PrestoServer::registerSystemConnector() {
PRESTO_STARTUP_LOG(INFO) << "Registering system catalog "
<< " using connector SystemConnector";
VELOX_CHECK(taskManager_);
auto systemConnector =
std::make_shared<SystemConnector>("$system@system", taskManager_.get());
aditi-pandit marked this conversation as resolved.
Show resolved Hide resolved
velox::connector::registerConnector(systemConnector);
}

void PrestoServer::unregisterConnectors() {
PRESTO_SHUTDOWN_LOG(INFO) << "Unregistering connectors";
auto connectors = facebook::velox::connector::getAllConnectors();
Expand All @@ -959,6 +981,7 @@ void PrestoServer::unregisterConnectors() {
}
}

facebook::velox::connector::unregisterConnector("$system@system");
PRESTO_SHUTDOWN_LOG(INFO)
<< "Unregistered " << connectors.size() << " connectors";
}
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ class PrestoServer {
// Periodically yield tasks if there are tasks queued.
void yieldTasks();

void registerSystemConnector();

const std::string configDirectoryPath_;

std::shared_ptr<CoordinatorDiscoverer> coordinatorDiscoverer_;
Expand Down
6 changes: 6 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,12 +497,15 @@ void PrestoTask::updateTimeInfoLocked(
util::toISOTimestamp(veloxTaskStats.executionStartTimeMs);
prestoTaskStats.firstStartTime =
util::toISOTimestamp(veloxTaskStats.firstSplitStartTimeMs);
createTimeMs = veloxTaskStats.executionStartTimeMs;
firstSplitStartTimeMs = veloxTaskStats.firstSplitStartTimeMs;
prestoTaskStats.lastStartTime =
util::toISOTimestamp(veloxTaskStats.lastSplitStartTimeMs);
prestoTaskStats.lastEndTime =
util::toISOTimestamp(veloxTaskStats.executionEndTimeMs);
prestoTaskStats.endTime =
util::toISOTimestamp(veloxTaskStats.executionEndTimeMs);
lastEndTimeMs = veloxTaskStats.executionEndTimeMs;

if (veloxTaskStats.executionEndTimeMs > veloxTaskStats.executionStartTimeMs) {
prestoTaskStats.elapsedTimeInNanos = (veloxTaskStats.executionEndTimeMs -
Expand Down Expand Up @@ -804,6 +807,9 @@ folly::dynamic PrestoTask::toJson() const {
obj["lastHeartbeatMs"] = lastHeartbeatMs;
obj["lastTaskStatsUpdateMs"] = lastTaskStatsUpdateMs;
obj["lastMemoryReservation"] = lastMemoryReservation;
obj["createTimeMs"] = createTimeMs;
obj["firstSplitStartTimeMs"] = firstSplitStartTimeMs;
obj["lastEndTimeMs"] = lastEndTimeMs;

json j;
to_json(j, info);
Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ struct PrestoTask {
uint64_t lastHeartbeatMs{0};
uint64_t lastTaskStatsUpdateMs = {0};
uint64_t lastMemoryReservation = {0};
uint64_t createTimeMs{0};
uint64_t firstSplitStartTimeMs{0};
uint64_t lastEndTimeMs{0};
mutable std::mutex mutex;

/// Error before task is created or when task is being created.
Expand Down
Loading
Loading