Skip to content

Commit

Permalink
[feature](insert)add hive table sink definition (#31662) (#32347)
Browse files Browse the repository at this point in the history
bp #31662
Co-authored-by: slothever <[email protected]>
  • Loading branch information
morningman authored Mar 17, 2024
1 parent 4732aae commit 1645f2e
Show file tree
Hide file tree
Showing 16 changed files with 617 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
Expand Down Expand Up @@ -427,6 +428,13 @@ public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends P
return rootFragment;
}

/**
 * Translates a physical hive table sink by translating its child plan.
 *
 * @param hiveTableSink the physical hive table sink node being visited
 * @param context the translator context handed on to the child's visitor call
 * @return the fragment produced by visiting the sink's child
 */
@Override
public PlanFragment visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends Plan> hiveTableSink,
        PlanTranslatorContext context) {
    // Nothing hive-specific is attached here: the child's fragment is handed back unchanged.
    return hiveTableSink.child().accept(this, context);
}

@Override
public PlanFragment visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink,
PlanTranslatorContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
Expand Down Expand Up @@ -52,6 +53,12 @@ public Plan visitLogicalOlapTableSink(LogicalOlapTableSink<? extends Plan> table
return tableSink;
}

/**
 * Visits a logical hive table sink: invokes {@code turnOffPageCache(context)} and
 * returns the sink itself without modifying it.
 *
 * @param tableSink the logical hive table sink being visited; returned as-is
 * @param context the statement context forwarded to {@code turnOffPageCache}
 * @return the same {@code tableSink} instance that was passed in
 */
@Override
public Plan visitLogicalHiveTableSink(LogicalHiveTableSink<? extends Plan> tableSink, StatementContext context) {
// NOTE(review): presumably disables the page cache for the duration of this statement,
// as the helper's name suggests -- confirm against turnOffPageCache.
turnOffPageCache(context);
return tableSink;
}

