[improvement](stream load)(cherry-pick) support hll_from_base64 for stream load column mapping #36819

Merged: 1 commit, Jun 26, 2024
@@ -178,6 +178,7 @@ public boolean isNullResultWithOneNullParamFunctions(String funcName) {
     public static final String HLL_UNION_AGG = "hll_union_agg";
     public static final String HLL_RAW_AGG = "hll_raw_agg";
     public static final String HLL_CARDINALITY = "hll_cardinality";
+    public static final String HLL_FROM_BASE64 = "hll_from_base64";

     public static final String TO_BITMAP = "to_bitmap";
     public static final String TO_BITMAP_WITH_CHECK = "to_bitmap_with_check";
@@ -280,9 +280,11 @@ protected void finalizeParamsForLoad(ParamCreateContext context,
             }
             FunctionCallExpr fn = (FunctionCallExpr) expr;
             if (!fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_HASH) && !fn.getFnName()
-                    .getFunction().equalsIgnoreCase("hll_empty")) {
+                    .getFunction().equalsIgnoreCase("hll_empty")
+                    && !fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_FROM_BASE64)) {
                 throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like "
                         + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx) or "
+                        + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_FROM_BASE64 + "(xxx) or "
                         + destSlotDesc.getColumn().getName() + "=hll_empty()");
             }
             expr.setType(org.apache.doris.catalog.Type.HLL);
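After this change the load analyzer accepts hll_from_base64, alongside hll_hash and hll_empty, when a stream load column mapping targets an HLL column; any other expression still raises the AnalysisException shown above. A minimal sketch of the accepted mapping expressions, with illustrative names (pv, src_col, src_b64 are not from the patch):

    -- Column mappings accepted for an HLL destination column (illustrative names):
    pv=hll_hash(src_col)            -- build an HLL from a raw value
    pv=hll_from_base64(src_b64)     -- decode a base64-serialized HLL (added by this PR)
    pv=hll_empty()                  -- produce an empty HLL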
12 changes: 12 additions & 0 deletions regression-test/data/load_p0/http_stream/test_http_stream.out
@@ -620,3 +620,15 @@
1 test
2 test

-- !sql19 --
buag 1 1
huang 1 1
jfin 1 1
koga 1 1
kon 1 1
lofn 1 1
lojn 1 1
nfubg 1 1
nhga 1 1
nijg 1 1

10 changes: 10 additions & 0 deletions regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv
@@ -0,0 +1,10 @@
1001,koga,AQEMYSmSmfh+mA==
1002,nijg,AQGs1RXTaA+hkQ==
1003,lojn,AQFyJr4rwn+S0A==
1004,lofn,AQFvE0bU6Pc9uw==
1005,jfin,AQEmxbO3VGItCA==
1006,kon,AQEm5d0Gw4uvZw==
1007,nhga,AQHOpocenFnBwQ==
1008,nfubg,AQFzYsFz+NIgUg==
1009,huang,AQH2slI7qAUmYA==
1010,buag,AQGBXZ3xnU79YA==
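The third CSV field in each row is a base64-encoded serialized HLL that the tests decode with hll_from_base64 during load. As a rough sketch, such a payload could be produced on the SQL side with the companion hll_to_base64 function (assumed available; table and column names are illustrative):

    -- Serialize an existing HLL column to base64 so it can be written to CSV
    -- and later re-imported through hll_from_base64 (illustrative names):
    SELECT type_id,
           type_name,
           hll_to_base64(pv_hash) AS pv_base64
    FROM   source_hll_table;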
12 changes: 12 additions & 0 deletions regression-test/data/load_p0/stream_load/test_stream_load_new.out
@@ -124,3 +124,15 @@
10009 jj
10010 kk

-- !sql13 --
buag 1 1
huang 1 1
jfin 1 1
koga 1 1
kon 1 1
lofn 1 1
lojn 1 1
nfubg 1 1
nhga 1 1
nijg 1 1

41 changes: 41 additions & 0 deletions regression-test/suites/load_p0/http_stream/test_http_stream.groovy
@@ -854,5 +854,46 @@ suite("test_http_stream", "p0") {
    } finally {
        try_sql "DROP TABLE IF EXISTS ${tableName18}"
    }

    // test load hll type
    def tableName19 = "test_http_stream_hll_type"

    try {
        sql """
        CREATE TABLE IF NOT EXISTS ${tableName19} (
            type_id int,
            type_name varchar(10),
            pv_hash hll hll_union not null,
            pv_base64 hll hll_union not null
        )
        AGGREGATE KEY(type_id,type_name)
        DISTRIBUTED BY HASH(type_id) BUCKETS 1
        PROPERTIES (
            "replication_num" = "1"
        )
        """

        streamLoad {
            set 'version', '1'
            set 'sql', """
                    insert into ${db}.${tableName19} select c1,c2,hll_hash(c1),hll_from_base64(c3) from http_stream("format"="csv", "column_separator"=",")
                    """
            time 10000
            file '../stream_load/test_stream_load_hll_type.csv'
            check { result, exception, startTime, endTime ->
                if (exception != null) {
                    throw exception
                }
                log.info("http_stream result: ${result}".toString())
                def json = parseJson(result)
                assertEquals("success", json.Status.toLowerCase())
            }
        }

        qt_sql19 "select type_name, hll_union_agg(pv_hash), hll_union_agg(pv_base64) from ${tableName19} group by type_name order by type_name"
    } finally {
        try_sql "DROP TABLE IF EXISTS ${tableName19}"
    }

}

42 changes: 42 additions & 0 deletions regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy
@@ -540,5 +540,47 @@ suite("test_stream_load_new", "p0") {
    } finally {
        try_sql "DROP TABLE IF EXISTS ${tableName12}"
    }

    // 13. test stream load hll type
    def tableName13 = "test_stream_load_hll_type"

    try {
        sql """
        CREATE TABLE IF NOT EXISTS ${tableName13} (
            type_id int,
            type_name varchar(10),
            pv_hash hll hll_union not null,
            pv_base64 hll hll_union not null
        )
        AGGREGATE KEY(type_id,type_name)
        DISTRIBUTED BY HASH(type_id) BUCKETS 1
        PROPERTIES (
            "replication_num" = "1"
        )
        """

        streamLoad {
            set 'column_separator', ','
            set 'columns', 'type_id,type_name,type_id_base64,pv_hash=hll_hash(type_id),pv_base64=hll_from_base64(type_id_base64)'
            table "${tableName13}"
            time 10000
            file 'test_stream_load_hll_type.csv'
            check { result, exception, startTime, endTime ->
                if (exception != null) {
                    throw exception
                }
                log.info("Stream load result: ${result}".toString())
                def json = parseJson(result)
                assertEquals("success", json.Status.toLowerCase())
                assertEquals(10, json.NumberTotalRows)
                assertEquals(0, json.NumberFilteredRows)
            }
        }
        sql """ sync; """
        qt_sql13 "select type_name, hll_union_agg(pv_hash), hll_union_agg(pv_base64) from ${tableName13} group by type_name order by type_name"
    } finally {
        try_sql "DROP TABLE IF EXISTS ${tableName13}"
    }

}
