Skip to content

Commit

Permalink
[native] SystemConnector to query system.runtime.tasks table
Browse files Browse the repository at this point in the history
  • Loading branch information
aditi-pandit committed Mar 22, 2024
1 parent e254c6c commit a79a770
Show file tree
Hide file tree
Showing 11 changed files with 13,128 additions and 12,208 deletions.
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ add_library(
QueryContextManager.cpp
ServerOperation.cpp
SignalHandler.cpp
SystemConnector.cpp
TaskManager.cpp
TaskResource.cpp
PeriodicHeartbeatManager.cpp
Expand Down
22 changes: 22 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "presto_cpp/main/Announcer.h"
#include "presto_cpp/main/PeriodicTaskManager.h"
#include "presto_cpp/main/SignalHandler.h"
#include "presto_cpp/main/SystemConnector.h"
#include "presto_cpp/main/TaskResource.h"
#include "presto_cpp/main/common/ConfigReader.h"
#include "presto_cpp/main/common/Counters.h"
Expand Down Expand Up @@ -242,6 +243,12 @@ void PrestoServer::run() {
std::make_unique<IcebergPrestoToVeloxConnector>("iceberg"));
registerPrestoToVeloxConnector(
std::make_unique<TpchPrestoToVeloxConnector>("tpch"));
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system"));
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("system"));
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system@system"));

initializeVeloxMemory();
initializeThreadPools();
Expand Down Expand Up @@ -448,6 +455,7 @@ void PrestoServer::run() {
}
prestoServerOperations_ =
std::make_unique<PrestoServerOperations>(taskManager_.get(), this);
registerSystemConnector();

// The endpoint used by operation in production.
httpServer_->registerGet(
Expand Down Expand Up @@ -946,9 +954,23 @@ std::vector<std::string> PrestoServer::registerConnectors(
velox::connector::registerConnector(connector);
}
}

return catalogNames;
}

void PrestoServer::registerSystemConnector() {
PRESTO_STARTUP_LOG(INFO) << "Registering system catalog "
<< " using connector SystemConnector";
auto systemConnector = std::dynamic_pointer_cast<SystemConnector>(
facebook::velox::connector::getConnectorFactory("$system")->newConnector(
"$system@system",
std::move(std::make_shared<const velox::core::MemConfig>()),
connectorIoExecutor_.get()));
VELOX_CHECK(systemConnector);
systemConnector->setTaskManager(taskManager_.get());
velox::connector::registerConnector(systemConnector);
}