private void turnOffPageCache(StatementContext context) {
SessionVariable sessionVariable = context.getConnectContext().getSessionVariable();
// set temporary session value, and then revert value in the 'finally block' of StmtExecutor#execute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.doris.nereids.rules.implementation.LogicalFileSinkToPhysicalFileSink;
import org.apache.doris.nereids.rules.implementation.LogicalFilterToPhysicalFilter;
import org.apache.doris.nereids.rules.implementation.LogicalGenerateToPhysicalGenerate;
import org.apache.doris.nereids.rules.implementation.LogicalHiveTableSinkToPhysicalHiveTableSink;
import org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect;
import org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan;
import org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin;
Expand Down Expand Up @@ -187,6 +188,7 @@ public class RuleSet {
.add(new LogicalIntersectToPhysicalIntersect())
.add(new LogicalGenerateToPhysicalGenerate())
.add(new LogicalOlapTableSinkToPhysicalOlapTableSink())
.add(new LogicalHiveTableSinkToPhysicalHiveTableSink())
.add(new LogicalFileSinkToPhysicalFileSink())
.add(new LogicalResultSinkToPhysicalResultSink())
.add(new LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public enum RuleType {

// **** make sure BINDING_UNBOUND_LOGICAL_PLAN is the lowest priority in the rewrite rules. ****
BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_EXTERNAL_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_FILE(RuleTypeClass.REWRITE),
BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE),
Expand Down Expand Up @@ -386,6 +387,7 @@ public enum RuleType {
LOGICAL_ODBC_SCAN_TO_PHYSICAL_ODBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
import java.util.stream.Collectors;

/**
* bind an unbound logicalOlapTableSink represent the target table of an insert command
* bind an unbound logicalTableSink represent the target table of an insert command
*/
public class BindSink implements AnalysisRuleFactory {

Expand Down Expand Up @@ -340,6 +340,14 @@ public List<Rule> buildRules() {
fileSink.child().getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList())))
),
// TODO: bind hive target table
RuleType.BINDING_INSERT_TARGET_EXTERNAL_TABLE.build(
logicalHiveTableSink().when(s -> s.getOutputExprs().isEmpty())
.then(hiveTableSink -> hiveTableSink.withOutputExprs(
hiveTableSink.child().getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList())))
)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.rules.implementation;

import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;

import java.util.Optional;

/**
 * Implementation rule that converts a logical HiveTableSink into a physical HiveTableSink.
 */
public class LogicalHiveTableSinkToPhysicalHiveTableSink extends OneImplementationRuleFactory {
    @Override
    public Rule build() {
        return logicalHiveTableSink()
                .thenApply(ctx -> toPhysicalSink(ctx.root))
                .toRule(RuleType.LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE);
    }

    /**
     * Builds the physical sink by copying database, target table, columns, partition ids,
     * output expressions, logical properties and child from the logical sink.
     */
    private static Plan toPhysicalSink(LogicalHiveTableSink<? extends Plan> logicalSink) {
        return new PhysicalHiveTableSink<>(
                logicalSink.getDatabase(),
                logicalSink.getTargetTable(),
                logicalSink.getCols(),
                logicalSink.getPartitionIds(),
                logicalSink.getOutputExprs(),
                // no group expression is supplied at this point
                Optional.empty(),
                logicalSink.getLogicalProperties(),
                // the two arguments after the logical properties are passed as null by this rule
                null,
                null,
                logicalSink.child());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public enum PlanType {
LOGICAL_WINDOW,

// physical plans
// logical relations
// physical relations
PHYSICAL_CTE_CONSUMER,
PHYSICAL_EMPTY_RELATION,
PHYSICAL_ES_SCAN,
Expand All @@ -92,12 +92,13 @@ public enum PlanType {
PHYSICAL_SCHEMA_SCAN,
PHYSICAL_TVF_RELATION,

// logical sinks
// physical sinks
PHYSICAL_FILE_SINK,
PHYSICAL_OLAP_TABLE_SINK,
PHYSICAL_HIVE_TABLE_SINK,
PHYSICAL_RESULT_SINK,

// logical others
// physical others
PHYSICAL_HASH_AGGREGATE,
PHYSICAL_ASSERT_NUM_ROWS,
PHYSICAL_CTE_PRODUCER,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.HiveTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.transaction.TransactionState;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Optional;

/**
 * Insert executor for hive tables ({@link HMSExternalTable}).
 *
 * <p>Apart from {@link #finalizeSink}, every lifecycle hook in this class is currently an
 * empty placeholder.
 */
public class HiveInsertExecutor extends AbstractInsertExecutor {
    private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class);
    private static final long INVALID_TXN_ID = -1L;
    // Never reassigned inside this class, so it always holds INVALID_TXN_ID for now.
    private long txnId = INVALID_TXN_ID;

    /**
     * Creates an executor for inserting into the given hive table.
     *
     * @param ctx connect context of the running statement
     * @param table target hive table
     * @param labelName label of the insert
     * @param planner planner that produced the insert plan
     * @param insertCtx optional insert-specific context
     */
    public HiveInsertExecutor(ConnectContext ctx, HMSExternalTable table,
            String labelName, NereidsPlanner planner,
            Optional<InsertCommandContext> insertCtx) {
        super(ctx, table, labelName, planner, insertCtx);
    }

    /**
     * @return the transaction id held by this executor; {@code -1} while no transaction is assigned
     */
    public long getTxnId() {
        return txnId;
    }

    /** Placeholder: no transaction is started for hive inserts yet. */
    @Override
    public void beginTransaction() {

    }

    /**
     * Initializes and completes the hive data sink, then verifies that the transaction
     * identified by {@code txnId} exists.
     *
     * @param fragment the plan fragment owning the sink (not used here)
     * @param sink the data sink; must be a {@link HiveTableSink}
     * @param physicalSink the physical sink plan node (not used yet)
     * @throws AnalysisException if sink initialization fails or the transaction state is missing
     */
    @Override
    protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
        HiveTableSink hiveTableSink = (HiveTableSink) sink;
        TransactionState state;
        try {
            hiveTableSink.init();
            hiveTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
            state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId);
        } catch (Exception e) {
            throw new AnalysisException(e.getMessage(), e);
        }
        // Checked outside the try block so this exception is not caught and wrapped a second time.
        // NOTE(review): txnId is still INVALID_TXN_ID here because beginTransaction() is a no-op;
        // presumably the lookup then yields null and this always throws -- confirm.
        if (state == null) {
            throw new AnalysisException("txn does not exist: " + txnId);
        }
    }

    /** Placeholder: nothing to prepare before execution yet. */
    @Override
    protected void beforeExec() {

    }

    /** Placeholder: nothing to do on successful completion yet. */
    @Override
    protected void onComplete() throws UserException {

    }

    /** Placeholder: no failure handling yet. */
    @Override
    protected void onFail(Throwable t) {

    }

    /** Placeholder: nothing to do after execution yet. */
    @Override
    protected void afterExec(StmtExecutor executor) {

    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.util.ProfileManager.ProfileType;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
Expand All @@ -34,6 +35,7 @@
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
Expand Down Expand Up @@ -160,6 +162,9 @@ private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Excep
: false;
insertExecutor.getCoordinator().getQueryOptions()
.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
} else if (physicalSink instanceof PhysicalHiveTableSink) {
HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf;
insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner, insertCtx);
} else {
// TODO: support other table types
throw new AnalysisException("insert into command only support olap table");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
Expand Down Expand Up @@ -252,7 +253,13 @@ public static Plan normalizePlan(Plan plan, TableIf table) {
}
}
}

if (table instanceof HMSExternalTable) {
// TODO: check HMSExternalTable
HMSExternalTable hiveTable = (HMSExternalTable) table;
if (hiveTable.isView()) {
throw new AnalysisException("View is not support in hive external table.");
}
}
Plan query = unboundTableSink.child();
if (!(query instanceof LogicalInlineTable)) {
return plan;
Expand Down
Loading

0 comments on commit 1645f2e

Please sign in to comment.