Skip to content

Commit

Permalink
[improvement](stream load) support hll_from_base64 for stream load column mapping (apache#35923)
Browse files Browse the repository at this point in the history
  • Loading branch information
gnehil authored Jun 25, 2024
1 parent 4b975ae commit eaebad6
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ public boolean isNullResultWithOneNullParamFunctions(String funcName) {
// Names of the built-in HLL functions, exposed as constants so other code can
// refer to them symbolically (e.g. FunctionSet.HLL_FROM_BASE64) instead of
// repeating the string literal.
public static final String HLL_UNION_AGG = "hll_union_agg";
public static final String HLL_RAW_AGG = "hll_raw_agg";
public static final String HLL_CARDINALITY = "hll_cardinality";
// NOTE(review): judging by the name, this builds an HLL value from a
// base64-encoded string -- confirm against the function implementation.
public static final String HLL_FROM_BASE64 = "hll_from_base64";

// Names of the bitmap conversion functions.
public static final String TO_BITMAP = "to_bitmap";
public static final String TO_BITMAP_WITH_CHECK = "to_bitmap_with_check";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,11 @@ protected void finalizeParamsForLoad(ParamCreateContext context,
}
FunctionCallExpr fn = (FunctionCallExpr) expr;
if (!fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_HASH) && !fn.getFnName()
.getFunction().equalsIgnoreCase("hll_empty")) {
.getFunction().equalsIgnoreCase("hll_empty")
&& !fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_FROM_BASE64)) {
throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like "
+ destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx) or "
+ destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_FROM_BASE64 + "(xxx) or "
+ destSlotDesc.getColumn().getName() + "=hll_empty()");
}
expr.setType(org.apache.doris.catalog.Type.HLL);
Expand Down
12 changes: 12 additions & 0 deletions regression-test/data/load_p0/http_stream/test_http_stream.out
Original file line number Diff line number Diff line change
Expand Up @@ -620,3 +620,15 @@
1 test
2 test

-- !sql19 --
buag 1 1
huang 1 1
jfin 1 1
koga 1 1
kon 1 1
lofn 1 1
lojn 1 1
nfubg 1 1
nhga 1 1
nijg 1 1

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
1001,koga,AQEMYSmSmfh+mA==
1002,nijg,AQGs1RXTaA+hkQ==
1003,lojn,AQFyJr4rwn+S0A==
1004,lofn,AQFvE0bU6Pc9uw==
1005,jfin,AQEmxbO3VGItCA==
1006,kon,AQEm5d0Gw4uvZw==
1007,nhga,AQHOpocenFnBwQ==
1008,nfubg,AQFzYsFz+NIgUg==
1009,huang,AQH2slI7qAUmYA==
1010,buag,AQGBXZ3xnU79YA==
12 changes: 12 additions & 0 deletions regression-test/data/load_p0/stream_load/test_stream_load_new.out
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,15 @@
10009 jj
10010 kk

-- !sql13 --
buag 1 1
huang 1 1
jfin 1 1
koga 1 1
kon 1 1
lofn 1 1
lojn 1 1
nfubg 1 1
nhga 1 1
nijg 1 1

41 changes: 41 additions & 0 deletions regression-test/suites/load_p0/http_stream/test_http_stream.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -854,5 +854,46 @@ suite("test_http_stream", "p0") {
} finally {
try_sql "DROP TABLE IF EXISTS ${tableName18}"
}

// Test loading HLL columns through http_stream.
// The aggregate table has two HLL columns: pv_hash is filled with hll_hash(c1)
// and pv_base64 with hll_from_base64(c3), where c1/c3 are the first and third
// CSV fields of the uploaded file.
def tableName19 = "test_http_stream_hll_type"

try {
sql """
CREATE TABLE IF NOT EXISTS ${tableName19} (
type_id int,
type_name varchar(10),
pv_hash hll hll_union not null,
pv_base64 hll hll_union not null
)
AGGREGATE KEY(type_id,type_name)
DISTRIBUTED BY HASH(type_id) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
)
"""

streamLoad {
set 'version', '1'
// The load is expressed as an INSERT ... SELECT over the http_stream table
// function; the CSV fields are addressed as c1, c2, c3.
set 'sql', """
insert into ${db}.${tableName19} select c1,c2,hll_hash(c1),hll_from_base64(c3) from http_stream("format"="csv", "column_separator"=",")
"""
time 10000
// Reuses the fixture of the stream_load HLL test.
file '../stream_load/test_stream_load_hll_type.csv'
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("http_stream result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
}
}

// Issue a sync before querying, as the sibling stream-load HLL test does, so the
// verification query does not depend on which frontend happens to serve it.
sql """ sync; """
// Result is compared against the recorded output tagged sql19.
qt_sql19 "select type_name, hll_union_agg(pv_hash), hll_union_agg(pv_base64) from ${tableName19} group by type_name order by type_name"
} finally {
// Always drop the table so that reruns start from a clean state.
try_sql "DROP TABLE IF EXISTS ${tableName19}"
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -540,5 +540,47 @@ suite("test_stream_load_new", "p0") {
} finally {
try_sql "DROP TABLE IF EXISTS ${tableName12}"
}

// 13. test stream load hll type
// Loads a 3-field CSV into an aggregate table with two HLL columns and checks
// that both HLL columns yield the same per-name cardinality.
def tableName13 = "test_stream_load_hll_type"

try {
sql """
CREATE TABLE IF NOT EXISTS ${tableName13} (
type_id int,
type_name varchar(10),
pv_hash hll hll_union not null,
pv_base64 hll hll_union not null
)
AGGREGATE KEY(type_id,type_name)
DISTRIBUTED BY HASH(type_id) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
)
"""

streamLoad {
set 'column_separator', ','
// Column mapping: the third CSV field is bound to the name type_id_base64,
// which is not a table column; it only feeds hll_from_base64(...) for pv_base64.
// pv_hash is derived from type_id via hll_hash(...).
set 'columns', 'type_id,type_name,type_id_base64,pv_hash=hll_hash(type_id),pv_base64=hll_from_base64(type_id_base64)'
table "${tableName13}"
time 10000
file 'test_stream_load_hll_type.csv'
check { result, exception, startTime, endTime ->
// Surface any transport/framework failure before inspecting the response.
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
// The load must succeed with all 10 rows read and none filtered out.
assertEquals("success", json.Status.toLowerCase())
assertEquals(10, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
}
}
// NOTE(review): presumably ensures the loaded data is visible to the query below -- confirm.
sql """ sync; """
// Result is compared against the recorded output tagged sql13.
qt_sql13 "select type_name, hll_union_agg(pv_hash), hll_union_agg(pv_base64) from ${tableName13} group by type_name order by type_name"
} finally {
// Always drop the table, whether or not the assertions above passed.
try_sql "DROP TABLE IF EXISTS ${tableName13}"
}

}

0 comments on commit eaebad6

Please sign in to comment.