void PrestoServer::unregisterConnectors() {
PRESTO_SHUTDOWN_LOG(INFO) << "Unregistering connectors";
auto connectors = facebook::velox::connector::getAllConnectors();
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ class PrestoServer {
// Periodically yield tasks if there are tasks queued.
void yieldTasks();

void registerSystemConnector();

const std::string configDirectoryPath_;

std::shared_ptr<CoordinatorDiscoverer> coordinatorDiscoverer_;
Expand Down
297 changes: 297 additions & 0 deletions presto-native-execution/presto_cpp/main/SystemConnector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "presto_cpp/main/SystemConnector.h"
#include "presto_cpp/main/PrestoTask.h"
#include "presto_cpp/main/TaskManager.h"

namespace facebook::presto {

using namespace velox;

SystemTableHandle::SystemTableHandle(
std::string connectorId,
std::string schemaName,
std::string tableName)
: ConnectorTableHandle(std::move(connectorId)),
schemaName_(std::move(schemaName)),
tableName_(std::move(tableName)) {
VELOX_USER_CHECK_EQ(
schemaName_, "runtime", "SystemConnector supports only runtime schema");
VELOX_USER_CHECK_EQ(
tableName_, "tasks", "SystemConnector supports only tasks table");

std::vector<std::string> kTaskColumnNames = {
"node_id",
"task_id",
"stage_execution_id",
"stage_id",
"query_id",
"state",
"splits",
"queued_splits",
"running_splits",
"completed_splits",
"split_scheduled_time_ms",
"split_cpu_time_ms",
"split_blocked_time_ms",
"raw_input_bytes",
"raw_input_rows",
"processed_input_bytes",
"processed_input_rows",
"output_bytes",
"output_rows",
"physical_written_bytes",
"created",
"start",
"last_heartbeat",
"end"};
std::vector<velox::TypePtr> kTaskColumnTypes = {
velox::VARCHAR(), velox::VARCHAR(), velox::VARCHAR(),
velox::VARCHAR(), velox::VARCHAR(), velox::VARCHAR(),
velox::BIGINT(), velox::BIGINT(), velox::BIGINT(),
velox::BIGINT(), velox::BIGINT(), velox::BIGINT(),
velox::BIGINT(), velox::BIGINT(), velox::BIGINT(),
velox::BIGINT(), velox::BIGINT(), velox::BIGINT(),
velox::BIGINT(), velox::BIGINT(), velox::TIMESTAMP(),
velox::TIMESTAMP(), velox::TIMESTAMP(), velox::TIMESTAMP()};
kTaskSchema_ = ROW(std::move(kTaskColumnNames), std::move(kTaskColumnTypes));
}

std::string SystemTableHandle::toString() const {
return fmt::format("schema: {} table: {}", schemaName_, tableName_);
}

SystemDataSource::SystemDataSource(
const std::shared_ptr<const RowType>& outputType,
const std::shared_ptr<connector::ConnectorTableHandle>& tableHandle,
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
const TaskManager* taskManager,
velox::memory::MemoryPool* FOLLY_NONNULL pool)
: taskManager_(taskManager), pool_(pool) {
auto systemTableHandle =
std::dynamic_pointer_cast<SystemTableHandle>(tableHandle);
VELOX_CHECK_NOT_NULL(
systemTableHandle,
"TableHandle must be an instance of SystemTableHandle");
VELOX_USER_CHECK_EQ(
systemTableHandle->schemaName(),
"runtime",
"SystemConnector supports only runtime schema");
VELOX_USER_CHECK_EQ(
systemTableHandle->tableName(),
"tasks",
"SystemConnector supports only tasks table");

taskTableHandle_ = systemTableHandle;
outputColumnMappings_.reserve(outputType->names().size());
auto taskSchema = taskTableHandle_->taskSchema();
for (const auto& outputName : outputType->names()) {
auto it = columnHandles.find(outputName);
VELOX_CHECK(
it != columnHandles.end(),
"ColumnHandle is missing for output column '{}'",
outputName);

auto handle = std::dynamic_pointer_cast<SystemColumnHandle>(it->second);
VELOX_CHECK_NOT_NULL(
handle,
"ColumnHandle must be an instance of SystemColumnHandle "
"for '{}' on table '{}'",
handle->name());

auto columnIndex = taskSchema->getChildIdxIfExists(handle->name());
VELOX_CHECK(columnIndex.has_value());
outputColumnMappings_.push_back(columnIndex.value());
}

outputType_ = outputType;
taskTableResult_ = std::dynamic_pointer_cast<RowVector>(
BaseVector::create(taskSchema, 0, pool_));
}

void SystemDataSource::addSplit(
std::shared_ptr<connector::ConnectorSplit> split) {
VELOX_CHECK_NULL(
currentSplit_,
"Previous split has not been processed yet. Call next() to process the split.");
currentSplit_ = std::dynamic_pointer_cast<SystemSplit>(split);
VELOX_CHECK(currentSplit_, "Wrong type of split for SystemDataSource.");
VELOX_USER_CHECK_EQ(
currentSplit_->schemaName(),
"runtime",
"SystemConnector supports only runtime schema");
VELOX_USER_CHECK_EQ(
currentSplit_->tableName(),
"tasks",
"SystemConnector supports only tasks table");
}

namespace {
void getTaskResults(
const TaskMap& taskMap,
const RowVectorPtr& taskTableResult) {
auto numRows = taskMap.size();
taskTableResult->resize(numRows);
for (auto i = 0; i < taskTableResult->type()->size(); i++) {
taskTableResult->childAt(i)->resize(numRows);
}

auto toMillis = [](int64_t nanos) -> int64_t { return nanos / 1'000'000; };
int i = 0;
for (const auto& taskEntry : taskMap) {
auto task = taskEntry.second;
auto taskInfo = task->updateInfo();
taskTableResult->childAt(0)->asFlatVector<StringView>()->set(
i, StringView(taskInfo.nodeId));
taskTableResult->childAt(1)->asFlatVector<StringView>()->set(
i, StringView(taskInfo.taskId));
std::string temp = fmt::format("{}", task->id.stageExecutionId());
taskTableResult->childAt(2)->asFlatVector<StringView>()->set(
i, StringView(temp));
temp = fmt::format("{}", task->id.stageId());
taskTableResult->childAt(3)->asFlatVector<StringView>()->set(
i, StringView(temp));
temp = fmt::format("{}", task->id.queryId());
taskTableResult->childAt(4)->asFlatVector<StringView>()->set(
i, StringView(temp));
temp = json(taskInfo.taskStatus.state).dump();
taskTableResult->childAt(5)->asFlatVector<StringView>()->set(
i, StringView(temp));
taskTableResult->childAt(6)->asFlatVector<int64_t>()->set(
i, taskInfo.stats.totalDrivers);
taskTableResult->childAt(7)->asFlatVector<int64_t>()->set(
i, taskInfo.stats.queuedDrivers);
taskTableResult->childAt(8)->asFlatVector<int64_t>()->set(
i, taskInfo.stats.runningDrivers);
taskTableResult->childAt(9)->asFlatVector<int64_t>()->set(
i, taskInfo.stats.completedDrivers);
taskTableResult->childAt(10)->asFlatVector<int64_t>()->set(
i, toMillis(taskInfo.stats.totalScheduledTimeInNanos));
taskTableResult->childAt(11)->asFlatVector<int64_t>()->set(
i, toMillis(taskInfo.stats.totalCpuTimeInNanos));
taskTableResult->childAt(12)->asFlatVector<int64_t>()->set(
i, toMillis(taskInfo.stats.totalBlockedTimeInNanos));
taskTableResult->childAt(13)->asFlatVector<int64_t>()->set(
i, taskInfo.stats.rawInputDataSizeInBytes);
taskTableResult->childAt(14)->asFlatVector<int64_t>()->set(
i, taskInfo.stats.rawInputPositions);
taskTableResult->childAt(15)->asFlatVector<int64_t>()->set(
i, taskInfo.stats.processedInputDataSizeInBytes);
taskTableResult->childAt(16)->asFlatVector<int64_t>()->set(
i, taskInfo.stats.processedInputPositions);
taskTableResult->childAt(17)->asFlatVector<int64_t>()->set(
i, taskInfo.stats.outputDataSizeInBytes);
taskTableResult->childAt(18)->asFlatVector<int64_t>()->set(
i, taskInfo.stats.outputPositions);
taskTableResult->childAt(19)->asFlatVector<int64_t>()->set(
i, taskInfo.stats.physicalWrittenDataSizeInBytes);
// Setting empty TIMESTAMP for below fields as string -> Timestamp is not
// implemented in Velox.
// taskTableResult->childAt(20)->asFlatVector<Timestamp>()->set(i,
// Timestamp(taskInfo.stats.createTime));
taskTableResult->childAt(20)->asFlatVector<Timestamp>()->set(
i, Timestamp());
// taskTableResult->childAt(21)->asFlatVector<Timestamp>()->set(i,
// Timestamp(taskInfo.stats.firstStartTime));
taskTableResult->childAt(21)->asFlatVector<Timestamp>()->set(
i, Timestamp());
// taskTableResult->childAt(22)->asFlatVector<Timestamp>()->set(i,
// Timestamp(taskInfo.lastHeartbeat));
taskTableResult->childAt(22)->asFlatVector<Timestamp>()->set(
i, Timestamp());
// taskTableResult->childAt(23)->asFlatVector<Timestamp>()->set(i,
// Timestamp(taskInfo.stats.endTime));
taskTableResult->childAt(23)->asFlatVector<Timestamp>()->set(
i, Timestamp());

i++;
}
}

} // namespace

std::optional<RowVectorPtr> SystemDataSource::next(
uint64_t size,
velox::ContinueFuture& /*future*/) {
if (!currentSplit_) {
return nullptr;
}

TaskMap taskMap = taskManager_->tasks();
auto numRows = taskMap.size();
getTaskResults(taskMap, taskTableResult_);

auto result = std::dynamic_pointer_cast<RowVector>(
BaseVector::create(outputType_, numRows, pool_));

for (auto i = 0; i < outputColumnMappings_.size(); i++) {
result->childAt(i) = taskTableResult_->childAt(outputColumnMappings_.at(i));
}

currentSplit_ = nullptr;
return result;
}

VELOX_REGISTER_CONNECTOR_FACTORY(std::make_shared<SystemConnectorFactory>())

std::unique_ptr<velox::connector::ConnectorSplit>
SystemPrestoToVeloxConnector::toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* const connectorSplit) const {
auto systemSplit = dynamic_cast<const protocol::SystemSplit*>(connectorSplit);
VELOX_CHECK_NOT_NULL(
systemSplit, "Unexpected split type {}", connectorSplit->_type);
return std::make_unique<SystemSplit>(
catalogId,
systemSplit->tableHandle.schemaName,
systemSplit->tableHandle.tableName);
}

std::unique_ptr<velox::connector::ColumnHandle>
SystemPrestoToVeloxConnector::toVeloxColumnHandle(
const protocol::ColumnHandle* column,
const TypeParser& typeParser) const {
auto systemColumn = dynamic_cast<const protocol::SystemColumnHandle*>(column);
VELOX_CHECK_NOT_NULL(
systemColumn, "Unexpected column handle type {}", column->_type);
return std::make_unique<SystemColumnHandle>(systemColumn->columnName);
}

std::unique_ptr<velox::connector::ConnectorTableHandle>
SystemPrestoToVeloxConnector::toVeloxTableHandle(
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
std::unordered_map<
std::string,
std::shared_ptr<velox::connector::ColumnHandle>>& assignments) const {
auto systemLayout =
std::dynamic_pointer_cast<const protocol::SystemTableLayoutHandle>(
tableHandle.connectorTableLayout);
VELOX_CHECK_NOT_NULL(
systemLayout, "Unexpected table handle type {}", tableHandle.connectorId);
return std::make_unique<SystemTableHandle>(
tableHandle.connectorId,
systemLayout->table.schemaName,
systemLayout->table.tableName);
}

std::unique_ptr<protocol::ConnectorProtocol>
SystemPrestoToVeloxConnector::createConnectorProtocol() const {
return std::make_unique<protocol::SystemConnectorProtocol>();
}

} // namespace facebook::presto
Loading

0 comments on commit a79a770

Please sign in to comment.