From 6b3edfab28e20e9a5a46c613c28975a0ca499e34 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Wed, 14 Jun 2023 01:19:34 +0800 Subject: [PATCH 1/3] move udf example to a new project --- ci/scripts/build-other.sh | 2 +- e2e_test/udf/udf.slt | 2 +- java/pom.xml | 1 + java/udf-example/pom.xml | 67 +++++++++++++++++++ .../main/java/com}/example/UdfExample.java | 9 +-- java/udf/pom.xml | 27 -------- .../risingwave/functions/TableFunction.java | 2 +- .../risingwave/functions/TestUdfServer.java | 36 ++++++++-- .../com/risingwave/functions/UdfClient.java | 3 - 9 files changed, 102 insertions(+), 47 deletions(-) create mode 100644 java/udf-example/pom.xml rename java/{udf/src/main/java/com/risingwave/functions => udf-example/src/main/java/com}/example/UdfExample.java (96%) diff --git a/ci/scripts/build-other.sh b/ci/scripts/build-other.sh index 13119c59bfa08..6b5a7c4f99186 100755 --- a/ci/scripts/build-other.sh +++ b/ci/scripts/build-other.sh @@ -13,6 +13,6 @@ cd .. echo "--- Upload Java artifacts" cp java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz ./risingwave-connector.tar.gz -cp java/udf/target/risingwave-udf-example.jar ./risingwave-udf-example.jar +cp java/udf-example/target/risingwave-udf-example.jar ./risingwave-udf-example.jar buildkite-agent artifact upload ./risingwave-connector.tar.gz buildkite-agent artifact upload ./risingwave-udf-example.jar diff --git a/e2e_test/udf/udf.slt b/e2e_test/udf/udf.slt index 10a0370aa7853..0f5b123712309 100644 --- a/e2e_test/udf/udf.slt +++ b/e2e_test/udf/udf.slt @@ -1,7 +1,7 @@ # Before running this test: # python3 e2e_test/udf/test.py # or: -# cd src/udf/java && mvn package && java -jar target/risingwave-udf-example.jar +# cd java/udf-example && mvn package && java -jar target/risingwave-udf-example.jar # Create a function. statement ok diff --git a/java/pom.xml b/java/pom.xml index 86cc708bfb2be..4e4c8112176ba 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -9,6 +9,7 @@ proto udf + udf-example java-binding common-utils java-binding-integration-test diff --git a/java/udf-example/pom.xml b/java/udf-example/pom.xml new file mode 100644 index 0000000000000..3b57ca1dde714 --- /dev/null +++ b/java/udf-example/pom.xml @@ -0,0 +1,67 @@ + + + + 4.0.0 + + com.example + udf-example + 1.0-SNAPSHOT + + udf-example + http://maven.apache.org + + + UTF-8 + 11 + 11 + + + + + com.risingwave.java + risingwave-udf + 0.0.1 + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.0.0-M6 + + --add-opens=java.base/java.nio=ALL-UNNAMED + + + + + org.apache.maven.plugins + maven-assembly-plugin + 3.4.2 + + + + com.example.UdfExample + + + + jar-with-dependencies + + risingwave-udf-example + false + + + + udf-example + package + + single + + + + + + + diff --git a/java/udf/src/main/java/com/risingwave/functions/example/UdfExample.java b/java/udf-example/src/main/java/com/example/UdfExample.java similarity index 96% rename from java/udf/src/main/java/com/risingwave/functions/example/UdfExample.java rename to java/udf-example/src/main/java/com/example/UdfExample.java index bc913813f460f..d4e9c02df8498 100644 --- a/java/udf/src/main/java/com/risingwave/functions/example/UdfExample.java +++ b/java/udf-example/src/main/java/com/example/UdfExample.java @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package com.risingwave.functions.example; +package com.example; import java.io.IOException; import java.math.BigDecimal; @@ -35,7 +35,6 @@ public static void main(String[] args) throws IOException { server.addFunction("int_42", new Int42()); server.addFunction("gcd", new Gcd()); server.addFunction("gcd3", new Gcd3()); - server.addFunction("to_string", new ToString()); server.addFunction("extract_tcp_info", new ExtractTcpInfo()); server.addFunction("hex_to_dec", new HexToDec()); server.addFunction("array_access", new ArrayAccess()); @@ -77,12 +76,6 @@ public int eval(int a, int b, int c) { } } - public static class ToString implements ScalarFunction { - public String eval(String s) { - return s; - } - } - public static class ExtractTcpInfo implements ScalarFunction { public static class TcpPacketInfo { public String srcAddr; diff --git a/java/udf/pom.xml b/java/udf/pom.xml index 0db50f4071c53..2f84a7b034388 100644 --- a/java/udf/pom.xml +++ b/java/udf/pom.xml @@ -63,33 +63,6 @@ --add-opens=java.base/java.nio=ALL-UNNAMED - - - org.apache.maven.plugins - maven-assembly-plugin - 3.4.2 - - - - com.risingwave.functions.example.UdfExample - - - - jar-with-dependencies - - risingwave-udf-example - false - - - - udf-example - package - - single - - - - diff --git a/java/udf/src/main/java/com/risingwave/functions/TableFunction.java b/java/udf/src/main/java/com/risingwave/functions/TableFunction.java index 45e2266b3ff38..e716bd11fee23 100644 --- a/java/udf/src/main/java/com/risingwave/functions/TableFunction.java +++ b/java/udf/src/main/java/com/risingwave/functions/TableFunction.java @@ -40,7 +40,7 @@ * // given number. * class Series implements TableFunction { * public Iterator eval(int n) { - * return IntStream.range(0, n).iterator(); + * return java.util.stream.IntStream.range(0, n).iterator(); * } * } * diff --git a/java/udf/src/test/java/com/risingwave/functions/TestUdfServer.java b/java/udf/src/test/java/com/risingwave/functions/TestUdfServer.java index da598fe3dd6d5..00b4cf0dacd1c 100644 --- a/java/udf/src/test/java/com/risingwave/functions/TestUdfServer.java +++ b/java/udf/src/test/java/com/risingwave/functions/TestUdfServer.java @@ -15,6 +15,9 @@ package com.risingwave.functions; import java.io.IOException; +import java.util.Iterator; +import java.util.stream.IntStream; + import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.IntVector; @@ -26,10 +29,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; -import com.risingwave.functions.example.*; - /** - * Unit test for simple App. + * Unit test for UDF server. */ public class TestUdfServer { private static UdfClient client; @@ -39,9 +40,9 @@ public class TestUdfServer { @BeforeAll public static void setup() throws IOException { server = new UdfServer("localhost", 0); - server.addFunction("gcd", new UdfExample.Gcd()); - server.addFunction("to_string", new UdfExample.ToString()); - server.addFunction("series", new UdfExample.Series()); + server.addFunction("gcd", new Gcd()); + server.addFunction("to_string", new ToString()); + server.addFunction("series", new Series()); server.start(); client = new UdfClient("localhost", server.getPort()); @@ -53,6 +54,17 @@ public static void teardown() throws InterruptedException { server.close(); } + public static class Gcd implements ScalarFunction { + public int eval(int a, int b) { + while (b != 0) { + int temp = b; + b = a % b; + a = temp; + } + return a; + } + } + @Test public void gcd() throws Exception { var c0 = new IntVector("", allocator); @@ -74,6 +86,12 @@ public void gcd() throws Exception { } } + public static class ToString implements ScalarFunction { + public String eval(String s) { + return s; + } + } + @Test public void to_string() throws Exception { var c0 = new VarCharVector("", allocator); @@ -89,6 +107,12 @@ public void to_string() throws Exception { } } + public static class Series implements TableFunction { + public Iterator eval(int n) { + return IntStream.range(0, n).iterator(); + } + } + @Test public void series() throws Exception { var c0 = new IntVector("", allocator); diff --git a/java/udf/src/test/java/com/risingwave/functions/UdfClient.java b/java/udf/src/test/java/com/risingwave/functions/UdfClient.java index 4db47fc00f521..b4c4f5b5ce480 100644 --- a/java/udf/src/test/java/com/risingwave/functions/UdfClient.java +++ b/java/udf/src/test/java/com/risingwave/functions/UdfClient.java @@ -17,13 +17,10 @@ import org.apache.arrow.flight.*; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class UdfClient implements AutoCloseable { private FlightClient client; - private static final Logger logger = LoggerFactory.getLogger(UdfClient.class); public UdfClient(String host, int port) { var allocator = new RootAllocator(); From 45174e02ec8d896e01669f59a467cd806b8f9b90 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Wed, 14 Jun 2023 01:21:06 +0800 Subject: [PATCH 2/3] allow language java --- src/frontend/src/handler/create_function.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/frontend/src/handler/create_function.rs b/src/frontend/src/handler/create_function.rs index 120e6f6d17c79..b4481fbce9911 100644 --- a/src/frontend/src/handler/create_function.rs +++ b/src/frontend/src/handler/create_function.rs @@ -59,9 +59,9 @@ pub async fn handle_create_function( ) } }; - if language != "python" { + if !matches!(language.as_str(), "python" | "java") { return Err(ErrorCode::InvalidParameterValue( - "LANGUAGE should be one of: python".to_string(), + "LANGUAGE should be one of: python, java".to_string(), ) .into()); } From 3c4ef00b47469a5037066ae6128521686fcc0465 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Wed, 14 Jun 2023 01:59:05 +0800 Subject: [PATCH 3/3] add simple docs for java udf --- java/udf-example/README.md | 8 ++ java/udf/README.md | 256 ++++++++++++++++++++++++++++++++++++- 2 files changed, 260 insertions(+), 4 deletions(-) create mode 100644 java/udf-example/README.md diff --git a/java/udf-example/README.md b/java/udf-example/README.md new file mode 100644 index 0000000000000..2831815c3df5b --- /dev/null +++ b/java/udf-example/README.md @@ -0,0 +1,8 @@ +## RisingWave Java UDF Example + +Make sure you have installed Java 11 and Maven 3 or later. + +```sh +mvn package +java -jar target/risingwave-udf-example.jar +``` diff --git a/java/udf/README.md b/java/udf/README.md index 74542f51a801d..0785cd38c18d2 100644 --- a/java/udf/README.md +++ b/java/udf/README.md @@ -1,8 +1,256 @@ -## How to run example +# RisingWave Java UDF SDK -Make sure you have installed Java 11 and Maven 3 or later. +This library provides a Java SDK for creating user-defined functions (UDF) in RisingWave. + +## Introduction + +RisingWave supports user-defined functions implemented as external functions. +With the RisingWave Java UDF SDK, users can define custom UDFs using Java and start a Java process as a UDF server. +RisingWave can then remotely access the UDF server to execute the defined functions. + +## Installation + +To install the RisingWave Java UDF SDK: + +```sh +git clone https://github.com/risingwavelabs/risingwave.git +cd risingwave/java/udf +mvn install +``` + +## Creating a New Project + +> NOTE: You can also start from the [udf-example](../udf-example) project without creating the project from scratch. + +To create a new project using the RisingWave Java UDF SDK, follow these steps: + +```sh +mvn archetype:generate -DgroupId=com.example -DartifactId=udf-example -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false +``` + +Configure your `pom.xml` file as follows: + +```xml + + + 4.0.0 + com.example + udf-example + 1.0-SNAPSHOT + + + + com.risingwave.java + risingwave-udf + 0.0.1 + + + +``` + +The `--add-opens` flag must be added when running unit tests through Maven: + +```xml + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.0.0-M7 + + --add-opens=java.base/java.nio=ALL-UNNAMED + + + + +``` + +## Scalar Functions + +A user-defined scalar function maps zero, one, or multiple scalar values to a new scalar value. + +In order to define a scalar function, one has to create a new class that implements the `ScalarFunction` +interface in `com.risingwave.functions` and implement exactly one evaluation method named `eval(...)`. +This method must be declared public and non-static. + +Any [data type](#data-types) listed in the data types section can be used as a parameter or return type of an evaluation method. + +Here's an example of a scalar function that calculates the greatest common divisor (GCD) of two integers: + +```java +import com.risingwave.functions.ScalarFunction; + +public class Gcd implements ScalarFunction { + public int eval(int a, int b) { + while (b != 0) { + int temp = b; + b = a % b; + a = temp; + } + return a; + } +} +``` + +> **NOTE:** Differences with Flink +> 1. The `ScalarFunction` is an interface instead of an abstract class. +> 2. Multiple overloaded `eval` methods are not supported. +> 3. Variable arguments such as `eval(Integer...)` are not supported. + +## Table Functions + +A user-defined table function maps zero, one, or multiple scalar values to one or multiple +rows (structured types). + +In order to define a table function, one has to create a new class that implements the `TableFunction` +interface in `com.risingwave.functions` and implement exactly one evaluation method named `eval(...)`. +This method must be declared public and non-static. + +The return type must be an `Iterator` of any [data type](#data-types) listed in the data types section. +Similar to scalar functions, input and output data types are automatically extracted using reflection. +This includes the generic argument T of the return value for determining an output data type. + +Here's an example of a table function that generates a series of integers: + +```java +import com.risingwave.functions.TableFunction; + +public class Series implements TableFunction { + public Iterator eval(int n) { + return java.util.stream.IntStream.range(0, n).iterator(); + } +} +``` + +> **NOTE:** Differences with Flink +> 1. The `TableFunction` is an interface instead of an abstract class. It has no generic arguments. +> 2. Instead of calling `collect` to emit a row, the `eval` method returns an `Iterator` of the output rows. +> 3. Multiple overloaded `eval` methods are not supported. +> 4. Variable arguments such as `eval(Integer...)` are not supported. +> 5. In SQL, table functions can be used in the `FROM` clause directly. `JOIN LATERAL TABLE` is not supported. + +## UDF Server + +To create a UDF server and register functions: + +```java +import com.risingwave.functions.UdfServer; + +public class App { + public static void main(String[] args) { + try (var server = new UdfServer("0.0.0.0", 8815)) { + // register functions + server.addFunction("gcd", new Gcd()); + server.addFunction("series", new Series()); + // start the server + server.start(); + server.awaitTermination(); + } catch (Exception e) { + e.printStackTrace(); + } + } +} +``` + +To run the UDF server, execute the following command: ```sh -mvn package -java -jar target/risingwave-udf-example.jar +_JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn exec:java -Dexec.mainClass="com.example.App" +``` + +## Creating Functions in RisingWave + +```sql +create function gcd(int, int) returns int +language java as gcd using link 'http://localhost:8815'; + +create function series(int) returns table (x int) +language java as series using link 'http://localhost:8815'; +``` + +For more detailed information and examples, please refer to the official RisingWave [documentation](https://www.risingwave.dev/docs/current/user-defined-functions/#4-declare-your-functions-in-risingwave). + +## Using Functions in RisingWave + +Once the user-defined functions are created in RisingWave, you can use them in SQL queries just like any built-in functions. Here are a few examples: + +```sql +select gcd(25, 15); + +select * from series(10); ``` + +## Data Types + +The RisingWave Java UDF SDK supports the following data types: + +| SQL Type | Java Type | Notes | +| --------- | ------------------ | ------------------ | +| SMALLINT | short, Short | | +| INT | int, Integer | | +| BIGINT | long, Long | | +| FLOAT | float, Float | | +| DOUBLE | double, Double | | +| DECIMAL | BigDecimal | | +| VARCHAR | String | | +| BYTEA | byte[] | | +| JSONB | String | Use `@DataTypeHint("JSONB") String` as the type. See [example](#jsonb). | +| JSONB[] | String[] | Use `@DataTypeHint("JSONB[]") String[]` as the type. | +| STRUCT<> | user-defined class | Define a data class as the type. See [example](#struct-type). | +| ...others | | Not supported yet. | + +### JSONB + +```java +import com.google.gson.Gson; + +// Returns the i-th element of a JSON array. +public class JsonbAccess implements ScalarFunction { + static Gson gson = new Gson(); + + public @DataTypeHint("JSONB") String eval(@DataTypeHint("JSONB") String json, int index) { + if (json == null) + return null; + var array = gson.fromJson(json, Object[].class); + if (index >= array.length || index < 0) + return null; + var obj = array[index]; + return gson.toJson(obj); + } +} +``` + +```sql +create function jsonb_access(jsonb, int) returns jsonb +language java as jsonb_access using link 'http://localhost:8815'; +``` + +### Struct Type + +```java +// Split a socket address into host and port. +public static class IpPort implements ScalarFunction { + public static class SocketAddr { + public String host; + public short port; + } + + public SocketAddr eval(String addr) { + var socketAddr = new SocketAddr(); + var parts = addr.split(":"); + socketAddr.host = parts[0]; + socketAddr.port = Short.parseShort(parts[1]); + return socketAddr; + } +} +``` + +```sql +create function ip_port(varchar) returns struct +language java as ip_port using link 'http://localhost:8815'; +``` + +## Full Example + +You can checkout [udf-example](../udf-example) and use it as a template to create your own UDFs.