diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index dc6358d3deca3c..7b7070c89f7b9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -95,6 +95,7 @@ public enum PlanType { // logical sinks PHYSICAL_FILE_SINK, PHYSICAL_OLAP_TABLE_SINK, + PHYSICAL_HIVE_TABLE_SINK, PHYSICAL_RESULT_SINK, // logical others diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java new file mode 100644 index 00000000000000..019ea0a83fa720 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.commands.insert; + +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; +import org.apache.doris.planner.DataSink; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.transaction.TransactionStatus; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Optional; + +/** + * Insert executor for hive table; the lifecycle hooks overridden here are still empty placeholders + */ +public class HiveInsertExecutor extends AbstractInsertExecutor { + private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class); + private static final long INVALID_TXN_ID = -1L; + private long txnId = INVALID_TXN_ID; + private TransactionStatus txnStatus = TransactionStatus.ABORTED; + + /** + * constructor; the insert target is an HMSExternalTable and all arguments are passed to the super class + */ + public HiveInsertExecutor(ConnectContext ctx, HMSExternalTable table, + String labelName, NereidsPlanner planner, + Optional insertCtx) { + super(ctx, table, labelName, planner, insertCtx); + } + + public long getTxnId() { + return txnId; + } + + @Override + public void beginTransaction() { + + } + + @Override + protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) { + + } + + @Override + protected void beforeExec() { + + } + + @Override + protected void onComplete() throws UserException { + + } + + @Override + protected void onFail(Throwable t) { + + } + + @Override + protected void afterExec(StmtExecutor executor) { + + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 06cd4275682484..29d96ae4ad9c88 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -23,6 +23,7 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.util.ProfileManager.ProfileType; +import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; @@ -34,6 +35,7 @@ import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -160,6 +162,9 @@ private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Excep : false; insertExecutor.getCoordinator().getQueryOptions() .setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode); + } else if (physicalSink instanceof PhysicalHiveTableSink) { + HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf; + insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner, insertCtx); } else { // TODO: support other table types throw new AnalysisException("insert into command only support olap table"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java new file mode 100644 index 00000000000000..149d8f037589d0 --- /dev/null +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.statistics.Statistics; + +import org.jetbrains.annotations.Nullable; + +import java.util.List; +import java.util.Optional; + +/** physical sink for hive table; every method overridden here is still a stub that returns null */ +public class PhysicalHiveTableSink extends PhysicalSink implements Sink { + + public PhysicalHiveTableSink(List outputExprs, + Optional groupExpression, + LogicalProperties logicalProperties, + @Nullable PhysicalProperties 
physicalProperties, + Statistics statistics, CHILD_TYPE child) 
{ + super(PlanType.PHYSICAL_HIVE_TABLE_SINK, outputExprs, groupExpression, + logicalProperties, physicalProperties, statistics, child); + } + + @Override + public Plan withChildren(List children) { + return null; + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return null; + } + + @Override + public List getExpressions() { + return null; + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return null; + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return null; + } + + @Override + public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { + return null; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java index b813093f7823c7..c769bbea782d0a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java @@ -22,8 +22,9 @@ import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OdbcTable; -import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.odbc.sink.OdbcTableSink; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TExplainLevel; @@ -62,11 +63,13 @@ public PlanFragment getFragment() { public abstract DataPartition getOutputPartition(); - public static DataSink createDataSink(Table table) throws AnalysisException { + public static DataSink createDataSink(TableIf table) throws AnalysisException { if (table instanceof MysqlTable) { return new MysqlTableSink((MysqlTable) table); } else if (table instanceof OdbcTable) { return new OdbcTableSink((OdbcTable) table); + } else if (table 
instanceof HMSExternalTable) { + return new HiveTableSink((HMSExternalTable) table); } else { throw new AnalysisException("Unknown table type " + table.getType()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java new file mode 100644 index 00000000000000..2c10fda714ee97 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/DataSink.java +// and modified by Doris + +package org.apache.doris.planner; + +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.thrift.TDataSink; +import org.apache.doris.thrift.TExplainLevel; + +public class HiveTableSink extends DataSink { + + public HiveTableSink(HMSExternalTable table) { + super(); + } + + @Override + public String getExplainString(String prefix, TExplainLevel explainLevel) { + return null; + } + + @Override + protected TDataSink toThrift() { + return null; + } + + @Override + public PlanNodeId getExchNodeId() { + return null; + } + + @Override + public DataPartition getOutputPartition() { + return null; + } +} diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 602943b4207c48..8b15585aa5d6a1 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -277,6 +277,23 @@ struct TOlapTableSink { 23: optional double max_filter_ratio } +struct TWriteLocation { + 1: optional string write_path + 2: optional string target_path +} + +struct THiveTableSink { + 1: optional string db_name + 2: optional string table_name + 3: optional list data_column_names + 4: optional list partition_column_names + 5: optional list partitions + 6: optional list buckets + 7: optional PlanNodes.TFileFormatType file_format + 8: optional Types.TCompressionType compression_type + 9: optional TWriteLocation location +} + struct TDataSink { 1: required TDataSinkType type 2: optional TDataStreamSink stream_sink @@ -289,5 +306,20 @@ struct TDataSink { 10: optional TResultFileSink result_file_sink 11: optional TJdbcTableSink jdbc_table_sink 12: optional TMultiCastDataStreamSink multi_cast_stream_sink + 13: optional THiveTableSink hive_table_sink } +struct THivePartitionUpdate { + 1: optional string name + 2: optional TUpdateMode update_mode + 3: optional TWriteLocation 
location + 4: optional list file_names + 5: optional i64 row_count + 6: optional i64 file_size +} + +enum TUpdateMode { + NEW = 0, // add partition + APPEND = 1, // alter partition + OVERWRITE = 2 // insert overwrite +} diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift index 0a7e70c0a4f2a2..0c69f01e259a3f 100644 --- a/gensrc/thrift/Partitions.thrift +++ b/gensrc/thrift/Partitions.thrift @@ -20,6 +20,8 @@ namespace java org.apache.doris.thrift include "Exprs.thrift" include "Types.thrift" +include "PlanNodes.thrift" +include "DataSinks.thrift" enum TPartitionType { UNPARTITIONED, @@ -92,4 +94,21 @@ struct TDataPartition { 3: optional list partition_infos } +struct TWritePartition { + 1: optional list values + 2: optional DataSinks.TWriteLocation location + 3: optional PlanNodes.TFileFormatType file_format +} + +struct TSortedColumn { + 1: optional string sort_column_name + 2: optional i32 order // sort direction: asc(1) or desc(0) +} + +struct TWriteBucket { + 1: optional list bucketed_by + 2: optional i32 bucket_version + 3: optional i32 bucket_count + 4: optional list sorted_by +} diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index b6ec18d8197ee5..e5ede58b3078b2 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -728,6 +728,18 @@ struct TUserIdentity { 3: optional bool is_domain } +enum TCompressionType { + UNKNOWN_COMPRESSION = 0, + DEFAULT_COMPRESSION = 1, + NO_COMPRESSION = 2, + SNAPPY = 3, + LZ4 = 4, + LZ4F = 5, + ZLIB = 6, + ZSTD = 7, + LZ4HC = 8 +} + const i32 TSNAPSHOT_REQ_VERSION1 = 3; // corresponding to alpha rowset const i32 TSNAPSHOT_REQ_VERSION2 = 4; // corresponding to beta rowset // the snapshot request should always set prefer snapshot version to TPREFER_SNAPSHOT_REQ_VERSION