Skip to content

Commit

Permalink
[featurl](insert)add hive table sink definition
Browse files Browse the repository at this point in the history
  • Loading branch information
wsjz committed Mar 1, 2024
1 parent 19c98d5 commit ccdb211
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public enum PlanType {
// logical sinks
PHYSICAL_FILE_SINK,
PHYSICAL_OLAP_TABLE_SINK,
PHYSICAL_HIVE_TABLE_SINK,
PHYSICAL_RESULT_SINK,

// logical others
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.common.UserException;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.transaction.TransactionStatus;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Optional;

/**
* Insert executor for olap table
*/
public class HiveInsertExecutor extends AbstractInsertExecutor {
private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class);
private static final long INVALID_TXN_ID = -1L;
private long txnId = INVALID_TXN_ID;
private TransactionStatus txnStatus = TransactionStatus.ABORTED;

/**
* constructor
*/
public HiveInsertExecutor(ConnectContext ctx, HMSExternalTable table,
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx) {
super(ctx, table, labelName, planner, insertCtx);
}

public long getTxnId() {
return txnId;
}

@Override
public void beginTransaction() {

}

@Override
protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {

}

@Override
protected void beforeExec() {

}

@Override
protected void onComplete() throws UserException {

}

@Override
protected void onFail(Throwable t) {

}

@Override
protected void afterExec(StmtExecutor executor) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.util.ProfileManager.ProfileType;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
Expand All @@ -34,6 +35,7 @@
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
Expand Down Expand Up @@ -160,6 +162,9 @@ private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Excep
: false;
insertExecutor.getCoordinator().getQueryOptions()
.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
} else if (physicalSink instanceof PhysicalHiveTableSink) {
HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf;
insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner, insertCtx);
} else {
// TODO: support other table types
throw new AnalysisException("insert into command only support olap table");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.physical;

import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.statistics.Statistics;

import org.jetbrains.annotations.Nullable;

import java.util.List;
import java.util.Optional;

/** abstract physical sink */
public class PhysicalHiveTableSink<CHILD_TYPE extends Plan> extends PhysicalSink<CHILD_TYPE> implements Sink {

public PhysicalHiveTableSink(List<NamedExpression> outputExprs,
Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties,
@Nullable PhysicalProperties physicalProperties,
Statistics statistics, CHILD_TYPE child) {
super(PlanType.PHYSICAL_HIVE_TABLE_SINK, outputExprs, groupExpression,
logicalProperties, physicalProperties, statistics, child);
}

@Override
public Plan withChildren(List<Plan> children) {
return null;
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return null;
}

@Override
public List<? extends Expression> getExpressions() {
return null;
}

@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return null;
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return null;
}

@Override
public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@

import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.odbc.sink.OdbcTableSink;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TExplainLevel;
Expand Down Expand Up @@ -62,11 +63,13 @@ public PlanFragment getFragment() {

public abstract DataPartition getOutputPartition();

public static DataSink createDataSink(Table table) throws AnalysisException {
public static DataSink createDataSink(TableIf table) throws AnalysisException {
if (table instanceof MysqlTable) {
return new MysqlTableSink((MysqlTable) table);
} else if (table instanceof OdbcTable) {
return new OdbcTableSink((OdbcTable) table);
} else if (table instanceof HMSExternalTable) {
return new HiveTableSink((HMSExternalTable) table);
} else {
throw new AnalysisException("Unknown table type " + table.getType());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/DataSink.java
// and modified by Doris

package org.apache.doris.planner;

import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TExplainLevel;

public class HiveTableSink extends DataSink {

public HiveTableSink(HMSExternalTable table) {
super();
}

@Override
public String getExplainString(String prefix, TExplainLevel explainLevel) {
return null;
}

@Override
protected TDataSink toThrift() {
return null;
}

@Override
public PlanNodeId getExchNodeId() {
return null;
}

@Override
public DataPartition getOutputPartition() {
return null;
}
}
32 changes: 32 additions & 0 deletions gensrc/thrift/DataSinks.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,23 @@ struct TOlapTableSink {
23: optional double max_filter_ratio
}

struct TWriteLocation {
1: optional string write_path
2: optional string target_path
}

struct THiveTableSink {
1: optional string db_name
2: optional string table_name
3: optional list<string> data_column_names
4: optional list<string> partition_column_names
5: optional list<Partitions.TWritePartition> partitions
6: optional list<Partitions.TWriteBucket> buckets
7: optional PlanNodes.TFileFormatType file_format
8: optional Types.TCompressionType compression_type
9: optional TWriteLocation location
}

struct TDataSink {
1: required TDataSinkType type
2: optional TDataStreamSink stream_sink
Expand All @@ -289,5 +306,20 @@ struct TDataSink {
10: optional TResultFileSink result_file_sink
11: optional TJdbcTableSink jdbc_table_sink
12: optional TMultiCastDataStreamSink multi_cast_stream_sink
13: optional THiveTableSink hive_table_sink
}

struct THivePartitionUpdate {
1: optional string name
2: optional TUpdateMode update_mode
3: optional TWriteLocation location
4: optional list<string> file_names
5: optional i64 row_count
6: optional i64 file_size
}

enum TUpdateMode {
NEW = 0, // add partition
APPEND = 1, // alter partition
OVERWRITE = 2 // insert overwrite
}
19 changes: 19 additions & 0 deletions gensrc/thrift/Partitions.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ namespace java org.apache.doris.thrift

include "Exprs.thrift"
include "Types.thrift"
include "PlanNodes.thrift"
include "DataSinks.thrift"

enum TPartitionType {
UNPARTITIONED,
Expand Down Expand Up @@ -92,4 +94,21 @@ struct TDataPartition {
3: optional list<TRangePartition> partition_infos
}

struct TWritePartition {
1: optional list<string> values
2: optional DataSinks.TWriteLocation location
3: optional PlanNodes.TFileFormatType file_format
}

struct TSortedColumn {
1: optional string sort_column_name
2: optional i32 order // asc(1) or desc(0)
}

struct TWriteBucket {
1: optional list<string> bucketed_by
2: optional i32 bucket_version
3: optional i32 bucket_count
4: optional list<TSortedColumn> sorted_by
}

12 changes: 12 additions & 0 deletions gensrc/thrift/Types.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,18 @@ struct TUserIdentity {
3: optional bool is_domain
}

enum TCompressionType {
UNKNOWN_COMPRESSION = 0,
DEFAULT_COMPRESSION = 1,
NO_COMPRESSION = 2,
SNAPPY = 3,
LZ4 = 4,
LZ4F = 5,
ZLIB = 6,
ZSTD = 7,
LZ4HC = 8
}

const i32 TSNAPSHOT_REQ_VERSION1 = 3; // corresponding to alpha rowset
const i32 TSNAPSHOT_REQ_VERSION2 = 4; // corresponding to beta rowset
// the snapshot request should always set prefer snapshot version to TPREFER_SNAPSHOT_REQ_VERSION
Expand Down

0 comments on commit ccdb211

Please sign in to comment.