Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(udf): Add deno as UDF language #15719

Merged
merged 46 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
1698b08
feat(udf): Add deno as UDF language
bakjos Mar 15, 2024
23ce152
chore: Update nightly rust version
bakjos Mar 15, 2024
2f55dac
feat: Update arrow-udf-deno to latest commit
bakjos Mar 15, 2024
1c51eeb
feat: Update ahash version
bakjos Mar 15, 2024
cf45366
feat: Update crc32c version
bakjos Mar 15, 2024
7580838
feat: Rollback rust nightly toolchain version
bakjos Mar 15, 2024
266f9c9
fix: Compilation issues
bakjos Mar 16, 2024
37751c4
feat: Include column in migration
bakjos Mar 16, 2024
5782137
fix: Compare json values instead of strings
bakjos Mar 16, 2024
07452f2
feat: Update deno library to use main risingwavelabs repo
bakjos Mar 20, 2024
7a68561
feat: Add runtime option for javascript language
bakjos Mar 23, 2024
fb9181f
fix: keyword sorting errors
bakjos Mar 23, 2024
b97d077
feat: Updload and download sse-server binary
bakjos Mar 23, 2024
90b5fe0
feat: Create dir before download sse-server artifact
bakjos Mar 23, 2024
5ea2fcd
feat: Add execution permissions for sse-server
bakjos Mar 23, 2024
12c1e52
feat: Patch deno core to remove preserve_order
bakjos Mar 23, 2024
f8d53b5
feat: Bump deno to version 1.42.1
bakjos Apr 1, 2024
3b70a89
feat: Address PR comments
bakjos Apr 2, 2024
3f05729
Merge branch 'main' of github.com:risingwavelabs/risingwave into bakj…
bakjos Apr 2, 2024
f85eee2
feat: Use flask for sse-server
bakjos Apr 2, 2024
bccc02e
Merge branch 'main' of github.com:risingwavelabs/risingwave into bakj…
bakjos Apr 2, 2024
4d75e6a
chore: Fix fmt errors
bakjos Apr 2, 2024
94b2550
Merge branch 'main' of github.com:risingwavelabs/risingwave into bakj…
bakjos Apr 2, 2024
39c185f
feat: Wait for server to be ready
bakjos Apr 2, 2024
15c7134
feat: Use tokio block_in_place instead of future block_on
bakjos Apr 3, 2024
1a216c9
Merge branch 'main' of github.com:risingwavelabs/risingwave into bakj…
bakjos Apr 3, 2024
5532961
feat: Downgrade reqwest to 0.11.20
bakjos Apr 3, 2024
43099fd
Merge branch 'main' of github.com:risingwavelabs/risingwave into bakj…
bakjos Apr 3, 2024
88879fa
Merge branch 'main' of github.com:risingwavelabs/risingwave into bakj…
bakjos Apr 4, 2024
124b1bb
feat: Patch deno libraries to use reqwest 0.12
bakjos Apr 4, 2024
9d1f3c8
Merge branch 'main' of github.com:risingwavelabs/risingwave into bakj…
bakjos Apr 4, 2024
cd92627
Merge branch 'main' of github.com:risingwavelabs/risingwave into bakj…
bakjos Apr 8, 2024
27d1949
feat: Address PR comments
bakjos Apr 8, 2024
868954a
chore: Revert dalvik to 4.1.1
bakjos Apr 8, 2024
05b0884
feat: Separate alter commands
bakjos Apr 8, 2024
e372e86
Merge branch 'main' into bakjos/deno_udf
wangrunji0408 Apr 9, 2024
f6a1fa3
feat: Update Cargo.lock
bakjos Apr 9, 2024
2af566e
Merge branch 'main' into bakjos/deno_udf
bakjos Apr 9, 2024
7388e15
feat: Use waitress to start the sse server
bakjos Apr 9, 2024
da7f150
Merge branch 'main' of github.com:risingwavelabs/risingwave into bakj…
bakjos Apr 9, 2024
9d18e25
feat: Update sse mock server protocol to 1.1
bakjos Apr 10, 2024
a49a4d9
Merge branch 'main' of github.com:risingwavelabs/risingwave into bakj…
bakjos Apr 10, 2024
ce3226b
chore: Remove extra whitespaces
bakjos Apr 10, 2024
89b3d4c
feat: Update deno-udf version
bakjos Apr 10, 2024
0c2360b
Merge branch 'main' of github.com:risingwavelabs/risingwave into bakj…
bakjos Apr 10, 2024
3a9859e
Merge branch 'main' into bakjos/deno_udf
bakjos Apr 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,112 changes: 1,831 additions & 281 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ members = [
"src/utils/workspace-config",
"src/workspace-hack",
]
exclude = ["e2e_test/udf/wasm", "lints"]
exclude = ["e2e_test/udf/sse/server", "e2e_test/udf/wasm", "lints"]
resolver = "2"

