Skip to content

Commit

Permalink
[native] Retrieve Json function metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
pramodsatya committed Sep 30, 2024
1 parent b05a9c0 commit d716ca2
Show file tree
Hide file tree
Showing 23 changed files with 1,697 additions and 71 deletions.
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ target_link_libraries(
$<TARGET_OBJECTS:presto_protocol>
presto_common
presto_exception
presto_function_metadata
presto_http
presto_operators
velox_aggregates
Expand Down
32 changes: 22 additions & 10 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "presto_cpp/main/operators/PartitionAndSerialize.h"
#include "presto_cpp/main/operators/ShuffleRead.h"
#include "presto_cpp/main/operators/UnsafeRowExchangeSource.h"
#include "presto_cpp/main/types/FunctionMetadata.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
#include "velox/common/base/Counters.h"
#include "velox/common/base/StatsReporter.h"
Expand Down Expand Up @@ -469,16 +470,7 @@ void PrestoServer::run() {
driverExecutor_.get(), httpSrvCpuExecutor_.get(), spillerExecutor_.get());

if (systemConfig->prestoNativeSidecar()) {
httpServer_->registerGet(
"/v1/properties/session",
[this](
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
proxygen::ResponseHandler* downstream) {
auto sessionProperties =
taskManager_->getQueryContextManager()->getSessionProperties();
http::sendOkResponse(downstream, sessionProperties.serialize());
});
registerSidecarEndpoints();
}

std::string taskUri;
Expand Down Expand Up @@ -1404,6 +1396,26 @@ void PrestoServer::reportNodeStatus(proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, json(fetchNodeStatus()));
}

// Registers the HTTP GET endpoints that are only exposed when this server is
// started as a native sidecar:
//   /v1/properties/session - serialized session properties obtained from the
//                            task manager's query context manager.
//   /v1/functions          - json metadata of the registered functions, as
//                            produced by getFunctionsMetadata().
void PrestoServer::registerSidecarEndpoints() {
  // Captures `this` because the handler reads taskManager_ on every request.
  auto sessionPropertiesHandler =
      [this](
          proxygen::HTTPMessage* /*message*/,
          const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
          proxygen::ResponseHandler* downstream) {
        auto properties =
            taskManager_->getQueryContextManager()->getSessionProperties();
        http::sendOkResponse(downstream, properties.serialize());
      };

  // Stateless handler: the metadata is recomputed for every request.
  auto functionsHandler =
      [](proxygen::HTTPMessage* /*message*/,
         const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
         proxygen::ResponseHandler* downstream) {
        http::sendOkResponse(downstream, getFunctionsMetadata());
      };

  httpServer_->registerGet("/v1/properties/session", sessionPropertiesHandler);
  httpServer_->registerGet("/v1/functions", functionsHandler);
}

