Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(udf): Java UDF SDK #10095

Merged
merged 33 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3f455d0
add simple Java UDF SDK
wangrunji0408 May 30, 2023
ac62534
add simple client and unit test
wangrunji0408 May 30, 2023
52ee8bf
Merge remote-tracking branch 'origin/main' into wrj/java-sdk
wangrunji0408 Jun 1, 2023
eb0cb03
support returning struct
wangrunji0408 Jun 1, 2023
4e9fc7c
support table function with single return type
wangrunji0408 Jun 2, 2023
bb47887
Merge remote-tracking branch 'origin/main' into wrj/java-sdk
wangrunji0408 Jun 2, 2023
5a9f381
change the data type of row_index to int32
wangrunji0408 Jun 2, 2023
1487222
change example table function `series2` to `split`
wangrunji0408 Jun 2, 2023
a8f93b1
Merge remote-tracking branch 'origin/main' into wrj/java-sdk
wangrunji0408 Jun 6, 2023
0b0ffc2
change the return schema of table functions with multiple columns
wangrunji0408 Jun 8, 2023
4aaeb70
fix string arguments
wangrunji0408 Jun 8, 2023
6f02a45
add test for string arguments
wangrunji0408 Jun 8, 2023
422f3fc
support decimal input
wangrunji0408 Jun 8, 2023
454a57e
support VARCHAR[] input
wangrunji0408 Jun 8, 2023
ba66fe8
pass e2e tests for java udf
wangrunji0408 Jun 8, 2023
0aa6491
add ci for java udf
wangrunji0408 Jun 8, 2023
5389f16
change the jar name
wangrunji0408 Jun 8, 2023
684963f
add code docs
wangrunji0408 Jun 9, 2023
b6813c3
support JSONB type
wangrunji0408 Jun 9, 2023
0effbc6
pass all tests
wangrunji0408 Jun 9, 2023
7db2f43
revert ci image and rename python.slt
wangrunji0408 Jun 9, 2023
80e4d7c
fix doc test
wangrunji0408 Jun 9, 2023
2cd9f60
move to ./java folder
wangrunji0408 Jun 13, 2023
340625d
change abstract classes to interfaces
wangrunji0408 Jun 13, 2023
e3adcc2
split classes into multiple files
wangrunji0408 Jun 13, 2023
7ca1552
move client to test dir
wangrunji0408 Jun 13, 2023
72ea754
use camelCase
wangrunji0408 Jun 13, 2023
1f27060
use MethodHandle for invocation
wangrunji0408 Jun 13, 2023
66a47e6
add license header
wangrunji0408 Jun 13, 2023
6c690d1
add test and fix for table function
wangrunji0408 Jun 13, 2023
7ac12e6
change TableFunction API to returning Iterator
wangrunji0408 Jun 13, 2023
212ab33
Merge branch 'main' into wrj/java-sdk
wangrunji0408 Jun 13, 2023
c8964ce
requires non-static for eval methods
wangrunji0408 Jun 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions ci/scripts/build-other.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ set -euo pipefail
source ci/scripts/common.sh


echo "--- Build Java connector node"
echo "--- Build Java packages"
cd java

mvn -B package -Dmaven.test.skip=true
cd ..

echo "--- Upload Java artifacts"
cp connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz ./risingwave-connector.tar.gz
cp java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz ./risingwave-connector.tar.gz
cp java/udf/target/risingwave-udf-example.jar ./risingwave-udf-example.jar
buildkite-agent artifact upload ./risingwave-connector.tar.gz
buildkite-agent artifact upload ./risingwave-udf-example.jar
11 changes: 9 additions & 2 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ download_and_prepare_rw "$profile" common
echo "--- Download artifacts"
download-and-decompress-artifact e2e_test_generated ./
download-and-decompress-artifact risingwave_e2e_extended_mode_test-"$profile" target/debug/
buildkite-agent artifact download risingwave-udf-example.jar ./
mv target/debug/risingwave_e2e_extended_mode_test-"$profile" target/debug/risingwave_e2e_extended_mode_test