[workspace.package]
Expand Down Expand Up @@ -138,6 +138,7 @@ arrow-select = "50"
arrow-ord = "50"
arrow-row = "50"
arrow-udf-js = "0.1"
arrow-udf-js-deno = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "4c4e176" }
arrow-udf-wasm = { version = "0.2", features = ["build"] }
arrow-udf-python = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "6c32f71" }
arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" }
Expand Down Expand Up @@ -306,6 +307,8 @@ futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev =
etcd-client = { git = "https://github.com/risingwavelabs/etcd-client.git", rev = "4e84d40" }
# todo(wcy-fdu): remove this patch fork after opendal release a new version to apply azure workload identity change.
reqsign = { git = "https://github.com/wcy-fdu/reqsign.git", rev = "e6cb304" }
# patch to remove preserve_order from serde_json
deno_core = { git = "https://github.com/bakjos/deno_core", rev = "9b241c6" }

[workspace.metadata.dylint]
libraries = [{ path = "./lints" }]
6 changes: 6 additions & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ is_release = get_env ENABLE_RELEASE_PROFILE
is_not_release = not ${is_release}
is_dynamic_linking = get_env ENABLE_DYNAMIC_LINKING
is_hummock_trace = get_env ENABLE_HUMMOCK_TRACE
is_deno_udf_enabled = get_env ENABLE_DENO_UDF
is_python_udf_enabled = get_env ENABLE_PYTHON_UDF

if ${is_sanitizer_enabled}
Expand All @@ -60,6 +61,11 @@ else
set_env RISINGWAVE_FEATURE_FLAGS "--features rw-static-link"
end

if ${is_deno_udf_enabled}
flags = get_env RISINGWAVE_FEATURE_FLAGS
set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features embedded-deno-udf"
end

if ${is_python_udf_enabled}
flags = get_env RISINGWAVE_FEATURE_FLAGS
set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features embedded-python-udf"
Expand Down
1 change: 1 addition & 0 deletions ci/scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ cargo build \
-p risingwave_compaction_test \
-p risingwave_e2e_extended_mode_test \
$RISINGWAVE_FEATURE_FLAGS \
--features embedded-deno-udf \
--features embedded-python-udf \
--profile "$profile"

Expand Down
1 change: 1 addition & 0 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ sqllogictest -p 4566 -d dev './e2e_test/udf/wasm_udf.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/rust_udf.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/js_udf.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/python_udf.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/deno_udf.slt'

echo "--- Kill cluster"
cluster_stop
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ WORKDIR /risingwave
ENV ENABLE_BUILD_DASHBOARD=1

RUN cargo fetch && \
cargo build -p risingwave_cmd_all --release --features "rw-static-link" --features embedded-python-udf && \
cargo build -p risingwave_cmd_all --release --features "rw-static-link" --features embedded-deno-udf --features embedded-python-udf && \
mkdir -p /risingwave/bin && \
mv /risingwave/target/release/risingwave /risingwave/bin/ && \
mv /risingwave/target/release/risingwave.dwp /risingwave/bin/ && \
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile.hdfs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ ENV JAVA_HOME ${JAVA_HOME_PATH}
ENV LD_LIBRARY_PATH ${JAVA_HOME_PATH}/lib/server:${LD_LIBRARY_PATH}

RUN cargo fetch && \
cargo build -p risingwave_cmd_all --release -p risingwave_object_store --features hdfs-backend --features "rw-static-link" --features embedded-python-udf && \
cargo build -p risingwave_cmd_all --release -p risingwave_object_store --features hdfs-backend --features "rw-static-link" --features embedded-deno-udf --features embedded-python-udf && \
mkdir -p /risingwave/bin && \
mv /risingwave/target/release/risingwave /risingwave/bin/ && \
mv /risingwave/target/release/risingwave.dwp /risingwave/bin/ && \
Expand Down
246 changes: 246 additions & 0 deletions e2e_test/udf/deno_udf.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
statement ok
CREATE FUNCTION gcd(a int, b int) RETURNS int LANGUAGE javascript RUNTIME deno AS $$
if(a == null || b == null) {
return null;
}
while (b != 0) {
let t = b;
b = a % b;
a = t;
}
return a;
$$;

query I
select gcd(25, 15);
----
5

statement ok
drop function gcd;

statement ok
create function decimal_add(a decimal, b decimal) returns decimal language javascript RUNTIME deno as $$
return a.add(b);
$$;

query R
select decimal_add(1.11, 2.22);
----
3.33

statement ok
drop function decimal_add;


statement ok
create function to_string(a boolean, b smallint, c int, d bigint, e real, f float, g decimal, h varchar, i bytea, j jsonb) returns varchar language javascript RUNTIME deno as $$
return a.toString() + b.toString() + c.toString() + d.toString() + e.toString() + f.toString() + g.toString() + h.toString() + i.toString() + JSON.stringify(j);
$$;

query T
select to_string(false, 1::smallint, 2, 3, 4.5, 6.7, 8.9, 'abc', '\x010203', '{"key": 1}');
----
false1234.56.78.9abc1,2,3{"key":1}

statement ok
drop function to_string;

