diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ErrorReason.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ErrorReason.java
index a3451d61fc3dfa..4d290e5a3fdce9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ErrorReason.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ErrorReason.java
@@ -18,9 +18,19 @@
 package org.apache.doris.load.routineload;
 
 import org.apache.doris.common.InternalErrorCode;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
 
-public class ErrorReason {
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class ErrorReason implements Writable {
+    @SerializedName(value = "code")
     private InternalErrorCode code;
+    @SerializedName(value = "msg")
     private String msg;
 
     public ErrorReason(InternalErrorCode errCode, String msg) {
@@ -44,6 +54,12 @@ public void setMsg(String msg) {
         this.msg = msg;
     }
 
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
     @Override
     public String toString() {
         return "ErrorReason{" + "code=" + code + ", msg='" + msg + '\'' + '}';
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 51f318377802ac..aaf1ef59a0cab3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1430,7 +1430,11 @@ protected void unprotectUpdateState(JobState jobState, ErrorReason reason, boole
         }
 
         if (!isReplay && jobState != JobState.RUNNING) {
-            Env.getCurrentEnv().getEditLog().logOpRoutineLoadJob(new RoutineLoadOperation(id, jobState));
+            if (jobState == JobState.PAUSED) {
+                Env.getCurrentEnv().getEditLog().logOpRoutineLoadJob(new RoutineLoadOperation(id, jobState, reason));
+            } else {
+                Env.getCurrentEnv().getEditLog().logOpRoutineLoadJob(new RoutineLoadOperation(id, jobState));
+            }
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index e318ed77bb90a1..8b0ebab3fbe7d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -831,7 +831,7 @@ public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) {
     public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) {
         RoutineLoadJob job = getJob(operation.getId());
         try {
-            job.updateState(operation.getJobState(), null, true /* is replay */);
+            job.updateState(operation.getJobState(), operation.getErrorReason(), true /* is replay */);
         } catch (UserException e) {
             LOG.error("should not happened", e);
         } catch (NullPointerException npe) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/RoutineLoadOperation.java b/fe/fe-core/src/main/java/org/apache/doris/persist/RoutineLoadOperation.java
index aa39788f4f02ec..d3ca454275d95c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/RoutineLoadOperation.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/RoutineLoadOperation.java
@@ -21,6 +21,7 @@
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.load.routineload.ErrorReason;
 import org.apache.doris.load.routineload.RoutineLoadJob.JobState;
 import org.apache.doris.persist.gson.GsonUtils;
 
@@ -35,6 +36,8 @@ public class RoutineLoadOperation implements Writable {
     private long id;
     @SerializedName("js")
    private JobState jobState;
+    @SerializedName("rs")
+    private ErrorReason reason;
 
     private RoutineLoadOperation() {
     }
@@ -44,6 +47,12 @@ public RoutineLoadOperation(long id, JobState jobState) {
         this.jobState = jobState;
     }
 
+    public RoutineLoadOperation(long id, JobState jobState, ErrorReason reason) {
+        this.id = id;
+        this.jobState = jobState;
+        this.reason = reason;
+    }
+
     public long getId() {
         return id;
     }
@@ -52,6 +61,10 @@ public JobState getJobState() {
         return jobState;
     }
 
+    public ErrorReason getErrorReason() {
+        return reason;
+    }
+
     public static RoutineLoadOperation read(DataInput in) throws IOException {
         if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) {
             RoutineLoadOperation operation = new RoutineLoadOperation();
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_restart_fe.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_restart_fe.groovy
new file mode 100644
index 00000000000000..d60fbf265fd9e5
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_restart_fe.groovy
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_restart_fe", "p0, nonConcurrent") {
+    def kafkaCsvTopics = [
+        "test_out_of_range",
+    ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka producer config
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTopics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    docker(options) {
+        def jobName = "test_routine_load_restart"
+        def tableName = "dup_tbl_basic_multi_table"
+        def load_with_injection = { injection ->
+            if (enabled != null && enabled.equalsIgnoreCase("true")) {
+                try {
+                    GetDebugPoint().enableDebugPointForAllBEs(injection)
+                    sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+                    sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text
+                    sql "sync"
+
+                    sql """
+                        CREATE ROUTINE LOAD ${jobName}
+                        COLUMNS TERMINATED BY "|"
+                        PROPERTIES
+                        (
+                            "max_batch_interval" = "5",
+                            "max_batch_rows" = "300000",
+                            "max_batch_size" = "209715200"
+                        )
+                        FROM KAFKA
+                        (
+                            "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                            "kafka_topic" = "test_out_of_range",
+                            "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                        );
+                    """
+                    sql "sync"
+
+                    def count = 0
+                    while (true) {
+                        sleep(1000)
+                        def res = sql "show routine load for ${jobName}"
+                        def state = res[0][8].toString()
+                        if (state == "PAUSED") {
+                            log.info("reason of state changed: ${res[0][17].toString()}".toString())
+                            assertTrue(res[0][17].toString().contains("Offset out of range"))
+                            assertTrue(res[0][17].toString().contains("consume partition"))
+                            assertTrue(res[0][17].toString().contains("consume offset"))
+                            GetDebugPoint().disableDebugPointForAllBEs(injection)
+                            break;
+                        }
+                        count++
+                        if (count > 60) {
+                            GetDebugPoint().disableDebugPointForAllBEs(injection)
+                            assertEquals(1, 2)
+                            break;
+                        } else {
+                            continue;
+                        }
+                    }
+                } catch (Exception e) {
+                    sql "stop routine load for ${jobName}"
+                    sql "DROP TABLE IF EXISTS ${tableName}"
+                }
+            }
+        }
+
+        load_with_injection("KafkaDataConsumer.group_consume.out_of_range")
+
+        cluster.restartFrontends()
+        sleep(30000)
+        context.reconnectFe()
+
+        def res = sql "show routine load for ${jobName}"
+        def state = res[0][8].toString()
+        if (state == "PAUSED") {
+            log.info("reason of state changed: ${res[0][17].toString()}".toString())
+            assertTrue(res[0][17].toString().contains("Offset out of range"))
+            assertTrue(res[0][17].toString().contains("consume partition"))
+            assertTrue(res[0][17].toString().contains("consume offset"))
+        } else {
+            assertEquals(1, 2)
+        }
+        sql "stop routine load for ${jobName}"
+        sql "DROP TABLE IF EXISTS ${tableName}"
+    }
+}
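
Note on the read path (not shown in this diff): ErrorReason only gains a write() above. If a standalone deserializer is ever needed, it would presumably mirror the Text/GsonUtils pattern used by other FE Writable classes; the following is a minimal sketch, with the method name and placement assumed rather than taken from this change:

    // Sketch only: assumed counterpart to ErrorReason.write(), not part of this diff.
    public static ErrorReason read(DataInput in) throws IOException {
        String json = Text.readString(in);                        // read the JSON string written by write()
        return GsonUtils.GSON.fromJson(json, ErrorReason.class);  // rebuild the object from the @SerializedName fields
    }

Because RoutineLoadOperation now declares the reason field with @SerializedName("rs"), a Gson-based (de)serialization of the operation itself should carry the paused reason through the edit log without further changes.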