diff --git a/be/src/exec/broker_reader.cpp b/be/src/exec/broker_reader.cpp
index 8b389d5a95f7a3..f79c6cb3a9fa2e 100644
--- a/be/src/exec/broker_reader.cpp
+++ b/be/src/exec/broker_reader.cpp
@@ -94,6 +94,7 @@ Status BrokerReader::open() {
     try {
         client->openReader(response, request);
     } catch (apache::thrift::transport::TTransportException& e) {
+        usleep(1000 * 1000);
         RETURN_IF_ERROR(client.reopen());
         client->openReader(response, request);
     }
@@ -143,6 +144,7 @@ Status BrokerReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
     try {
         client->pread(response, request);
     } catch (apache::thrift::transport::TTransportException& e) {
+        usleep(1000 * 1000);
         RETURN_IF_ERROR(client.reopen());
         LOG(INFO) << "retry reading from broker: " << broker_addr << ". reason: " << e.what();
         client->pread(response, request);
     }
@@ -197,6 +199,7 @@ void BrokerReader::close() {
    try {
        client->closeReader(response, request);
    } catch (apache::thrift::transport::TTransportException& e) {
+        usleep(1000 * 1000);
        status = client.reopen();
        if (!status.ok()) {
            LOG(WARNING) << "Close broker reader failed. broker=" << broker_addr
diff --git a/docs/documentation/cn/sql-reference/sql-statements/insert.md b/docs/documentation/cn/sql-reference/sql-statements/insert.md
index cce2f194a1f416..b610e475f2b189 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/insert.md
+++ b/docs/documentation/cn/sql-reference/sql-statements/insert.md
@@ -4,6 +4,7 @@
 ```
 INSERT INTO table_name
+    [ PARTITION (partition_name [, ...]) ]
     [ (column [, ...]) ]
     [ \[ hint [, ...] \] ]
     { VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
@@ -13,6 +14,8 @@
 
 INSERT inserts data into a table. Users can insert one or more rows with the VALUES syntax, or insert zero or more rows through a query.
 
+partition is the target partition list. If target partitions are specified, only data that falls into them is loaded. If none are specified, the default is all partitions of the table.
+
 column is the target column list, and the columns may appear in any order. If no target columns are specified, the default is all columns of the table.
 If some column of the table is not in the target column list, that column needs a default value; otherwise the INSERT will fail.
 
@@ -23,6 +26,8 @@
 
 > tablet_name: the destination table of the load. It can be in the `db_name.table_name` form
 >
+> partition_names: the partitions to load into, which must exist in `table_name`; separate multiple partition names with commas
+>
 > column_name: a specified destination column, which must exist in `table_name`
 >
 > expression: the expression to be assigned to the corresponding column
@@ -31,7 +36,8 @@
 >
 > query: a common query; the result of the query will be written into the target
 >
-> hint: indicators used to hint at the execution behavior of `INSERT`. `streaming` indicates that the `INSERT` statement is executed synchronously.
+> hint: indicators used to hint at the execution behavior of `INSERT`. Both `streaming` and the default non-`streaming` mode execute the `INSERT` statement synchronously
+> The non-`streaming` mode additionally returns a label when execution finishes, so users can check the load state with `SHOW LOAD`
 
 ## Note
 
@@ -72,8 +78,6 @@ INSERT INTO test [streaming] SELECT * FROM test2
 INSERT INTO test (c1, c2) [streaming] SELECT * from test2
 ```
 
-For compatibility reasons, the default insert mode completes asynchronously, which is rather inefficient. For a more efficient load, add `[streaming]` to use the synchronous mode.
-
 4. Load the result of a query statement into the `test` table asynchronously
 ```
@@ -81,4 +85,6 @@ INSERT INTO test SELECT * FROM test2
 INSERT INTO test (c1, c2) SELECT * from test2
 ```
 
-Since all of Doris's previous load modes were asynchronous, this load statement returns a load job `label`. Users need to check the state of this `label`'s load job with the `SHOW LOAD` command; the loaded data only takes effect when the state is `FINISHED`.
+The asynchronous load is in fact a synchronous load wrapped as asynchronous; specifying streaming or leaving it out gives *the same execution efficiency*.
+
+Since all of Doris's previous load modes were asynchronous, an `INSERT` statement without streaming still returns a label to stay compatible with old usage habits. Users can check the state of this `label`'s load job with the `SHOW LOAD` command.
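The doc hunks above introduce the `PARTITION` clause without a standalone example. A minimal SQL sketch of the new clause; the table `test` and partitions `p1`/`p2` are hypothetical names, not part of the patch:

```
-- Only rows that fall into the listed partitions are loaded;
-- without the clause, all partitions of the table are targeted.
INSERT INTO test PARTITION (p1, p2) VALUES (1, 2), (3, 4);
INSERT INTO test PARTITION (p1) SELECT * FROM test2;
```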
diff --git a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 851f80a8028481..cb219c6869abec 100644
--- a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -43,6 +43,7 @@
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -57,7 +58,22 @@
 import java.util.Set;
 import java.util.UUID;
 
-// InsertStmt used to
+/**
+ * INSERT INTO is performed to load data from the result of a query stmt.
+ *
+ * syntax:
+ *   INSERT INTO table_name [partition_info] [col_list] [plan_hints] query_stmt
+ *
+ *   table_name: the name of the target table
+ *   partition_info: PARTITION (p1, p2)
+ *     the partition info of the target table
+ *   col_list: (c1, c2)
+ *     the column list of the target table
+ *   plan_hints: [STREAMING, SHUFFLE_HINT]
+ *     The streaming plan is used by both streaming and non-streaming insert stmts.
+ *     The only difference is that a non-streaming insert records its load info in the LoadManager and returns a label.
+ *     Users can check the load info with the SHOW LOAD stmt.
+ */
 public class InsertStmt extends DdlStmt {
     private static final Logger LOG = LogManager.getLogger(InsertStmt.class);
@@ -239,16 +255,13 @@ public void analyze(Analyzer analyzer) throws UserException {
             // if all previous job finished
             UUID uuid = UUID.randomUUID();
             String jobLabel = "insert_" + uuid;
-            LoadJobSourceType sourceType = isStreaming ? LoadJobSourceType.INSERT_STREAMING
-                    : LoadJobSourceType.FRONTEND;
+            LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
             long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS();
             transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
                     jobLabel, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond);
-            if (isStreaming) {
-                OlapTableSink sink = (OlapTableSink) dataSink;
-                TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
-                sink.init(loadId, transactionId, db.getId());
-            }
+            OlapTableSink sink = (OlapTableSink) dataSink;
+            TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+            sink.init(loadId, transactionId, db.getId());
         }
     }
@@ -498,13 +511,9 @@ public DataSink createDataSink() throws AnalysisException {
             return dataSink;
         }
         if (targetTable instanceof OlapTable) {
-            if (isStreaming) {
-                dataSink = new OlapTableSink((OlapTable) targetTable, olapTuple);
-                dataPartition = dataSink.getOutputPartition();
-            } else {
-                dataSink = new DataSplitSink((OlapTable) targetTable, olapTuple);
-                dataPartition = dataSink.getOutputPartition();
-            }
+            String partitionNames = targetPartitions == null ? null : Joiner.on(",").join(targetPartitions);
+            dataSink = new OlapTableSink((OlapTable) targetTable, olapTuple, partitionNames);
+            dataPartition = dataSink.getOutputPartition();
         } else if (targetTable instanceof BrokerTable) {
             BrokerTable table = (BrokerTable) targetTable;
             // TODO(lingbin): think use which one if have more than one path
@@ -525,7 +534,7 @@ public DataSink createDataSink() throws AnalysisException {
     }
 
     public void finalize() throws UserException {
-        if (isStreaming) {
+        if (targetTable instanceof OlapTable) {
             ((OlapTableSink) dataSink).finalize();
         }
     }
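The Javadoc grammar above composes into a single statement. A hedged sketch of the full form, using the lowercase `[streaming]` spelling from the doc examples; all object names are hypothetical, and whether every combination is accepted depends on the parser:

```
-- partition_info, col_list and plan_hints combined in one INSERT
INSERT INTO test PARTITION (p1, p2) (c1, c2) [streaming] SELECT c1, c2 FROM test2;
```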
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index 6c5b7c927b638e..4b25bd1c3ff9be 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -1751,9 +1751,9 @@ public long loadRoutineLoadJobs(DataInputStream dis, long checksum) throws IOException {
     }
 
     public long loadLoadJobsV2(DataInputStream in, long checksum) throws IOException {
-//        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_50) {
-//            Catalog.getCurrentCatalog().getLoadManager().readFields(in);
-//        }
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_50) {
+            Catalog.getCurrentCatalog().getLoadManager().readFields(in);
+        }
         return checksum;
     }
 
@@ -2057,7 +2057,7 @@ public void replayGlobalVariable(SessionVariable variable) throws IOException, DdlException {
     }
 
     public long saveLoadJobsV2(DataOutputStream out, long checksum) throws IOException {
-//        Catalog.getCurrentCatalog().getLoadManager().write(out);
+        Catalog.getCurrentCatalog().getLoadManager().write(out);
         return checksum;
     }
diff --git a/fe/src/main/java/org/apache/doris/common/FeConstants.java b/fe/src/main/java/org/apache/doris/common/FeConstants.java
index 21c40ea14fd31f..1ab1cf2db56e34 100644
--- a/fe/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/src/main/java/org/apache/doris/common/FeConstants.java
@@ -35,5 +35,5 @@ public class FeConstants {
     // general model
     // Current meta data version. Use this version to write journals and image
-    public static int meta_version = FeMetaVersion.VERSION_49;
+    public static int meta_version = FeMetaVersion.VERSION_50;
 }
diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 1c13125a04c86b..44db891d01f499 100644
--- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -109,5 +109,5 @@ public final class FeMetaVersion {
     // routine load job
     public static final int VERSION_49 = 49;
     // load job v2 for broker load
-    // public static final int VERSION_50 = 50;
+    public static final int VERSION_50 = 50;
 }
diff --git a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 948478913ff652..87911be2a55c4e 100644
--- a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -21,12 +21,14 @@
 import org.apache.doris.analysis.DataDescription;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.catalog.BrokerTable;
+import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 
@@ -57,7 +59,6 @@ public class BrokerFileGroup implements Writable {
     private String valueSeparator;
     private String lineDelimiter;
     // fileFormat may be null, which means format will be decided by file's suffix
-    // TODO(zc): we need to persist fileFormat, this should be done in next META_VERSION increase
     private String fileFormat;
     private boolean isNegative;
     private List<Long> partitionIds;
@@ -264,7 +265,13 @@ public void write(DataOutput out) throws IOException {
                 Expr.writeTo(entry.getValue(), out);
             }
         }
-        //
+        // fileFormat
+        if (fileFormat == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            Text.writeString(out, fileFormat);
+        }
     }
 
     @Override
@@ -312,6 +319,12 @@ public void readFields(DataInput in) throws IOException {
                 }
             }
         }
+        // file format
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_50) {
+            if (in.readBoolean()) {
+                fileFormat = Text.readString(in);
+            }
+        }
     }
 
     public static BrokerFileGroup read(DataInput in) throws IOException {
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
new file mode 100644
index 00000000000000..5a62598179e331
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.FailMsg;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class records the finished info of an insert load job.
+ * It is created after the txn belonging to the insert load job becomes visible.
+ * The state of an insert load job is always FINISHED, so it will never be scheduled by the JobScheduler.
+ */
+public class InsertLoadJob extends LoadJob {
+
+    private long tableId;
+
+    // only for log replay
+    public InsertLoadJob() {
+        super();
+        this.jobType = EtlJobType.INSERT;
+    }
+
+    public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp) {
+        super(dbId, label);
+        this.tableId = tableId;
+        this.createTimestamp = createTimestamp;
+        this.loadStartTimestamp = createTimestamp;
+        this.finishTimestamp = System.currentTimeMillis();
+        this.state = JobState.FINISHED;
+        this.progress = 100;
+        this.jobType = EtlJobType.INSERT;
+        this.timeoutSecond = Config.insert_load_default_timeout_second;
+    }
+
+    @Override
+    protected Set<String> getTableNames() throws MetaNotFoundException {
+        Database database = Catalog.getCurrentCatalog().getDb(dbId);
+        if (database == null) {
+            throw new MetaNotFoundException("Database " + dbId + " has been deleted");
+        }
+        database.readLock();
+        try {
+            Table table = database.getTable(tableId);
+            if (table == null) {
+                throw new MetaNotFoundException("Failed to find table " + tableId + " in db " + dbId);
+            }
+            return new HashSet<>(Arrays.asList(table.getName()));
+        } finally {
+            database.readUnlock();
+        }
+    }
+
+    @Override
+    void executeJob() {
+    }
+
+    @Override
+    public void onTaskFinished(TaskAttachment attachment) {
+    }
+
+    @Override
+    public void onTaskFailed(long taskId, FailMsg failMsg) {
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+        out.writeLong(tableId);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        tableId = in.readLong();
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index b70019a316ca42..c5eac56c8b4548 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -235,7 +235,7 @@ protected static void checkDataSourceInfo(Database db, List<DataDescription> dataDescriptions)
         }
     }
 
-    abstract Set<String> getTableNames();
+    abstract Set<String> getTableNames() throws MetaNotFoundException;
 
     public void isJobTypeRead(boolean jobTypeRead) {
         isJobTypeRead = jobTypeRead;
@@ -331,7 +331,7 @@ public void cancelJobWithoutCheck(FailMsg failMsg) {
         logFinalOperation();
     }
 
-    public void cancelJob(FailMsg failMsg) throws DdlException {
+    public void cancelJob(FailMsg failMsg) throws DdlException, MetaNotFoundException {
         writeLock();
         try {
             // check
@@ -357,7 +357,7 @@ public void cancelJob(FailMsg failMsg) throws DdlException {
         logFinalOperation();
     }
 
-    private void checkAuth() throws DdlException {
+    private void checkAuth() throws DdlException, MetaNotFoundException {
         Database db = Catalog.getInstance().getDb(dbId);
         if (db == null) {
             throw new DdlException("Db does not exist. id: " + dbId);
@@ -467,7 +467,7 @@ public void unprotectReadEndOperation(LoadJobFinalOperation loadJobFinalOperation) {
         failMsg = loadJobFinalOperation.getFailMsg();
     }
 
-    public List<Comparable> getShowInfo() throws DdlException {
+    public List<Comparable> getShowInfo() throws DdlException, MetaNotFoundException {
         readLock();
         try {
             // check auth
@@ -537,6 +537,8 @@ public static LoadJob read(DataInput in) throws IOException {
         EtlJobType type = EtlJobType.valueOf(Text.readString(in));
         if (type == EtlJobType.BROKER) {
             job = new BrokerLoadJob();
+        } else if (type == EtlJobType.INSERT) {
+            job = new InsertLoadJob();
         } else {
             throw new IOException("Unknown load type: " + type.name());
         }
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 0655b71922d2d4..2613d50d7ad94e 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -27,11 +27,14 @@
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
+import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -43,6 +46,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -87,7 +91,7 @@ public void createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
                 throw new DdlException("LoadManager only support the broker load.");
             }
             loadJob = BrokerLoadJob.fromLoadStmt(stmt);
-            addLoadJob(loadJob);
+            createLoadJob(loadJob);
             // submit it
             loadJobScheduler.submitJob(loadJob);
         } finally {
@@ -98,12 +102,19 @@ public void createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
     }
 
     public void replayCreateLoadJob(LoadJob loadJob) {
-        addLoadJob(loadJob);
+        createLoadJob(loadJob);
         LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
                 .add("msg", "replay create load job")
                 .build());
     }
 
+    private void createLoadJob(LoadJob loadJob) {
+        addLoadJob(loadJob);
+        // add callback before txn created, because callback will be performed on replay without txn begin
+        // register txn state listener
+        Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
+    }
+
     private void addLoadJob(LoadJob loadJob) {
         idToLoadJob.put(loadJob.getId(), loadJob);
         long dbId = loadJob.getDbId();
@@ -115,12 +126,31 @@ private void addLoadJob(LoadJob loadJob) {
             labelToLoadJobs.put(loadJob.getLabel(), new ArrayList<>());
         }
         labelToLoadJobs.get(loadJob.getLabel()).add(loadJob);
-        // add callback before txn created, because callback will be performed on replay without txn begin
-        // register txn state listener
-        Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
     }
 
-    public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
+    public void recordFinishedLoadJob(String label, String dbName, long tableId, EtlJobType jobType,
+            long createTimestamp) throws MetaNotFoundException {
+
+        // get db id
+        Database db = Catalog.getCurrentCatalog().getDb(dbName);
+        if (db == null) {
+            throw new MetaNotFoundException("Database[" + dbName + "] does not exist");
+        }
+
+        LoadJob loadJob;
+        switch (jobType) {
+            case INSERT:
+                loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp);
+                break;
+            default:
+                return;
+        }
+        addLoadJob(loadJob);
+        // persist the job
+        Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob);
+    }
+
+    public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, MetaNotFoundException {
         Database db = Catalog.getInstance().getDb(stmt.getDbName());
         if (db == null) {
             throw new DdlException("Db does not exist. name: " + stmt.getDbName());
@@ -221,17 +251,23 @@ public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String labelValue,
         try {
             Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(dbId);
             List<LoadJob> loadJobList = Lists.newArrayList();
-            // check label value
-            if (accurateMatch) {
-                if (!labelToLoadJobs.containsKey(labelValue)) {
-                    return loadJobInfos;
-                }
-                loadJobList.addAll(labelToLoadJobs.get(labelValue));
-            }
-            // non-accurate match
-            for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
-                if (entry.getKey().contains(labelValue)) {
-                    loadJobList.addAll(entry.getValue());
+            if (Strings.isNullOrEmpty(labelValue)) {
+                loadJobList.addAll(labelToLoadJobs.values()
+                        .stream().flatMap(Collection::stream).collect(Collectors.toList()));
+            } else {
+                // check label value
+                if (accurateMatch) {
+                    if (!labelToLoadJobs.containsKey(labelValue)) {
+                        return loadJobInfos;
+                    }
+                    loadJobList.addAll(labelToLoadJobs.get(labelValue));
+                } else {
+                    // non-accurate match
+                    for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
+                        if (entry.getKey().contains(labelValue)) {
+                            loadJobList.addAll(entry.getValue());
+                        }
+                    }
                 }
             }
 
@@ -243,7 +279,7 @@ public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String labelValue,
                 }
                 // add load job info
                 loadJobInfos.add(loadJob.getShowInfo());
-            } catch (DdlException e) {
+            } catch (DdlException | MetaNotFoundException e) {
                 continue;
             }
         }
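The rewritten label filter in `getLoadJobInfosByDb` lines up with the three ways `SHOW LOAD` is typically invoked. A sketch assuming the usual Doris `SHOW LOAD` syntax; `example_db` and `my_label` are placeholders:

```
-- empty label filter: the new Strings.isNullOrEmpty branch lists every job in the db
SHOW LOAD FROM example_db;
-- accurate match (accurateMatch == true): direct lookup of the label key
SHOW LOAD FROM example_db WHERE LABEL = "my_label";
-- non-accurate match: substring matching via entry.getKey().contains(labelValue)
SHOW LOAD FROM example_db WHERE LABEL LIKE "my_label";
```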
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index fafc0912cea121..2786080b3e6dfa 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -1195,10 +1195,10 @@ public void logRemoveRoutineLoadJob(RoutineLoadOperation operation) {
     }
 
     public void logCreateLoadJob(org.apache.doris.load.loadv2.LoadJob loadJob) {
-//        logEdit(OperationType.OP_CREATE_LOAD_JOB, loadJob);
+        logEdit(OperationType.OP_CREATE_LOAD_JOB, loadJob);
     }
 
     public void logEndLoadJob(LoadJobFinalOperation loadJobFinalOperation) {
-//        logEdit(OperationType.OP_END_LOAD_JOB, loadJobEndOperation);
+        logEdit(OperationType.OP_END_LOAD_JOB, loadJobFinalOperation);
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/planner/Planner.java b/fe/src/main/java/org/apache/doris/planner/Planner.java
index e1e901147ea109..857fc9e923c570 100644
--- a/fe/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/src/main/java/org/apache/doris/planner/Planner.java
@@ -149,11 +149,6 @@ public void createPlanFragments(StatementBase statment, Analyzer analyzer, TQueryOptions queryOptions)
         if (statment instanceof InsertStmt) {
             InsertStmt insertStmt = (InsertStmt) statment;
             insertStmt.prepareExpressions();
-            if (insertStmt.getOlapTuple() != null && !insertStmt.isStreaming()) {
-                singleNodePlan = new OlapRewriteNode(plannerContext.getNextNodeId(), singleNodePlan, insertStmt);
-                singleNodePlan.init(analyzer);
-                resultExprs = insertStmt.getResultExprs();
-            }
         }
 
         // TODO chenhao16 , no used materialization work
diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
index dbb49f6b57ad76..d4ad71d607c5df 100644
--- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -47,6 +47,7 @@
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
@@ -54,6 +55,7 @@
 import org.apache.doris.common.util.ProfileManager;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.load.EtlJobType;
 import org.apache.doris.mysql.MysqlChannel;
 import org.apache.doris.mysql.MysqlEofPacket;
 import org.apache.doris.mysql.MysqlSerializer;
@@ -630,22 +632,26 @@ private void handleInsertStmt() throws Exception {
             return;
         }
 
-        if (insertStmt.isStreaming()) {
-            Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
-                    insertStmt.getDbObj(), insertStmt.getTransactionId(),
-                    TabletCommitInfo.fromThrift(coord.getCommitInfos()),
-                    5000);
-            context.getState().setOk();
-        } else {
-            context.getCatalog().getLoadInstance().addLoadJob(
-                    uuid.toString(),
-                    insertStmt.getDb(),
-                    insertStmt.getTargetTable().getId(),
-                    insertStmt.getIndexIdToSchemaHash(),
-                    insertStmt.getTransactionId(),
-                    coord.getDeltaUrls(),
-                    System.currentTimeMillis()
-            );
+        Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
+                insertStmt.getDbObj(), insertStmt.getTransactionId(),
+                TabletCommitInfo.fromThrift(coord.getCommitInfos()),
+                5000);
+        context.getState().setOk();
+
+        // record the non-streaming insert info for show load
+        if (!insertStmt.isStreaming()) {
+            try {
+                context.getCatalog().getLoadManager().recordFinishedLoadJob(
+                        uuid.toString(),
+                        insertStmt.getDb(),
+                        insertStmt.getTargetTable().getId(),
+                        EtlJobType.INSERT,
+                        System.currentTimeMillis()
+                );
+            } catch (MetaNotFoundException e) {
+                LOG.warn("Record info of insert load with error " + e.getMessage(), e);
+                context.getState().setOk("Insert has been finished while info has not been recorded with " + e.getMessage());
+            }
+            context.getState().setOk("{'label':'" + uuid.toString() + "'}");
         }
     }
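End to end, the non-streaming path now commits synchronously and then records a FINISHED `InsertLoadJob`, so the returned label is queryable right away. A hypothetical session, with `<uuid>` standing in for the value the server actually returns:

```
INSERT INTO test SELECT * FROM test2;
-- returns: {'label':'<uuid>'}
-- the recorded job is already in its final state (FINISHED, progress 100)
SHOW LOAD WHERE LABEL = "<uuid>";
```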