# show data types in javascript
statement ok
create function js_typeof(a boolean, b smallint, c int, d bigint, e real, f float, g decimal, h varchar, i bytea, j jsonb) returns jsonb language javascript RUNTIME deno as $$
return {
boolean: typeof a,
smallint: typeof b,
int: typeof c,
bigint: typeof d,
real: typeof e,
float: typeof f,
decimal: typeof g,
varchar: typeof h,
bytea: typeof i,
jsonb: typeof j,
};
$$;

query T
select js_typeof(false, 1::smallint, 2, 3, 4.5, 6.7, 8.9, 'abc', '\x010203', '{"key": 1}');
----
{"bigint": "bigint", "boolean": "boolean", "bytea": "object", "decimal": "object", "float": "number", "int": "number", "jsonb": "object", "real": "number", "smallint": "number", "varchar": "string"}

statement ok
drop function js_typeof;

statement ok
create function return_all(a boolean, b smallint, c int, d bigint, e real, f float, g decimal, h varchar, i bytea, j jsonb, s struct<f1 int, f2 int>)
returns struct<a boolean, b smallint, c int, d bigint, e real, f float, g decimal, h varchar, i bytea, j jsonb, s struct<f1 int, f2 int>>
language javascript runtime deno as $$
return {a,b,c,d,e,f,g,h,i,j,s};
$$;

query T
select (return_all(
true,
1 ::smallint,
1,
1,
1,
1,
12345678901234567890.12345678,
'string',
'bytes',
'{"key":1}',
row(1, 2)::struct<f1 int, f2 int>
)).*;
----
t 1 1 1 1 1 12345678901234567890.12345678 string \x6279746573 {"key": 1} (1,2)

statement ok
drop function return_all;


statement ok
create function series(n int) returns table (x int) language javascript RUNTIME deno as $$
for(let i = 0; i < n; i++) {
yield i;
}
$$;

query I
select series(5);
----
0
1
2
3
4

statement ok
drop function series;


statement ok
create function split(s varchar) returns table (word varchar, length int) language javascript RUNTIME deno as $$
for(let word of s.split(' ')) {
yield { word: word, length: word.length };
}
$$;

query IT
select * from split('rising wave');
----
rising 6
wave 4

statement ok
drop function split;


statement ok
CREATE FUNCTION digest( t string ) RETURNS bytea LANGUAGE javascript RUNTIME deno AS $$
const subtle = crypto.subtle;
const key = await subtle.generateKey({
name: 'HMAC',
hash: 'SHA-256',
length: 256,
}, true, ['sign', 'verify']);
const enc = new TextEncoder();
const message = enc.encode(t);
const result = await subtle.sign({
name: 'HMAC',
}, key, message);
console.log('result', result);
return result;
$$ ASYNC;

query I
select bit_length(digest('Hello'));
----
256

statement ok
drop function digest;

statement ok
CREATE FUNCTION delay_response()
RETURNS TABLE (x int) LANGUAGE javascript RUNTIME deno AS $$
const delayedResponses = {
delays: [50, 10, 15],
wait(delay) {
return new Promise((resolve) => {
setTimeout(resolve, delay);
});
},
async *[Symbol.asyncIterator]() {
for (const delay of this.delays) {
await this.wait(delay);
yield delay;
}
},
};
return delayedResponses;
$$ SYNC;

query I
select * FROM delay_response();
----
50
10
15

statement ok
drop function delay_response;

system ok
python3 e2e_test/udf/mock_server.py &

# wait for server to start
sleep 1s

statement ok
CREATE FUNCTION fetch_api() RETURNS TABLE ( data struct< idx int>) LANGUAGE javascript RUNTIME deno AS $$
const response = await fetch('http://127.0.0.1:4101');
const resp = await response.json();
for (const r of resp.results) {
yield r;
}
$$ ASYNC GENERATOR;

query I
select * FROM fetch_api();
----
1
2

statement ok
drop function fetch_api;

system ok
pkill -9 python3

system ok
pip3 install -r e2e_test/udf/sse/requirements.txt

system ok
python3 e2e_test/udf/sse/sse.py &

# waits for the server to start
sleep 5s

statement ok
CREATE FUNCTION call_sse() RETURNS TABLE ( data struct<data struct<greetings string>>) LANGUAGE javascript RUNTIME deno USING LINK 'fs://e2e_test/udf/sse/bundled.table.js' SYNC;

query I
select * FROM call_sse();
----
(Hi)
(Bonjour)
(Hola)
(Ciao)
(Zdravo)

system ok
pkill -9 python3

statement ok
drop function call_sse;
24 changes: 24 additions & 0 deletions e2e_test/udf/mock_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import http.server
import sys

class MockHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
"""HTTPServer mock request handler"""

def do_GET(self): # pylint: disable=invalid-name
"""Handle GET requests"""
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(b'{"results": [{"idx": 1}, {"idx": 2}]}')

def log_request(self, code=None, size=None):
"""Don't log anything"""

def main() -> int:
"""Echo the input arguments to standard output"""

httpd = http.server.HTTPServer( ("127.0.0.1", 4101), MockHTTPRequestHandler)
httpd.serve_forever()

if __name__ == '__main__':
sys.exit(main())
Loading
Loading