chmod +x ./target/debug/risingwave_e2e_extended_mode_test
Expand All @@ -46,12 +47,18 @@ sqllogictest -p 4566 -d dev './e2e_test/visibility_mode/*.slt' --junit "batch-${
sqllogictest -p 4566 -d dev './e2e_test/database/prepare.slt'
sqllogictest -p 4566 -d test './e2e_test/database/test.slt'

echo "--- e2e, ci-3streaming-2serving-3fe, udf"
echo "--- e2e, ci-3streaming-2serving-3fe, python udf"
python3 e2e_test/udf/test.py &
sleep 2
sqllogictest -p 4566 -d dev './e2e_test/udf/python.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/udf.slt'
pkill python3

echo "--- e2e, ci-3streaming-2serving-3fe, java udf"
java -jar risingwave-udf-example.jar &
sleep 2
sqllogictest -p 4566 -d dev './e2e_test/udf/udf.slt'
pkill java

echo "--- Kill cluster"
cargo make ci-kill

Expand Down
10 changes: 5 additions & 5 deletions e2e_test/udf/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ def series(n: int) -> Iterator[int]:
yield i


@udtf(input_types="INT", result_types=["INT", "VARCHAR"])
def series2(n: int) -> Iterator[Tuple[int, str]]:
for i in range(n):
yield i, f"#{i}"
@udtf(input_types="VARCHAR", result_types=["VARCHAR", "INT"])
def split(string: str) -> Iterator[Tuple[str, int]]:
for s in string.split(" "):
yield s, len(s)


