From 60987a6926f70aeaf2628f1bdaeb71a5e97addf4 Mon Sep 17 00:00:00 2001 From: gnehil Date: Tue, 25 Jun 2024 11:28:03 +0800 Subject: [PATCH] [improvement](stream load) support hll_from_base64 for stream load column mapping (#35923) (cherry picked from commit eaebad61090e07bb1208c769960c7d5134ec2d03) --- .../org/apache/doris/catalog/FunctionSet.java | 1 + .../doris/planner/FileLoadScanNode.java | 4 +- .../load_p0/http_stream/test_http_stream.out | 12 ++++++ .../stream_load/test_stream_load_hll_type.csv | 10 +++++ .../stream_load/test_stream_load_new.out | 12 ++++++ .../http_stream/test_http_stream.groovy | 41 ++++++++++++++++++ .../stream_load/test_stream_load_new.groovy | 42 +++++++++++++++++++ 7 files changed, 121 insertions(+), 1 deletion(-) create mode 100644 regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java index b0d4c654531781..2db943993dd42f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java @@ -178,6 +178,7 @@ public boolean isNullResultWithOneNullParamFunctions(String funcName) { public static final String HLL_UNION_AGG = "hll_union_agg"; public static final String HLL_RAW_AGG = "hll_raw_agg"; public static final String HLL_CARDINALITY = "hll_cardinality"; + public static final String HLL_FROM_BASE64 = "hll_from_base64"; public static final String TO_BITMAP = "to_bitmap"; public static final String TO_BITMAP_WITH_CHECK = "to_bitmap_with_check"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java index ca0324a51d0d93..0d674a705170c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java @@ -280,9 +280,11 @@ protected void finalizeParamsForLoad(ParamCreateContext context, } FunctionCallExpr fn = (FunctionCallExpr) expr; if (!fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_HASH) && !fn.getFnName() - .getFunction().equalsIgnoreCase("hll_empty")) { + .getFunction().equalsIgnoreCase("hll_empty") + && !fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_FROM_BASE64)) { throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like " + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx) or " + + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_FROM_BASE64 + "(xxx) or " + destSlotDesc.getColumn().getName() + "=hll_empty()"); } expr.setType(org.apache.doris.catalog.Type.HLL); diff --git a/regression-test/data/load_p0/http_stream/test_http_stream.out b/regression-test/data/load_p0/http_stream/test_http_stream.out index 7ce24eea095069..2475ed24961101 100644 --- a/regression-test/data/load_p0/http_stream/test_http_stream.out +++ b/regression-test/data/load_p0/http_stream/test_http_stream.out @@ -620,3 +620,15 @@ 1 test 2 test +-- !sql19 -- +buag 1 1 +huang 1 1 +jfin 1 1 +koga 1 1 +kon 1 1 +lofn 1 1 +lojn 1 1 +nfubg 1 1 +nhga 1 1 +nijg 1 1 + diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv b/regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv new file mode 100644 index 00000000000000..0b1d798782c9cf --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv @@ -0,0 +1,10 @@ +1001,koga,AQEMYSmSmfh+mA== +1002,nijg,AQGs1RXTaA+hkQ== +1003,lojn,AQFyJr4rwn+S0A== +1004,lofn,AQFvE0bU6Pc9uw== +1005,jfin,AQEmxbO3VGItCA== +1006,kon,AQEm5d0Gw4uvZw== +1007,nhga,AQHOpocenFnBwQ== +1008,nfubg,AQFzYsFz+NIgUg== +1009,huang,AQH2slI7qAUmYA== +1010,buag,AQGBXZ3xnU79YA== \ No newline at end of file diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_new.out b/regression-test/data/load_p0/stream_load/test_stream_load_new.out index 52440d984368ef..f251042a9dfc06 100644 --- a/regression-test/data/load_p0/stream_load/test_stream_load_new.out +++ b/regression-test/data/load_p0/stream_load/test_stream_load_new.out @@ -124,3 +124,15 @@ 10009 jj 10010 kk +-- !sql13 -- +buag 1 1 +huang 1 1 +jfin 1 1 +koga 1 1 +kon 1 1 +lofn 1 1 +lojn 1 1 +nfubg 1 1 +nhga 1 1 +nijg 1 1 + diff --git a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy index 781732988e5f51..5411224c200714 100644 --- a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy +++ b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy @@ -854,5 +854,46 @@ suite("test_http_stream", "p0") { } finally { try_sql "DROP TABLE IF EXISTS ${tableName18}" } + + // test load hll type + def tableName19 = "test_http_stream_hll_type" + + try { + sql """ + CREATE TABLE IF NOT EXISTS ${tableName19} ( + type_id int, + type_name varchar(10), + pv_hash hll hll_union not null, + pv_base64 hll hll_union not null + ) + AGGREGATE KEY(type_id,type_name) + DISTRIBUTED BY HASH(type_id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + streamLoad { + set 'version', '1' + set 'sql', """ + insert into ${db}.${tableName19} select c1,c2,hll_hash(c1),hll_from_base64(c3) from http_stream("format"="csv", "column_separator"=",") + """ + time 10000 + file '../stream_load/test_stream_load_hll_type.csv' + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("http_stream result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + + qt_sql19 "select type_name, hll_union_agg(pv_hash), hll_union_agg(pv_base64) from ${tableName19} group by type_name order by type_name" + } finally { + try_sql "DROP TABLE IF EXISTS ${tableName19}" + } + } diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy index 7df57ebbd161fc..48c3e5f9654870 100644 --- a/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy @@ -540,5 +540,47 @@ suite("test_stream_load_new", "p0") { } finally { try_sql "DROP TABLE IF EXISTS ${tableName12}" } + + // 13. test stream load hll type + def tableName13 = "test_stream_load_hll_type" + + try { + sql """ + CREATE TABLE IF NOT EXISTS ${tableName13} ( + type_id int, + type_name varchar(10), + pv_hash hll hll_union not null, + pv_base64 hll hll_union not null + ) + AGGREGATE KEY(type_id,type_name) + DISTRIBUTED BY HASH(type_id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + streamLoad { + set 'column_separator', ',' + set 'columns', 'type_id,type_name,type_id_base64,pv_hash=hll_hash(type_id),pv_base64=hll_from_base64(type_id_base64)' + table "${tableName13}" + time 10000 + file 'test_stream_load_hll_type.csv' + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(10, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + } + } + sql """ sync; """ + qt_sql13 "select type_name, hll_union_agg(pv_hash), hll_union_agg(pv_base64) from ${tableName13} group by type_name order by type_name" + } finally { + try_sql "DROP TABLE IF EXISTS ${tableName13}" + } + }