Change insert into to streaming #1191

Merged: 4 commits, May 23, 2019
3 changes: 3 additions & 0 deletions be/src/exec/broker_reader.cpp
@@ -94,6 +94,7 @@ Status BrokerReader::open() {
try {
client->openReader(response, request);
} catch (apache::thrift::transport::TTransportException& e) {
usleep(1000 * 1000);
RETURN_IF_ERROR(client.reopen());
client->openReader(response, request);
}
@@ -143,6 +144,7 @@ Status BrokerReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
try {
client->pread(response, request);
} catch (apache::thrift::transport::TTransportException& e) {
usleep(1000 * 1000);
RETURN_IF_ERROR(client.reopen());
LOG(INFO) << "retry reading from broker: " << broker_addr << ". reason: " << e.what();
client->pread(response, request);
@@ -197,6 +199,7 @@ void BrokerReader::close() {
try {
client->closeReader(response, request);
} catch (apache::thrift::transport::TTransportException& e) {
usleep(1000 * 1000);
status = client.reopen();
if (!status.ok()) {
LOG(WARNING) << "Close broker reader failed. broker=" << broker_addr
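The same fixed-delay retry is applied at all three call sites above: on a thrift transport error, sleep one second, reopen the broker client, and retry the RPC once. A minimal sketch of the pattern in Java (the `BrokerClient` interface and method names are illustrative, not part of this PR):

```java
import java.util.concurrent.TimeUnit;

// Illustrative stand-ins for the thrift broker client used above; the real
// code lives in be/src/exec/broker_reader.cpp and talks to a broker via thrift.
interface BrokerClient {
    void pread() throws TransportException;
    void reopen() throws TransportException;
}

class TransportException extends Exception {
}

final class RetryOnce {
    // Mirrors the C++ change: on a transport error, wait one second,
    // reopen the connection, then retry the call exactly once.
    static void preadWithSingleRetry(BrokerClient client)
            throws TransportException, InterruptedException {
        try {
            client.pread();
        } catch (TransportException e) {
            TimeUnit.SECONDS.sleep(1); // the usleep(1000 * 1000) in the C++ code
            client.reopen();
            client.pread(); // a second failure propagates to the caller
        }
    }
}
```

The fixed pause gives a briefly unavailable broker time to recover before the single retry; a second failure still propagates to the caller rather than looping.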
14 changes: 10 additions & 4 deletions docs/documentation/cn/sql-reference/sql-statements/insert.md
@@ -4,6 +4,7 @@

```
INSERT INTO table_name
[ PARTITION (partition_name [, ...]) ]
[ (column [, ...]) ]
[ \[ hint [, ...] \] ]
{ VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
@@ -13,6 +14,8 @@ INSERT INTO table_name

INSERT inserts data into a table. Users can insert one or more rows with the VALUES syntax, or insert zero or more rows from a query.

partition specifies the target partitions. If target partitions are given, only data matching those partitions is loaded. If not specified, the default is all partitions of the table.

column specifies the target columns, which may appear in any order. If no target columns are given, the default is all columns of the table.

If a column of the table is missing from the target columns, it must have a default value; otherwise the INSERT will fail.
@@ -23,6 +26,8 @@ column specifies the target columns, which may appear in any order.

> table_name: the destination table of the load. It can be in `db_name.table_name` form
>
> partition_names: the partitions to load into, which must exist in `table_name`; separate multiple partition names with commas
>
> column_name: the specified destination column, which must exist in `table_name`
>
> expression: the expression to assign to a column
@@ -31,7 +36,8 @@ column specifies the target columns, which may appear in any order.
>
> query: an ordinary query; the result of the query is written into the target
>
> hint: indicators used to hint at the execution behavior of `INSERT`. `streaming` indicates that the `INSERT` statement is executed synchronously.
> hint: indicators used to hint at the execution behavior of `INSERT`. Both `streaming` and the default non-`streaming` mode execute the `INSERT` statement synchronously
> The non-`streaming` mode returns a label after execution, so that users can check the load state with `SHOW LOAD`

## Note

@@ -72,13 +78,13 @@ INSERT INTO test [streaming] SELECT * FROM test2
INSERT INTO test (c1, c2) [streaming] SELECT * from test2
```

For compatibility reasons, the default insert mode completes asynchronously and is relatively inefficient. To use the more efficient load path, add `[streaming]` to use the synchronous load mode.

4. Asynchronously load the result of a query statement into the `test` table

```
INSERT INTO test SELECT * FROM test2
INSERT INTO test (c1, c2) SELECT * from test2
```

Because Doris's earlier load modes were all asynchronous, this load statement returns a load job `label`. The user must check the state of the job for this `label` with the `SHOW LOAD` command; the loaded data takes effect only when the state is `FINISHED`.
The asynchronous load is actually a synchronous load wrapped as asynchronous; with or without streaming, the *execution efficiency is the same*.

Because Doris's earlier load modes were all asynchronous, an `INSERT` statement without streaming still returns a label for compatibility with the old usage habits, and the user needs to check the state of this `label`'s load job with the `SHOW LOAD` command.
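Since the Doris FE speaks the MySQL protocol, the label workflow described above can be exercised from any MySQL client. A hedged JDBC sketch (host, port, credentials, and the label value are placeholders; the exact `SHOW LOAD` filter syntax and result columns may differ across Doris versions):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class InsertAndCheckLoad {
    public static void main(String[] args) throws Exception {
        // The Doris FE exposes a MySQL-compatible endpoint; all connection
        // details below are placeholders.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:9030/example_db", "root", "");
             Statement stmt = conn.createStatement()) {
            // A non-streaming INSERT executes synchronously but still returns
            // a label for compatibility with the older asynchronous workflow.
            stmt.executeUpdate("INSERT INTO test SELECT * FROM test2");

            // Look up the job recorded for that label; with this PR its state
            // should already be FINISHED when the statement returns.
            try (ResultSet rs = stmt.executeQuery(
                    "SHOW LOAD WHERE LABEL = 'insert_xxx'")) {
                while (rs.next()) {
                    System.out.println(rs.getString("Label") + " -> " + rs.getString("State"));
                }
            }
        }
    }
}
```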
41 changes: 25 additions & 16 deletions fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -43,6 +43,7 @@
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -57,7 +58,22 @@
import java.util.Set;
import java.util.UUID;

// InsertStmt used to
/**
 * Insert into is performed to load data from the result of a query stmt.
 *
 * syntax:
 *   INSERT INTO table_name [partition_info] [col_list] [plan_hints] query_stmt
 *
 * table_name: the name of the target table
 * partition_info: PARTITION (p1, p2)
 *     the partition info of the target table
 * col_list: (c1, c2)
 *     the column list of the target table
 * plan_hints: [STREAMING, SHUFFLE_HINT]
 *     The streaming plan is used by both streaming and non-streaming insert stmts.
 *     The only difference is that a non-streaming stmt will record the load info in LoadManager and return a label,
 *     so the user can check the load info with a SHOW LOAD stmt.
 */
public class InsertStmt extends DdlStmt {
private static final Logger LOG = LogManager.getLogger(InsertStmt.class);

@@ -239,16 +255,13 @@ public void analyze(Analyzer analyzer) throws UserException {
// if all previous job finished
UUID uuid = UUID.randomUUID();
String jobLabel = "insert_" + uuid;
LoadJobSourceType sourceType = isStreaming ? LoadJobSourceType.INSERT_STREAMING
: LoadJobSourceType.FRONTEND;
LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS();
transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
jobLabel, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond);
if (isStreaming) {
OlapTableSink sink = (OlapTableSink) dataSink;
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
sink.init(loadId, transactionId, db.getId());
}
OlapTableSink sink = (OlapTableSink) dataSink;
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
sink.init(loadId, transactionId, db.getId());
}
}

@@ -498,13 +511,9 @@ public DataSink createDataSink() throws AnalysisException {
return dataSink;
}
if (targetTable instanceof OlapTable) {
if (isStreaming) {
dataSink = new OlapTableSink((OlapTable) targetTable, olapTuple);
dataPartition = dataSink.getOutputPartition();
} else {
dataSink = new DataSplitSink((OlapTable) targetTable, olapTuple);
dataPartition = dataSink.getOutputPartition();
}
String partitionNames = targetPartitions == null ? null : Joiner.on(",").join(targetPartitions);
dataSink = new OlapTableSink((OlapTable) targetTable, olapTuple, partitionNames);
dataPartition = dataSink.getOutputPartition();
} else if (targetTable instanceof BrokerTable) {
BrokerTable table = (BrokerTable) targetTable;
            // TODO(lingbin): decide which one to use if there is more than one path
@@ -525,7 +534,7 @@
}

public void finalize() throws UserException {
if (isStreaming) {
if (targetTable instanceof OlapTable) {
((OlapTableSink) dataSink).finalize();
}
}
8 changes: 4 additions & 4 deletions fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -1751,9 +1751,9 @@ public long loadRoutineLoadJobs(DataInputStream dis, long checksum) throws IOExc
}

public long loadLoadJobsV2(DataInputStream in, long checksum) throws IOException {
// if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_50) {
// Catalog.getCurrentCatalog().getLoadManager().readFields(in);
// }
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_50) {
Catalog.getCurrentCatalog().getLoadManager().readFields(in);
}
return checksum;
}

@@ -2057,7 +2057,7 @@ public void replayGlobalVariable(SessionVariable variable) throws IOException, D
}

public long saveLoadJobsV2(DataOutputStream out, long checksum) throws IOException {
// Catalog.getCurrentCatalog().getLoadManager().write(out);
Catalog.getCurrentCatalog().getLoadManager().write(out);
return checksum;
}

2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/common/FeConstants.java
@@ -35,5 +35,5 @@ public class FeConstants {

// general model
// Current meta data version. Use this version to write journals and image
public static int meta_version = FeMetaVersion.VERSION_49;
public static int meta_version = FeMetaVersion.VERSION_50;
}
fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -109,5 +109,5 @@ public final class FeMetaVersion {
// routine load job
public static final int VERSION_49 = 49;
// load job v2 for broker load
// public static final int VERSION_50 = 50;
public static final int VERSION_50 = 50;
}
17 changes: 15 additions & 2 deletions fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -21,12 +21,14 @@
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.Expr;
import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;

@@ -57,7 +59,6 @@ public class BrokerFileGroup implements Writable {
private String valueSeparator;
private String lineDelimiter;
// fileFormat may be null, which means format will be decided by file's suffix
// TODO(zc): we need to persist fileFormat, this should be done in next META_VERSION increase
private String fileFormat;
private boolean isNegative;
private List<Long> partitionIds;
@@ -264,7 +265,13 @@ public void write(DataOutput out) throws IOException {
Expr.writeTo(entry.getValue(), out);
}
}
//
// fileFormat
if (fileFormat == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
Text.writeString(out, fileFormat);
}
}

@Override
@@ -312,6 +319,12 @@ public void readFields(DataInput in) throws IOException {
}
}
}
// file format
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_50) {
if (in.readBoolean()) {
fileFormat = Text.readString(in);
}
}
}

public static BrokerFileGroup read(DataInput in) throws IOException {
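The `fileFormat` persistence above follows the usual Doris meta-compatibility pattern: the writer always emits the new field behind a presence flag, while the reader consumes it only when the journal version is new enough (`VERSION_50`, introduced by this PR, as in the Catalog.java change earlier). A self-contained sketch of the pattern, with `writeUTF`/`readUTF` standing in for Doris's `Text.writeString`/`Text.readString` and a plain int standing in for the catalog's journal-version lookup:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class OptionalFieldPersistence {
    static final int VERSION_50 = 50;

    // Writer side: always emit a presence flag, then the value if present.
    static void writeFileFormat(DataOutputStream out, String fileFormat) throws IOException {
        if (fileFormat == null) {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
            out.writeUTF(fileFormat); // stands in for Text.writeString
        }
    }

    // Reader side: consume the field only when the image is new enough, so
    // metadata written before VERSION_50 still deserializes correctly.
    static String readFileFormat(DataInputStream in, int journalVersion) throws IOException {
        if (journalVersion >= VERSION_50 && in.readBoolean()) {
            return in.readUTF(); // stands in for Text.readString
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeFileFormat(new DataOutputStream(buf), "parquet");
        String roundTripped = readFileFormat(
                new DataInputStream(new ByteArrayInputStream(buf.toByteArray())), VERSION_50);
        System.out.println(roundTripped); // prints: parquet
    }
}
```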
106 changes: 106 additions & 0 deletions fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.doris.load.loadv2;

import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * This class records the finished info of an insert load job.
 * It is created after the txn belonging to the insert load job becomes visible.
 * The state of an insert load job is always FINISHED, so it will never be scheduled by the JobScheduler.
 */
public class InsertLoadJob extends LoadJob {

private long tableId;

// only for log replay
public InsertLoadJob() {
super();
this.jobType = EtlJobType.INSERT;
}

public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp) {
super(dbId, label);
this.tableId = tableId;
this.createTimestamp = createTimestamp;
this.loadStartTimestamp = createTimestamp;
this.finishTimestamp = System.currentTimeMillis();
this.state = JobState.FINISHED;
this.progress = 100;
this.jobType = EtlJobType.INSERT;
this.timeoutSecond = Config.insert_load_default_timeout_second;
}

@Override
protected Set<String> getTableNames() throws MetaNotFoundException {
Database database = Catalog.getCurrentCatalog().getDb(dbId);
if (database == null) {
throw new MetaNotFoundException("Database " + dbId + "has been deleted");
}
database.readLock();
try {
Table table = database.getTable(tableId);
if (table == null) {
throw new MetaNotFoundException("Failed to find table " + tableId + " in db " + dbId);
}
return new HashSet<>(Arrays.asList(table.getName()));
} finally {
database.readUnlock();
}
}

@Override
void executeJob() {
}

@Override
public void onTaskFinished(TaskAttachment attachment) {
}

@Override
public void onTaskFailed(long taskId, FailMsg failMsg) {
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeLong(tableId);
}

@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
tableId = in.readLong();
}
}
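As the class comment notes, an `InsertLoadJob` is created only after its transaction is already visible, so it is born terminal. A usage sketch with the constructor above (the label and ids are placeholders for values produced during `INSERT` execution):

```java
// Hypothetical call site, reached once the insert's txn becomes visible:
InsertLoadJob job = new InsertLoadJob("insert_b5f9...", /* dbId */ 10001L,
        /* tableId */ 10002L, System.currentTimeMillis());
// The job starts in its final state: state == FINISHED and progress == 100,
// so the JobScheduler never needs to pick it up; it exists to back SHOW LOAD.
```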
10 changes: 6 additions & 4 deletions fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -235,7 +235,7 @@ protected static void checkDataSourceInfo(Database db, List<DataDescription> dat
}
}

abstract Set<String> getTableNames();
abstract Set<String> getTableNames() throws MetaNotFoundException;

public void isJobTypeRead(boolean jobTypeRead) {
isJobTypeRead = jobTypeRead;
@@ -331,7 +331,7 @@ public void cancelJobWithoutCheck(FailMsg failMsg) {
logFinalOperation();
}

public void cancelJob(FailMsg failMsg) throws DdlException {
public void cancelJob(FailMsg failMsg) throws DdlException, MetaNotFoundException {
writeLock();
try {
// check
@@ -357,7 +357,7 @@ public void cancelJob(FailMsg failMsg) throws DdlException {
logFinalOperation();
}

private void checkAuth() throws DdlException {
private void checkAuth() throws DdlException, MetaNotFoundException {
Database db = Catalog.getInstance().getDb(dbId);
if (db == null) {
throw new DdlException("Db does not exist. id: " + dbId);
@@ -467,7 +467,7 @@ public void unprotectReadEndOperation(LoadJobFinalOperation loadJobFinalOperatio
failMsg = loadJobFinalOperation.getFailMsg();
}

public List<Comparable> getShowInfo() throws DdlException {
public List<Comparable> getShowInfo() throws DdlException, MetaNotFoundException {
readLock();
try {
// check auth
@@ -537,6 +537,8 @@ public static LoadJob read(DataInput in) throws IOException {
EtlJobType type = EtlJobType.valueOf(Text.readString(in));
if (type == EtlJobType.BROKER) {
job = new BrokerLoadJob();
} else if (type == EtlJobType.INSERT) {
job = new InsertLoadJob();
} else {
throw new IOException("Unknown load type: " + type.name());
}
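The `read` entry point above is the tag-then-payload deserialization used throughout Doris metadata: a type tag selects the concrete subclass, which then consumes its own fields from the same stream. A condensed sketch of the full shape (the tail of the method is not shown in this diff, so the `isJobTypeRead`/`readFields` calls are an assumption based on the surrounding code):

```java
// Sketch of the dispatch pattern; EtlJobType, Text, BrokerLoadJob and
// InsertLoadJob are the classes appearing in this PR's diff.
public static LoadJob read(DataInput in) throws IOException {
    LoadJob job;
    EtlJobType type = EtlJobType.valueOf(Text.readString(in));
    if (type == EtlJobType.BROKER) {
        job = new BrokerLoadJob();
    } else if (type == EtlJobType.INSERT) {
        job = new InsertLoadJob(); // the empty ctor exists only for log replay
    } else {
        throw new IOException("Unknown load type: " + type.name());
    }
    job.isJobTypeRead(true); // the tag has already been consumed
    job.readFields(in);      // subclass-specific payload, e.g. tableId for INSERT
    return job;
}
```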