protocol::NodeStatus PrestoServer::fetchNodeStatus() {
auto systemConfig = SystemConfig::instance();
const int64_t nodeMemoryGb = systemConfig->systemMemoryGb();
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ class PrestoServer {

void registerSystemConnector();

void registerSidecarEndpoints();

std::unique_ptr<velox::cache::SsdCache> setupSsdCache();

const std::string configDirectoryPath_;
Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/types/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ target_link_libraries(presto_types presto_type_converter velox_type_fbhive

set_property(TARGET presto_types PROPERTY JOB_POOL_LINK presto_link_job_pool)

add_library(presto_function_metadata OBJECT FunctionMetadata.cpp)

target_link_libraries(presto_function_metadata velox_function_registry)

if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()
289 changes: 289 additions & 0 deletions presto-native-execution/presto_cpp/main/types/FunctionMetadata.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "presto_cpp/main/types/FunctionMetadata.h"

#include <utility>

#include "presto_cpp/presto_protocol/presto_protocol.h"
#include "velox/exec/Aggregate.h"
#include "velox/exec/AggregateFunctionRegistry.h"
#include "velox/exec/WindowFunction.h"
#include "velox/expression/SimpleFunctionRegistry.h"
#include "velox/functions/FunctionRegistry.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;

namespace facebook::presto {

namespace {

// Returns true if the given Velox type signature can be exposed to Presto.
// A signature with type parameters is valid only when every parameter is
// valid (checked recursively). A signature without parameters is rejected only
// when its upper-cased base name maps to TypeKind::HUGEINT; names that do not
// map to any TypeKind are accepted.
bool isValidPrestoType(const TypeSignature& typeSignature) {
  const auto& typeParameters = typeSignature.parameters();
  if (!typeParameters.empty()) {
    for (const auto& typeParameter : typeParameters) {
      if (!isValidPrestoType(typeParameter)) {
        return false;
      }
    }
    return true;
  }

  // Hugeint type is not supported in Presto.
  const auto upperCaseName =
      boost::algorithm::to_upper_copy(typeSignature.baseName());
  const auto mappedKind = tryMapNameToTypeKind(upperCaseName);
  if (!mappedKind) {
    return true;
  }
  return mappedKind.value() != TypeKind::HUGEINT;
}

// Splits a registered function name into its dot-separated components.
//
// The keys in velox function maps are of the format
// `catalog.schema.function_name`, so the result is expected to be
// {catalog, schema, function_name}.
//
// @param registeredFunction Name under which the function is registered.
// @return The three name components, in order.
// @throws A Velox user error ("Prefix missing for function ...") if the name
// does not split into exactly three parts.
const std::vector<std::string> getFunctionNameParts(
const std::string& registeredFunction) {
std::vector<std::string> parts;
// NOTE(review): the trailing `true` is presumably folly::split's ignoreEmpty
// flag, i.e. empty components are dropped before the size check - confirm.
folly::split('.', registeredFunction, parts, true);
VELOX_USER_CHECK(
parts.size() == 3,
fmt::format("Prefix missing for function {}", registeredFunction));
return parts;
}

// TODO: Remove this function later and retrieve companion function information
// from velox. Approaches for this under discussion here:
// https://github.com/facebookincubator/velox/discussions/11011.
//
// Returns true if `name` looks like the name of an aggregate companion
// function, i.e. it consists of the name of a function present in
// `aggregateFunctions` followed by one of the companion suffixes (`_partial`,
// `_merge_extract`, `_merge`, `_extract`).
//
// Every suffix is tried in turn. Stopping at the first suffix that merely
// occurs in the name would miss companions of aggregates whose own name
// contains another suffix, e.g. `my_partial_sum_merge` for an aggregate called
// `my_partial_sum`.
//
// The suffix is located with rfind() and not required to be at the very end of
// the name, so any text following the suffix is tolerated.
//
// @param name Registered function name to classify.
// @param aggregateFunctions Registered aggregate functions keyed by name.
bool isCompanionFunctionName(
    const std::string& name,
    const std::unordered_map<std::string, exec::AggregateFunctionEntry>&
        aggregateFunctions) {
  static constexpr const char* kCompanionSuffixes[] = {
      "_partial", "_merge_extract", "_merge", "_extract"};

  for (const char* suffix : kCompanionSuffixes) {
    const auto suffixOffset = name.rfind(suffix);
    if (suffixOffset == std::string::npos) {
      continue;
    }
    if (aggregateFunctions.count(name.substr(0, suffixOffset)) > 0) {
      return true;
    }
  }
  return false;
}

// Builds the Presto aggregation metadata for one signature of the aggregate
// function registered under `name`.
//
// @param name Registered name of the aggregate function.
// @param signature Signature whose intermediate type is reported.
// @return Metadata holding the intermediate type (as a string) and the
// order-sensitivity flag taken from the function's registry entry.
// @throws A Velox error if no aggregate function is registered under `name`.
const protocol::AggregationFunctionMetadata getAggregationFunctionMetadata(
    const std::string& name,
    const AggregateFunctionSignature& signature) {
  // Fail with a descriptive error instead of dereferencing a null entry when
  // the lookup does not find the function.
  const auto* entry = getAggregateFunctionEntry(name);
  VELOX_CHECK_NOT_NULL(
      entry, "Aggregate function {} is not registered", name);

  protocol::AggregationFunctionMetadata metadata;
  metadata.intermediateType = signature.intermediateType().toString();
  metadata.isOrderSensitive = entry->metadata.orderSensitive;
  return metadata;
}

// Returns the Velox metadata of the scalar function registered under `name`.
// The simple function registry is consulted first; when it has entries, the
// metadata of its last entry is returned. Otherwise the vector function
// metadata is used. Reaching the end without a match is treated as
// unreachable.
const exec::VectorFunctionMetadata getScalarMetadata(const std::string& name) {
  const auto simpleEntries =
      exec::simpleFunctions().getFunctionSignaturesAndMetadata(name);
  if (!simpleEntries.empty()) {
    return simpleEntries.back().first;
  }

  // Functions like abs are registered as simple functions for primitive
  // types, and as a vector function for complex types like DECIMAL. So a
  // missing entry in the simple function signature map is not an error by
  // itself; fall back to the vector function metadata.
  if (const auto vectorMetadata = exec::getVectorFunctionMetadata(name)) {
    return vectorMetadata.value();
  }
  VELOX_UNREACHABLE("Metadata for function {} not found", name);
}

// Builds the Presto routine characteristics of the function `name` of the
// given kind. The language is always "CPP". For scalar functions, determinism
// and null-call behavior come from the Velox function metadata; every other
// kind uses DETERMINISTIC and CALLED_ON_NULL_INPUT.
const protocol::RoutineCharacteristics getRoutineCharacteristics(
    const std::string& name,
    const protocol::FunctionKind& kind) {
  // Default metadata values of DETERMINISTIC and CALLED_ON_NULL_INPUT for
  // non-scalar functions.
  auto determinism = protocol::Determinism::DETERMINISTIC;
  auto nullCallClause = protocol::NullCallClause::CALLED_ON_NULL_INPUT;

  if (kind == protocol::FunctionKind::SCALAR) {
    const auto scalarMetadata = getScalarMetadata(name);
    if (!scalarMetadata.deterministic) {
      determinism = protocol::Determinism::NOT_DETERMINISTIC;
    }
    if (scalarMetadata.defaultNullBehavior) {
      nullCallClause = protocol::NullCallClause::RETURNS_NULL_ON_NULL_INPUT;
    }
  }

  protocol::RoutineCharacteristics characteristics;
  characteristics.language =
      std::make_shared<protocol::Language>(protocol::Language({"CPP"}));
  characteristics.determinism =
      std::make_shared<protocol::Determinism>(determinism);
  characteristics.nullCallClause =
      std::make_shared<protocol::NullCallClause>(nullCallClause);
  return characteristics;
}

// Builds the Presto function metadata for a single signature of a function.
//
// @param name Registered (fully prefixed) function name; also used as the
// doc string.
// @param schema Schema the function belongs to.
// @param kind Kind (scalar, aggregate or window) reported for the function.
// @param signature Signature supplying the return and argument types.
// @param aggregateSignature When non-null, aggregation metadata derived from
// it is attached to the result.
// @return The metadata, or std::nullopt if the return type or any argument
// type is not a valid Presto type (see isValidPrestoType).
std::optional<protocol::JsonBasedUdfFunctionMetadata> buildFunctionMetadata(
    const std::string& name,
    const std::string& schema,
    const protocol::FunctionKind& kind,
    const FunctionSignature& signature,
    const AggregateFunctionSignaturePtr& aggregateSignature = nullptr) {
  // Validate all types up front so that no metadata is assembled for a
  // signature that is going to be dropped.
  if (!isValidPrestoType(signature.returnType())) {
    return std::nullopt;
  }

  const auto& argumentTypes = signature.argumentTypes();
  std::vector<std::string> paramTypes;
  paramTypes.reserve(argumentTypes.size());
  for (const auto& argumentType : argumentTypes) {
    if (!isValidPrestoType(argumentType)) {
      return std::nullopt;
    }
    paramTypes.push_back(argumentType.toString());
  }

  protocol::JsonBasedUdfFunctionMetadata metadata;
  metadata.docString = name;
  metadata.functionKind = kind;
  metadata.outputType = signature.returnType().toString();
  metadata.paramTypes = std::move(paramTypes);
  metadata.schema = schema;
  metadata.routineCharacteristics = getRoutineCharacteristics(name, kind);

  if (aggregateSignature) {
    metadata.aggregateMetadata =
        std::make_shared<protocol::AggregationFunctionMetadata>(
            getAggregationFunctionMetadata(name, *aggregateSignature));
  }
  return metadata;
}

// Returns a json array with one SCALAR metadata entry per signature of the
// function `name`. Signatures for which buildFunctionMetadata yields no value
// are left out.
json buildScalarMetadata(
    const std::string& name,
    const std::string& schema,
    const std::vector<const FunctionSignature*>& signatures) {
  json result = json::array();
  json serialized;
  for (const auto* scalarSignature : signatures) {
    const auto scalarMetadata = buildFunctionMetadata(
        name, schema, protocol::FunctionKind::SCALAR, *scalarSignature);
    if (!scalarMetadata.has_value()) {
      continue;
    }
    protocol::to_json(serialized, scalarMetadata.value());
    result.push_back(serialized);
  }
  return result;
}

// Returns a json array describing the aggregate function `name`. Each
// signature is emitted twice: first all signatures with kind AGGREGATE, then
// all signatures with kind WINDOW. Signatures for which buildFunctionMetadata
// yields no value are left out. Raises a Velox user error if the function has
// no window function signatures.
json buildAggregateMetadata(
    const std::string& name,
    const std::string& schema,
    const std::vector<AggregateFunctionSignaturePtr>& signatures) {
  // All aggregate functions can be used as window functions.
  VELOX_USER_CHECK(
      getWindowFunctionSignatures(name).has_value(),
      "Aggregate function {} not registered as a window function",
      name);

  json result = json::array();
  json serialized;
  for (const auto functionKind :
       {protocol::FunctionKind::AGGREGATE, protocol::FunctionKind::WINDOW}) {
    for (const auto& aggregateSignature : signatures) {
      const auto aggregateMetadata = buildFunctionMetadata(
          name, schema, functionKind, *aggregateSignature, aggregateSignature);
      if (!aggregateMetadata.has_value()) {
        continue;
      }
      protocol::to_json(serialized, aggregateMetadata.value());
      result.push_back(serialized);
    }
  }
  return result;
}

// Returns a json array with one WINDOW metadata entry per signature of the
// window function `name`. Signatures for which buildFunctionMetadata yields no
// value are left out.
json buildWindowMetadata(
    const std::string& name,
    const std::string& schema,
    const std::vector<FunctionSignaturePtr>& signatures) {
  json result = json::array();
  json serialized;
  for (const auto& windowSignature : signatures) {
    const auto windowMetadata = buildFunctionMetadata(
        name, schema, protocol::FunctionKind::WINDOW, *windowSignature);
    if (!windowMetadata.has_value()) {
      continue;
    }
    protocol::to_json(serialized, windowMetadata.value());
    result.push_back(serialized);
  }
  return result;
}

} // namespace

// Collects metadata for all scalar, aggregate and window functions registered
// in velox.
//
// @return A json object keyed by the unprefixed function name. Each value is
// a json array with one metadata entry per exposed signature. The result is an
// empty object when nothing is registered.
// @throws A Velox user error if a processed function name does not have the
// `catalog.schema.function_name` form, or if an aggregate function is not
// registered as a window function.
json getFunctionsMetadata() {
  // Start from an object so that an empty registry serializes as `{}` and not
  // as `null`.
  json j = json::object();

  // Stores the json array `metadata` under `function`. Entries are appended
  // when the key already exists, so that functions sharing an unprefixed name
  // (e.g. registered under different schemas) do not overwrite each other.
  const auto addMetadata = [&j](const std::string& function, json metadata) {
    auto& existing = j[function];
    if (existing.is_null()) {
      existing = std::move(metadata);
      return;
    }
    for (auto& item : metadata) {
      existing.push_back(std::move(item));
    }
  };

  // Get metadata for all registered scalar functions in velox.
  const auto signatures = getFunctionSignatures();
  static const std::unordered_set<std::string> kBlockList = {
      "row_constructor", "in", "is_null"};
  // Exclude aggregate companion functions (extract aggregate companion
  // functions are registered as vector functions).
  const auto aggregateFunctions = exec::aggregateFunctions().copy();
  for (const auto& entry : signatures) {
    const auto& name = entry.first;
    // Skip internal functions. They don't have any prefix.
    if (kBlockList.count(name) != 0 ||
        name.find("$internal$") != std::string::npos ||
        isCompanionFunctionName(name, aggregateFunctions)) {
      continue;
    }

    // parts is {catalog, schema, function_name}.
    const auto parts = getFunctionNameParts(name);
    addMetadata(parts[2], buildScalarMetadata(name, parts[1], entry.second));
  }

  // Get metadata for all registered aggregate functions in velox.
  for (const auto& entry : aggregateFunctions) {
    const auto& name = entry.first;
    if (isCompanionFunctionName(name, aggregateFunctions)) {
      continue;
    }
    const auto parts = getFunctionNameParts(name);
    addMetadata(
        parts[2],
        buildAggregateMetadata(name, parts[1], entry.second.signatures));
  }

  // Get metadata for all registered window functions in velox. Skip aggregates
  // as they have been processed.
  const auto& functions = exec::windowFunctions();
  for (const auto& entry : functions) {
    const auto& name = entry.first;
    if (aggregateFunctions.count(name) != 0) {
      continue;
    }
    const auto parts = getFunctionNameParts(name);
    addMetadata(
        parts[2],
        buildWindowMetadata(name, parts[1], entry.second.signatures));
  }

  return j;
}

} // namespace facebook::presto
24 changes: 24 additions & 0 deletions presto-native-execution/presto_cpp/main/types/FunctionMetadata.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "presto_cpp/external/json/nlohmann/json.hpp"

namespace facebook::presto {

// Returns metadata for all registered functions as json.
//
// The result covers the scalar, aggregate and window functions registered in
// velox. It is keyed by the function name without its catalog and schema
// prefix; each value is a json array with one metadata entry per exposed
// function signature.
nlohmann::json getFunctionsMetadata();

} // namespace facebook::presto
Loading

0 comments on commit d716ca2

Please sign in to comment.