Skip to content

Commit

Permalink
[feature](selectdb-cloud) Copy into support select by column name (ap…
Browse files Browse the repository at this point in the history
…ache#1055)

* Copy into support select by column name
* Fix broker load core dump due to mis-match of number of columns between remote and schema
  • Loading branch information
liaoxin01 authored and gavinchou committed Nov 9, 2022
1 parent 7900250 commit f1a543e
Show file tree
Hide file tree
Showing 114 changed files with 1,362 additions and 10 deletions.
6 changes: 6 additions & 0 deletions be/src/vec/exec/varrow_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include "common/status.h"
#include "exec/arrow/parquet_reader.h"
#include "exprs/expr.h"
#include "io/file_factory.h"
Expand Down Expand Up @@ -153,6 +154,11 @@ Status VArrowScanner::_init_arrow_batch_if_necessary() {
Status VArrowScanner::_init_src_block() {
size_t batch_pos = 0;
_src_block.clear();
if (_batch->num_columns() < _num_of_columns_from_file) {
LOG(WARNING) << "some cloumns not found in the file, num_columns_obtained: "
<< _batch->num_columns() << " num_columns_required: " << _num_of_columns_from_file;
return Status::InvalidArgument("some cloumns not found in the file");
}
for (auto i = 0; i < _num_of_columns_from_file; ++i) {
SlotDescriptor* slot_desc = _src_slot_descs[i];
if (slot_desc == nullptr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CopyFromParam {
private static final Logger LOG = LogManager.getLogger(CopyFromParam.class);
Expand Down Expand Up @@ -72,17 +74,18 @@ public void analyze(String fullDbName, TableName tableName) throws AnalysisExcep
OlapTable olapTable = db.getOlapTableOrAnalysisException(tableName.getTbl());
List<Column> tableColumns = olapTable.getBaseSchema();

int maxFileColumnId = getMaxFileColumnId();
maxFileColumnId = tableColumns.size() > maxFileColumnId ? tableColumns.size() : maxFileColumnId;
for (int i = 1; i <= maxFileColumnId; i++) {
fileColumns.add(DOLLAR + i);
if (!getFileColumnNames()) {
int maxFileColumnId = getMaxFileColumnId();
if (maxFileColumnId > 0) {
maxFileColumnId = tableColumns.size() > maxFileColumnId ? tableColumns.size() : maxFileColumnId;
for (int i = 1; i <= maxFileColumnId; i++) {
fileColumns.add(DOLLAR + i);
}
}
}

if (exprList != null) {
if (tableColumns.size() > exprList.size()) {
throw new AnalysisException("select column size is less than table column size");
}
for (int i = 0; i < exprList.size(); i++) {
for (int i = 0; i < exprList.size() && i < tableColumns.size(); i++) {
Expr expr = exprList.get(i);
String name = tableColumns.get(i).getName();
BinaryPredicate binaryPredicate = new BinaryPredicate(Operator.EQ, new SlotRef(null, name), expr);
Expand All @@ -98,6 +101,27 @@ public void analyze(String fullDbName, TableName tableName) throws AnalysisExcep
}
}

// expr use column name, not use $
private boolean getFileColumnNames() throws AnalysisException {
List<SlotRef> slotRefs = Lists.newArrayList();
Expr.collectList(exprList, SlotRef.class, slotRefs);
Set<String> columnSet = new HashSet<String>();
for (SlotRef slotRef : slotRefs) {
String columnName = slotRef.getColumnName();
if (columnName.startsWith(DOLLAR)) {
if (fileColumns.size() > 0) {
throw new AnalysisException("can not mix column name and $");
}
return false;
}
if (!columnSet.contains(columnName)) {
columnSet.add(columnName);
fileColumns.add(columnName);
}
}
return true;
}

private int getMaxFileColumnId() throws AnalysisException {
int maxId = 0;
if (exprList != null) {
Expand Down Expand Up @@ -125,7 +149,10 @@ private int getMaxFileColumnId(List<Expr> exprList) throws AnalysisException {
private int getFileColumnIdOfSlotRef(SlotRef slotRef) throws AnalysisException {
String columnName = slotRef.getColumnName();
try {
return columnName.startsWith(DOLLAR) ? Integer.parseInt(columnName.substring(1)) : 0;
if (!columnName.startsWith(DOLLAR)) {
throw new AnalysisException("can not mix column name and $");
}
return Integer.parseInt(columnName.substring(1));
} catch (NumberFormatException e) {
throw new AnalysisException("column name: " + columnName + " can not parse to a number");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,36 @@ suite("test_copy_with_select") {
try_sql("DROP TABLE IF EXISTS ${tableName}")
}
}
}

try {
sql """ DROP TABLE IF EXISTS ${tableName}; """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
C_CUSTKEY INTEGER NOT NULL,
C_NAME VARCHAR(40) NOT NULL,
C_ADDRESS VARCHAR(40) NOT NULL,
C_NATIONKEY INTEGER NOT NULL,
C_PHONE CHAR(15) NULL,
C_ACCTBAL DECIMAL(15,2) NOT NULL,
C_MKTSEGMENT CHAR(10) NOT NULL,
C_COMMENT VARCHAR(117) NULL
)
UNIQUE KEY(C_CUSTKEY)
DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1
"""

def mixSelect = 'select col1, col2, $1, $2, $3, $4, $5, $6' + sql_stage
test {
sql """ ${sql_prefix} ${mixSelect} ${sql_postfix} """
exception 'errCode = 2, detailMessage = can not mix column name and $'
}

mixSelect = 'select $1, $2, $3, $4, $5, $6, col1, col2' + sql_stage
test {
sql """ ${sql_prefix} ${mixSelect} ${sql_postfix} """
exception 'errCode = 2, detailMessage = can not mix column name and $'
}
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
}
}
14 changes: 14 additions & 0 deletions regression-test/suites/cloud/copy_into_json/ddl/filter_create.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE TABLE IF NOT EXISTS filter (
p_partkey int NOT NULL,
p_name VARCHAR(55) NOT NULL,
p_mfgr VARCHAR(25) NOT NULL,
p_brand VARCHAR(10) NOT NULL,
p_type VARCHAR(25) NOT NULL,
p_size int NOT NULL,
p_container VARCHAR(10) NOT NULL,
p_retailprice decimal(15, 2) NOT NULL,
p_comment VARCHAR(23) NOT NULL
)ENGINE=OLAP
DUPLICATE KEY(`p_partkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 24;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS filter;
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
CREATE TABLE IF NOT EXISTS null_default (
p_partkey int NOT NULL,
p_name VARCHAR(55) NOT NULL,
p_mfgr VARCHAR(25) NOT NULL,
p_brand VARCHAR(10) NOT NULL,
p_type VARCHAR(25) NOT NULL,
p_size int NOT NULL,
p_container VARCHAR(10) NOT NULL,
p_retailprice decimal(15, 2) NOT NULL,
p_comment VARCHAR(23) NOT NULL,
p_add_col1 varchar(5) null,
p_add_col2 int null,
p_add_col3 int null default '100',
p_add_col4 varchar(5) null default 'abc'
)ENGINE=OLAP
DUPLICATE KEY(`p_partkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 24;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS null_default;
12 changes: 12 additions & 0 deletions regression-test/suites/cloud/copy_into_json/ddl/part_create.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS part (
`p_partkey` integer NOT NULL,
`p_name` varchar(55) NOT NULL,
`p_mfgr` char(25) NOT NULL,
`p_brand` char(10) NOT NULL,
`p_type` varchar(25) NOT NULL,
`p_size` integer NOT NULL,
`p_container` char(10) NOT NULL,
`p_retailprice` double NOT NULL,
`p_comment` varchar(23) NOT NULL
)ENGINE=OLAP
DISTRIBUTED BY HASH(p_partkey) BUCKETS 24;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS part;
12 changes: 12 additions & 0 deletions regression-test/suites/cloud/copy_into_json/ddl/reverse_create.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS reverse (
`comment` varchar(23) NOT NULL,
`retailprice` decimal(12, 2) NOT NULL,
`container` char(10) NOT NULL,
`size` integer NOT NULL,
`type` varchar(25) NOT NULL,
`brand` char(10) NOT NULL,
`mfgr` char(25) NOT NULL,
`name` varchar(55) NOT NULL,
`partkey` integer NOT NULL
)ENGINE=OLAP
DISTRIBUTED BY HASH(partkey) BUCKETS 24;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS reverse;
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE TABLE s3_case1 (
p_partkey int NOT NULL DEFAULT "1",
p_name VARCHAR(55) NOT NULL DEFAULT "2",
p_mfgr VARCHAR(25) NOT NULL DEFAULT "3",
p_brand VARCHAR(10) NOT NULL DEFAULT "4",
p_type VARCHAR(25) NOT NULL DEFAULT "5",
p_size int NOT NULL DEFAULT "6",
p_container VARCHAR(10) NOT NULL DEFAULT "7",
p_retailprice decimal(15, 2) NOT NULL DEFAULT "8",
p_comment VARCHAR(23) NOT NULL DEFAULT "9",
col1 int not null default "10"
)ENGINE=OLAP
DUPLICATE KEY(`p_partkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS s3_case1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE TABLE s3_case2 (
p_partkey int NOT NULL DEFAULT "1",
p_name VARCHAR(55) NOT NULL DEFAULT "2",
p_mfgr VARCHAR(25) NOT NULL DEFAULT "3",
p_brand VARCHAR(10) NOT NULL DEFAULT "4",
p_type VARCHAR(25) NOT NULL DEFAULT "5",
p_size int NOT NULL DEFAULT "6",
p_container VARCHAR(10) NOT NULL DEFAULT "7",
p_retailprice decimal(15, 2) NOT NULL DEFAULT "8",
p_comment VARCHAR(23) NOT NULL DEFAULT "9",
col1 int not null default "10"
)ENGINE=OLAP
DUPLICATE KEY(`p_partkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS s3_case2;
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
CREATE TABLE s3_case3 (
p_partkey int NOT NULL DEFAULT "1",
p_name VARCHAR(55) NOT NULL DEFAULT "2",
p_mfgr VARCHAR(25) NOT NULL DEFAULT "3",
p_brand VARCHAR(10) NOT NULL DEFAULT "4",
p_type VARCHAR(25) NOT NULL DEFAULT "5",
p_size int NOT NULL DEFAULT "6",
p_container VARCHAR(10) NOT NULL DEFAULT "7",
p_retailprice decimal(15, 2) NOT NULL DEFAULT "8"
)ENGINE=OLAP
DUPLICATE KEY(`p_partkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS s3_case3;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE s3_case4 (
col1 int NOT NULL DEFAULT "1",
col2 VARCHAR(55) NOT NULL DEFAULT "2",
col3 VARCHAR(25) NOT NULL DEFAULT "3",
col4 VARCHAR(10) NOT NULL DEFAULT "4"
)ENGINE=OLAP
DUPLICATE KEY(`col1`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`col1`) BUCKETS 3;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS s3_case4;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE s3_case5 (
p_partkey int NOT NULL DEFAULT "1",
p_name VARCHAR(55) NOT NULL DEFAULT "2",
p_mfgr VARCHAR(25) NOT NULL DEFAULT "3",
col4 VARCHAR(10) NOT NULL DEFAULT "4"
)ENGINE=OLAP
DUPLICATE KEY(`p_partkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS s3_case5;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE s3_case6 (
p_partkey int NOT NULL DEFAULT "1",
p_name VARCHAR(55) NOT NULL DEFAULT "2",
p_mfgr VARCHAR(25) NOT NULL DEFAULT "3",
col4 VARCHAR(10) NOT NULL DEFAULT "4"
)ENGINE=OLAP
DUPLICATE KEY(`p_partkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS s3_case6;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS set1 (
`p_partkey` integer NOT NULL default '0',
`p_name` varchar(55) NOT NULL default 'aaa',
`p_size` integer NOT NULL default '0',
`p_greatest` integer NOT NULL
)ENGINE=OLAP
DISTRIBUTED BY HASH(p_partkey) BUCKETS 24;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS set1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE TABLE IF NOT EXISTS set2 (
`p_partkey` integer NOT NULL default '0'
)ENGINE=OLAP
DISTRIBUTED BY HASH(p_partkey) BUCKETS 24;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS set2;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE TABLE IF NOT EXISTS set3 (
`partkey` integer NOT NULL default '0'
)ENGINE=OLAP
DISTRIBUTED BY HASH(partkey) BUCKETS 24;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS set3;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE TABLE IF NOT EXISTS set4 (
`partkey` integer NOT NULL default '0'
)ENGINE=OLAP
DISTRIBUTED BY HASH(partkey) BUCKETS 24;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS set4;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE TABLE IF NOT EXISTS set5 (
`partkey` integer NOT NULL default '0'
)ENGINE=OLAP
DISTRIBUTED BY HASH(partkey) BUCKETS 24;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS set5;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS set6 (
`partkey` integer NOT NULL default '0',
`partsize` integer NOT NULL default '0'
)ENGINE=OLAP
DISTRIBUTED BY HASH(partkey) BUCKETS 24;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS set6;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS set7 (
`p_partkey` integer NOT NULL default '0',
`partsize` integer NOT NULL default '0'
)ENGINE=OLAP
DISTRIBUTED BY HASH(p_partkey) BUCKETS 24;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS set7;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS upper_case (
`P_PARTKEY` integer NOT NULL,
`P_NAME` varchar(55) NOT NULL,
`P_MFGR` char(25) NOT NULL,
`P_BRAND` char(10) NOT NULL,
`P_TYPE` varchar(25) NOT NULL,
`P_SIZE` integer NOT NULL,
`P_CONTAINER` char(10) NOT NULL,
`P_RETAILPRICE` decimal(12, 2) NOT NULL,
`P_COMMENT` varchar(23) NOT NULL
)ENGINE=OLAP
DISTRIBUTED BY HASH(p_partkey) BUCKETS 24;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS upper_case;
Loading

0 comments on commit f1a543e

Please sign in to comment.