Skip to content

Commit

Permalink
[native] Add e2e native function validation tests with Presto sidecar
Browse files Browse the repository at this point in the history
Co-authored-by: Tim Meehan <[email protected]>
  • Loading branch information
2 people authored and Pratik Joseph Dabre committed Sep 27, 2024
1 parent e3d2ccf commit 6421091
Show file tree
Hide file tree
Showing 11 changed files with 310 additions and 258 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static void main(String[] args)
javaQueryRunner.close();

// Launch distributed runner.
DistributedQueryRunner queryRunner = (DistributedQueryRunner) PrestoNativeQueryRunnerUtils.createQueryRunner(false);
DistributedQueryRunner queryRunner = (DistributedQueryRunner) PrestoNativeQueryRunnerUtils.createQueryRunner(false, false);
Thread.sleep(10);
Logger log = Logger.get(DistributedQueryRunner.class);
log.info("======== SERVER STARTED ========");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ public static Map<String, String> getNativeWorkerSystemProperties()
.build();
}

public static Map<String, String> getNativeSidecarProperties()
{
return ImmutableMap.<String, String>builder()
.put("coordinator-sidecar-enabled", "true")
.put("list-built-in-functions-only", "false")
.put("presto.default-namespace", "native.default")
.build();
}

/**
* Creates all tables for local testing, except for bench tables.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import static com.facebook.presto.hive.HiveTestUtils.getProperty;
import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeSidecarProperties;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeWorkerHiveProperties;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeWorkerIcebergProperties;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeWorkerSystemProperties;
Expand All @@ -55,9 +56,10 @@ public class PrestoNativeQueryRunnerUtils

private static final Logger log = Logger.get(PrestoNativeQueryRunnerUtils.class);
private static final String DEFAULT_STORAGE_FORMAT = "DWRF";

private PrestoNativeQueryRunnerUtils() {}

public static QueryRunner createQueryRunner(boolean addStorageFormatToPath)
public static QueryRunner createQueryRunner(boolean addStorageFormatToPath, boolean isCoordinatorSidecarEnabled)
throws Exception
{
int cacheMaxSize = 4096; // 4GB size cache
Expand All @@ -68,7 +70,8 @@ public static QueryRunner createQueryRunner(boolean addStorageFormatToPath)
nativeQueryRunnerParameters.workerCount,
cacheMaxSize,
DEFAULT_STORAGE_FORMAT,
addStorageFormatToPath);
addStorageFormatToPath,
isCoordinatorSidecarEnabled);
}

public static QueryRunner createQueryRunner(
Expand All @@ -77,7 +80,8 @@ public static QueryRunner createQueryRunner(
Optional<Integer> workerCount,
int cacheMaxSize,
String storageFormat,
boolean addStorageFormatToPath)
boolean addStorageFormatToPath,
boolean isCoordinatorSidecarEnabled)
throws Exception
{
QueryRunner defaultQueryRunner = createJavaQueryRunner(dataDirectory, storageFormat, addStorageFormatToPath);
Expand All @@ -88,7 +92,7 @@ public static QueryRunner createQueryRunner(

defaultQueryRunner.close();

return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath, false);
return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath, false, isCoordinatorSidecarEnabled);
}

public static QueryRunner createJavaQueryRunner()
Expand Down Expand Up @@ -251,7 +255,7 @@ public static QueryRunner createNativeIcebergQueryRunner(
false,
false,
OptionalInt.of(workerCount.orElse(4)),
getExternalWorkerLauncher("iceberg", prestoServerPath, cacheMaxSize, remoteFunctionServerUds, false),
getExternalWorkerLauncher("iceberg", prestoServerPath, cacheMaxSize, remoteFunctionServerUds, false, false),
dataDirectory,
addStorageFormatToPath);
}
Expand All @@ -265,7 +269,8 @@ public static QueryRunner createNativeQueryRunner(
Optional<String> remoteFunctionServerUds,
String storageFormat,
boolean addStorageFormatToPath,
Boolean failOnNestedLoopJoin)
Boolean failOnNestedLoopJoin,
boolean isCoordinatorSidecarEnabled)
throws Exception
{
// The property "hive.allow-drop-table" needs to be set to true because security is always "legacy" in NativeQueryRunner.
Expand All @@ -282,13 +287,14 @@ public static QueryRunner createNativeQueryRunner(
.put("http-server.http.port", "8081")
.put("experimental.internal-communication.thrift-transport-enabled", String.valueOf(useThrift))
.putAll(getNativeWorkerSystemProperties())
.putAll(isCoordinatorSidecarEnabled ? getNativeSidecarProperties() : ImmutableMap.of())
.build(),
ImmutableMap.of(),
"legacy",
hiveProperties,
workerCount,
Optional.of(Paths.get(addStorageFormatToPath ? dataDirectory + "/" + storageFormat : dataDirectory)),
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, remoteFunctionServerUds, failOnNestedLoopJoin));
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, remoteFunctionServerUds, failOnNestedLoopJoin, isCoordinatorSidecarEnabled));
}

public static QueryRunner createNativeCteQueryRunner(boolean useThrift, String storageFormat)
Expand Down Expand Up @@ -331,13 +337,13 @@ public static QueryRunner createNativeCteQueryRunner(boolean useThrift, String s
hiveProperties,
workerCount,
Optional.of(Paths.get(addStorageFormatToPath ? dataDirectory + "/" + storageFormat : dataDirectory)),
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, Optional.empty(), false));
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, Optional.empty(), false, false));
}

public static QueryRunner createNativeQueryRunner(String remoteFunctionServerUds)
throws Exception
{
return createNativeQueryRunner(false, DEFAULT_STORAGE_FORMAT, Optional.ofNullable(remoteFunctionServerUds), false);
return createNativeQueryRunner(false, DEFAULT_STORAGE_FORMAT, Optional.ofNullable(remoteFunctionServerUds), false, false);
}

public static QueryRunner createNativeQueryRunner(boolean useThrift)
Expand All @@ -349,16 +355,16 @@ public static QueryRunner createNativeQueryRunner(boolean useThrift)
public static QueryRunner createNativeQueryRunner(boolean useThrift, boolean failOnNestedLoopJoin)
throws Exception
{
return createNativeQueryRunner(useThrift, DEFAULT_STORAGE_FORMAT, Optional.empty(), failOnNestedLoopJoin);
return createNativeQueryRunner(useThrift, DEFAULT_STORAGE_FORMAT, Optional.empty(), failOnNestedLoopJoin, false);
}

public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat)
throws Exception
{
return createNativeQueryRunner(useThrift, storageFormat, Optional.empty(), false);
return createNativeQueryRunner(useThrift, storageFormat, Optional.empty(), false, false);
}

public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat, Optional<String> remoteFunctionServerUds, Boolean failOnNestedLoopJoin)
public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat, Optional<String> remoteFunctionServerUds, Boolean failOnNestedLoopJoin, boolean isCoordinatorSidecarEnabled)
throws Exception
{
int cacheMaxSize = 0;
Expand All @@ -372,7 +378,8 @@ public static QueryRunner createNativeQueryRunner(boolean useThrift, String stor
remoteFunctionServerUds,
storageFormat,
true,
failOnNestedLoopJoin);
failOnNestedLoopJoin,
isCoordinatorSidecarEnabled);
}

// Start the remote function server. Return the UDS path used to communicate with it.
Expand All @@ -399,10 +406,10 @@ public static String startRemoteFunctionServer(String remoteFunctionServerBinary
public static NativeQueryRunnerParameters getNativeQueryRunnerParameters()
{
Path prestoServerPath = Paths.get(getProperty("PRESTO_SERVER")
.orElse("_build/debug/presto_cpp/main/presto_server"))
.orElse("_build/debug/presto_cpp/main/presto_server"))
.toAbsolutePath();
Path dataDirectory = Paths.get(getProperty("DATA_DIR")
.orElse("target/velox_data"))
.orElse("target/velox_data"))
.toAbsolutePath();
Optional<Integer> workerCount = getProperty("WORKER_COUNT").map(Integer::parseInt);

Expand All @@ -419,7 +426,7 @@ public static NativeQueryRunnerParameters getNativeQueryRunnerParameters()
return new NativeQueryRunnerParameters(prestoServerPath, dataDirectory, workerCount);
}

public static Optional<BiFunction<Integer, URI, Process>> getExternalWorkerLauncher(String catalogName, String prestoServerPath, int cacheMaxSize, Optional<String> remoteFunctionServerUds, Boolean failOnNestedLoopJoin)
public static Optional<BiFunction<Integer, URI, Process>> getExternalWorkerLauncher(String catalogName, String prestoServerPath, int cacheMaxSize, Optional<String> remoteFunctionServerUds, Boolean failOnNestedLoopJoin, boolean isCoordinatorSidecarEnabled)
{
return
Optional.of((workerIndex, discoveryUri) -> {
Expand All @@ -436,6 +443,12 @@ public static Optional<BiFunction<Integer, URI, Process>> getExternalWorkerLaunc
"system-memory-gb=4%n" +
"http-server.http.port=%d", discoveryUri, port);

if (isCoordinatorSidecarEnabled) {
configProperties = format("%s%n" +
"native-sidecar=true%n" +
"presto.default-namespace=native.default%n", configProperties);
}

if (remoteFunctionServerUds.isPresent()) {
String jsonSignaturesPath = Resources.getResource(REMOTE_FUNCTION_JSON_SIGNATURES).getFile();
configProperties = format("%s%n" +
Expand Down
97 changes: 97 additions & 0 deletions presto-native-sidecar-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@
<artifactId>log</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>log-manager</artifactId>
</dependency>

<!-- Presto SPI -->
<dependency>
<groupId>com.facebook.presto</groupId>
Expand Down Expand Up @@ -125,5 +130,97 @@
<artifactId>http-client</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-tests</artifactId>
<scope>test</scope>
<type>test-jar</type>
<version>0.290-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-tests</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-native-execution</artifactId>
<scope>test</scope>
<type>test-jar</type>
<version>0.290-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-tpcds</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>jaxrs</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-client</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>jaxrs-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- TODO: this is needed here because of the dependecny on presto-native-execution -->
<plugin>
<groupId>org.basepom.maven</groupId>
<artifactId>duplicate-finder-maven-plugin</artifactId>
<configuration>
<ignoredResourcePatterns>
<ignoredResourcePattern>parquet.thrift</ignoredResourcePattern>
<ignoredResourcePattern>about.html</ignoredResourcePattern>
<ignoredResourcePattern>mozilla/public-suffix-list.txt</ignoredResourcePattern>
<ignoredResourcePattern>iceberg-build.properties</ignoredResourcePattern>
<ignoredResourcePattern>org.apache.avro.data/Json.avsc</ignoredResourcePattern>
</ignoredResourcePatterns>
<ignoredClassPatterns>
<ignoredClassPattern>com.esotericsoftware.kryo.*</ignoredClassPattern>
<ignoredClassPattern>com.esotericsoftware.minlog.Log</ignoredClassPattern>
<ignoredClassPattern>com.esotericsoftware.reflectasm.*</ignoredClassPattern>
<ignoredClassPattern>module-info</ignoredClassPattern>
<ignoredClassPattern>META-INF.versions.9.module-info</ignoredClassPattern>
<ignoredClassPattern>org.apache.avro.*</ignoredClassPattern>
<ignoredClassPattern>com.github.benmanes.caffeine.*</ignoredClassPattern>
<ignoredClassPattern>org.roaringbitmap.*</ignoredClassPattern>
</ignoredClassPatterns>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sidecar;

import com.facebook.airlift.log.Logger;
import com.facebook.airlift.log.Logging;
import com.facebook.presto.nativeworker.NativeQueryRunnerUtils;
import com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.DistributedQueryRunner;

import static com.facebook.presto.sidecar.NativeSidecarPluginQueryRunnerUtils.setupNativeSidecarPlugin;

public class NativeSidecarPluginQueryRunner
{
private NativeSidecarPluginQueryRunner() {}

public static void main(String[] args)
throws Exception
{
// You need to add "--user user" to your CLI for your queries to work.
Logging.initialize();

// Create tables before launching distributed runner.
QueryRunner javaQueryRunner = PrestoNativeQueryRunnerUtils.createJavaQueryRunner(false);
NativeQueryRunnerUtils.createAllTables(javaQueryRunner);
javaQueryRunner.close();

// Launch distributed runner.
DistributedQueryRunner queryRunner = (DistributedQueryRunner) PrestoNativeQueryRunnerUtils.createQueryRunner(false, true);
setupNativeSidecarPlugin(queryRunner);
Thread.sleep(10);
Logger log = Logger.get(DistributedQueryRunner.class);
log.info("======== SERVER STARTED ========");
log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sidecar;
import com.facebook.presto.sidecar.functionNamespace.NativeFunctionNamespaceManagerFactory;
import com.facebook.presto.testing.QueryRunner;
import com.google.common.collect.ImmutableMap;

public class NativeSidecarPluginQueryRunnerUtils
{
private NativeSidecarPluginQueryRunnerUtils() {};

public static void setupNativeSidecarPlugin(QueryRunner queryRunner)
{
queryRunner.installCoordinatorPlugin(new NativeSidecarPlugin());
queryRunner.loadNativeFunctionNamespaceManager(
NativeFunctionNamespaceManagerFactory.NAME,
"native",
ImmutableMap.of(
"supported-function-languages", "CPP",
"function-implementation-type", "CPP"));
}
}
Loading

0 comments on commit 6421091

Please sign in to comment.