@udf(input_types="VARCHAR", result_type="DECIMAL")
Expand Down Expand Up @@ -100,7 +100,7 @@ def jsonb_array_struct_identity(v: Tuple[List[Any], int]) -> Tuple[List[Any], in
server.add_function(gcd)
server.add_function(gcd3)
server.add_function(series)
server.add_function(series2)
server.add_function(split)
server.add_function(extract_tcp_info)
server.add_function(hex_to_dec)
server.add_function(array_access)
Expand Down
61 changes: 39 additions & 22 deletions e2e_test/udf/python.slt → e2e_test/udf/udf.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Before running this test:
# python3 e2e_test/udf/test.py
# or:
# cd src/udf/java && mvn package && java -jar target/risingwave-udf-example.jar

# Create a function.
statement ok
Expand Down Expand Up @@ -35,7 +37,7 @@ create function series(int) returns table (x int) language python as series usin

# Create a table function that returns multiple columns.
statement ok
create function series2(int) returns table (x int, y varchar) language python as series2 using link 'http://localhost:8815';
create function split(varchar) returns table (word varchar, length int) language python as split using link 'http://localhost:8815';

statement ok
create function hex_to_dec(varchar) returns decimal language python as hex_to_dec using link 'http://localhost:8815';
Expand Down Expand Up @@ -70,7 +72,7 @@ jsonb_array_identity jsonb[] jsonb[] python http://localhost:8815
jsonb_array_struct_identity struct<v jsonb[],len integer> struct<v jsonb[],len integer> python http://localhost:8815
jsonb_concat jsonb[] jsonb python http://localhost:8815
series integer integer python http://localhost:8815
series2 integer struct<x integer,y varchar> python http://localhost:8815
split varchar struct<word varchar,length integer> python http://localhost:8815

query I
select int_42();
Expand Down Expand Up @@ -111,12 +113,12 @@ select jsonb_concat(ARRAY['null'::jsonb, '1'::jsonb, '"str"'::jsonb, '{}'::jsonb
[null, 1, "str", {}]

query T
select jsonb_array_identity(ARRAY['null'::jsonb, '1'::jsonb, '"str"'::jsonb, '{}'::jsonb]);
select jsonb_array_identity(ARRAY[null, '1'::jsonb, '"str"'::jsonb, '{}'::jsonb]);
----
{NULL,1,"\"str\"","{}"}

query T
select jsonb_array_struct_identity(ROW(ARRAY['null'::jsonb, '1'::jsonb, '"str"'::jsonb, '{}'::jsonb], 4)::struct<v jsonb[], len int>);
select jsonb_array_struct_identity(ROW(ARRAY[null, '1'::jsonb, '"str"'::jsonb, '{}'::jsonb], 4)::struct<v jsonb[], len int>);
----
({NULL,1,"\"str\"","{}"},4)

Expand All @@ -130,18 +132,16 @@ select series(5);
4

query IT
select * from series2(3);
select * from split('rising wave');
----
0 #0
1 #1
2 #2
rising 6
wave 4

query T
select series2(3);
select split('rising wave');
----
(0,#0)
(1,#1)
(2,#2)
(rising,6)
(wave,4)

query II
select x, series(x) from series(4) t(x);
Expand All @@ -153,16 +153,6 @@ select x, series(x) from series(4) t(x);
3 1
3 2

query IT
select x, series2(x) from series(4) t(x);
----
1 (0,#0)
2 (0,#0)
2 (1,#1)
3 (0,#0)
3 (1,#1)
3 (2,#2)

# test large output for table function
query I
select count(*) from series(1000000);
Expand Down Expand Up @@ -229,3 +219,30 @@ drop function gcd();
# Drop a function without arguments. Now the function name is unique.
statement ok
drop function gcd;

statement ok
drop function extract_tcp_info;

statement ok
drop function series;

statement ok
drop function split;

statement ok
drop function hex_to_dec;

statement ok
drop function array_access;

statement ok
drop function jsonb_access;

statement ok
drop function jsonb_concat;

statement ok
drop function jsonb_array_identity;

statement ok
drop function jsonb_array_struct_identity;
1 change: 1 addition & 0 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
<version>1.0-SNAPSHOT</version>
<modules>
<module>proto</module>
<module>udf</module>
<module>java-binding</module>
<module>common-utils</module>
<module>java-binding-integration-test</module>
Expand Down
8 changes: 8 additions & 0 deletions java/udf/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
## How to run example

Make sure you have installed Java 11 and Maven 3 or later.

```sh
mvn package
java -jar target/risingwave-udf-example.jar
```
95 changes: 95 additions & 0 deletions java/udf/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.risingwave.java</groupId>
<artifactId>risingwave-udf</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>risingwave-udf</name>
<url>http://maven.apache.org</url>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.9.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>12.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-core</artifactId>
<version>12.0.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.7</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.0</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M7</version>
<configuration>
<argLine>--add-opens=java.base/java.nio=ALL-UNNAMED</argLine>
</configuration>
</plugin>
<!-- generate an executable for examples -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.4.2</version>
<configuration>
<archive>
<manifest>
<mainClass>com.risingwave.functions.example.UdfExample</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<finalName>risingwave-udf-example</finalName>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>udf-example</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.risingwave.functions;
wangrunji0408 marked this conversation as resolved.
Show resolved Hide resolved

import java.lang.annotation.*;

@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER })
public @interface DataTypeHint {
String value();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.risingwave.functions;
wangrunji0408 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Base interface for a user-defined scalar function. A user-defined scalar function
* maps zero, one, or multiple scalar values to a new scalar value.
*
* <p>
* The behavior of a {@link ScalarFunction} can be defined by implementing a
* custom evaluation method. An evaluation method must be declared publicly and
* named <code>eval</code>. Multiple overloaded methods named <code>eval</code>
* are not supported yet.
*
* <p>
* By default, input and output data types are automatically extracted using
* reflection.
*
* <p>
* The following examples show how to specify a scalar function:
*
* <pre>
* {@code
* // a function that accepts two INT arguments and computes a sum
* class SumFunction implements ScalarFunction {
* public Integer eval(Integer a, Integer b) {
* return a + b;
* }
* }
*
* // a function that returns a struct type
* class StructFunction implements ScalarFunction {
* public static class KeyValue {
* public String key;
* public int value;
* }
*
* public KeyValue eval(int a) {
* KeyValue kv = new KeyValue();
* kv.key = a.toString();
* kv.value = a;
* return kv;
* }
* }
* }</pre>
*/
public abstract interface ScalarFunction extends UserDefinedFunction {
wangrunji0408 marked this conversation as resolved.
Show resolved Hide resolved
}
Loading