-
Notifications
You must be signed in to change notification settings - Fork 919
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kyuubi session handle and operation handle #11
Merged
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
6 tasks
ulysses-you
pushed a commit
that referenced
this pull request
Feb 24, 2023
…Table` close #4332 ### _Why are the changes needed?_ For the case where the table name has been resolved and an `Expand` logical plan exists ``` InsertIntoHiveTable `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [a, b] +- Aggregate [a#0], [a#0, ansi_cast((count(if ((gid#9 = 1)) spark_catalog.default.t2.`b`#10 else null) * count(if ((gid#9 = 2)) spark_catalog.default.t2.`c`#11 else null)) as string) AS b#8] +- Aggregate [a#0, spark_catalog.default.t2.`b`#10, spark_catalog.default.t2.`c`#11, gid#9], [a#0, spark_catalog.default.t2.`b`#10, spark_catalog.default.t2.`c`#11, gid#9] +- Expand [ArrayBuffer(a#0, b#1, null, 1), ArrayBuffer(a#0, null, c#2, 2)], [a#0, spark_catalog.default.t2.`b`#10, spark_catalog.default.t2.`c`#11, gid#9] +- HiveTableRelation [`default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [a#0, b#1, c#2], Partition Cols: []] ``` For the case `CacheTable` with `window` function ``` InsertIntoHiveTable `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, true, false, [a, b] +- Project [a#98, b#99] +- InMemoryRelation [a#98, b#99, rank#100], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(2) Filter (isnotnull(rank#4) AND (rank#4 = 1)) +- Window [row_number() windowspecdefinition(a#9, b#10 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#4], [a#9], [b#10 ASC NULLS FIRST] +- *(1) Sort [a#9 ASC NULLS FIRST, b#10 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(a#9, 200), ENSURE_REQUIREMENTS, [id=#38] +- Scan hive default.t2 [a#9, b#10], HiveTableRelation [`default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [a#9, b#10], Partition Cols: []] ``` ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4393 from iodone/kyuubi-4332. Closes #4393 d2afdab [odone] fix cache table bug 443af79 [odone] fix some bugs with groupby Authored-by: odone <[email protected]> Signed-off-by: ulyssesyou <[email protected]>
ulysses-you
pushed a commit
that referenced
this pull request
Feb 24, 2023
…Table` close #4332 ### _Why are the changes needed?_ For the case where the table name has been resolved and an `Expand` logical plan exists ``` InsertIntoHiveTable `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [a, b] +- Aggregate [a#0], [a#0, ansi_cast((count(if ((gid#9 = 1)) spark_catalog.default.t2.`b`#10 else null) * count(if ((gid#9 = 2)) spark_catalog.default.t2.`c`#11 else null)) as string) AS b#8] +- Aggregate [a#0, spark_catalog.default.t2.`b`#10, spark_catalog.default.t2.`c`#11, gid#9], [a#0, spark_catalog.default.t2.`b`#10, spark_catalog.default.t2.`c`#11, gid#9] +- Expand [ArrayBuffer(a#0, b#1, null, 1), ArrayBuffer(a#0, null, c#2, 2)], [a#0, spark_catalog.default.t2.`b`#10, spark_catalog.default.t2.`c`#11, gid#9] +- HiveTableRelation [`default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [a#0, b#1, c#2], Partition Cols: []] ``` For the case `CacheTable` with `window` function ``` InsertIntoHiveTable `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, true, false, [a, b] +- Project [a#98, b#99] +- InMemoryRelation [a#98, b#99, rank#100], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(2) Filter (isnotnull(rank#4) AND (rank#4 = 1)) +- Window [row_number() windowspecdefinition(a#9, b#10 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#4], [a#9], [b#10 ASC NULLS FIRST] +- *(1) Sort [a#9 ASC NULLS FIRST, b#10 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(a#9, 200), ENSURE_REQUIREMENTS, [id=#38] +- Scan hive default.t2 [a#9, b#10], HiveTableRelation [`default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [a#9, b#10], Partition Cols: []] ``` ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4393 from iodone/kyuubi-4332. Closes #4393 d2afdab [odone] fix cache table bug 443af79 [odone] fix some bugs with groupby Authored-by: odone <[email protected]> Signed-off-by: ulyssesyou <[email protected]> (cherry picked from commit 4277710) Signed-off-by: ulyssesyou <[email protected]>
pan3793
pushed a commit
that referenced
this pull request
Oct 30, 2023
… the max rows setting ### _Why are the changes needed?_ #### 1. know about this pr When we execute flink(1.17+) test case, it may throw exception when the test case is `show/stop job`, the exception desc like this ``` - execute statement - show/stop jobs *** FAILED *** org.apache.kyuubi.jdbc.hive.KyuubiSQLException: Error operating ExecuteStatement: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Could not stop job 4dece26857fab91d63fad1abd8c6bdd0 with savepoint for operation 9ed8247a-b7bd-4004-875b-61ba654ab3dd. at org.apache.flink.table.gateway.service.operation.OperationExecutor.lambda$callStopJobOperation$11(OperationExecutor.java:628) at org.apache.flink.table.gateway.service.operation.OperationExecutor.runClusterAction(OperationExecutor.java:716) at org.apache.flink.table.gateway.service.operation.OperationExecutor.callStopJobOperation(OperationExecutor.java:601) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:434) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195) at org.apache.kyuubi.engine.flink.operation.ExecuteStatement.executeStatement(ExecuteStatement.scala:64) at org.apache.kyuubi.engine.flink.operation.ExecuteStatement.runInternal(ExecuteStatement.scala:56) at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:171) at org.apache.kyuubi.session.AbstractSession.runOperation(AbstractSession.scala:101) at org.apache.kyuubi.session.AbstractSession.$anonfun$executeStatement$1(AbstractSession.scala:131) at org.apache.kyuubi.session.AbstractSession.withAcquireRelease(AbstractSession.scala:82) at org.apache.kyuubi.session.AbstractSession.executeStatement(AbstractSession.scala:128) at org.apache.kyuubi.service.AbstractBackendService.executeStatement(AbstractBackendService.scala:67) at org.apache.kyuubi.service.TFrontendService.ExecuteStatement(TFrontendService.scala:252) at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1557) at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1542) at org.apache.kyuubi.shade.org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39) at org.apache.kyuubi.shade.org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) at org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36) at org.apache.kyuubi.shade.org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1[149](https://github.com/apache/kyuubi/actions/runs/6649714451/job/18068699087?pr=5501#step:8:150)) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running. at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.flink.table.gateway.service.operation.OperationExecutor.lambda$callStopJobOperation$11(OperationExecutor.java:617) ... 22 more Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running. at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:925) at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:913) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:260) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1298) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45) at akka.dispatch.OnComplete.internal(Future.scala:299) at akka.dispatch.OnComplete.internal(Future.scala:297) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:171) ... Cause: java.lang.RuntimeException: org.apache.flink.util.SerializedThrowable:java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running. at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:925) at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:913) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:260) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ... Cause: java.lang.RuntimeException: org.apache.flink.util.SerializedThrowable:org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running. at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:143) at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:105) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:[160](https://github.com/apache/kyuubi/actions/runs/6649714451/job/18068699087?pr=5501#step:8:161)4) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:[168](https://github.com/apache/kyuubi/actions/runs/6649714451/job/18068699087?pr=5501#step:8:169)) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ... ``` #### 2. what make the test case failed? If we want know the reason about the exception, we need to understand the process of flink executing stop job, the process line like this code space show(it's source is our bad test case, we can use this test case to solve similar problems) ``` 1. sql 1.1 create table tbl_a (a int) with ('connector' = 'datagen','rows-per-second'='10') 1.2 create table tbl_b (a int) with ('connector' = 'blackhole') 1.3 insert into tbl_b select * from tbl_a 2. start job: it will get 2 tasks abount source sink 3. show job: we can get job info 4. stop job(the main error): 4.1 stop job need checkpoint 4.2 start checkpoint, it need all task state is running 4.3 checkpoint can not get all task state is running, then throw the exception ``` Actually, in a normal process, it should not throw the exception, if this happens to your job, please check your kyuubi conf `kyuubi.session.engine.flink.max.rows`, it's default value is 1000000. But in the test case, we only the the conf's value is 10. It's the reason to make the error, this conf makes when we execute a stream query, it will cancel the when the limit is reached. Because flink's datagen is a streamconnector, so we can imagine, when we execute those sql, because our conf, it will make the sink task be canceled because the query reached 10. So when we execute stop job, flink checkpoint cannot get the tasks about this job is all in running state, then flink throw this exception. #### 3. how can we solve this problem? When your job makes the same exception, please make sure your kyuubi conf `kyuubi.session.engine.flink.max.rows`'s value can it meet your streaming query needs? Then changes the conf's value. close #5531 ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No Closes #5549 from davidyuan1223/fix_flink_test_bug. Closes #5531 ce7fd79 [david yuan] Update externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala dc3a4b9 [davidyuan] fix flink on yarn test bug 86a647a [davidyuan] fix flink on yarn test bug cbd4c0c [davidyuan] fix flink on yarn test bug 8b51840 [davidyuan] add common method to get session level config bcb0cf3 [davidyuan] Merge remote-tracking branch 'origin/master' 72e7aea [david yuan] Merge branch 'apache:master' into master 57ec746 [david yuan] Merge pull request #13 from davidyuan1223/fix 56b91a3 [yuanfuyuan] fix_4186 c8eb9a2 [david yuan] Merge branch 'apache:master' into master 2beccb6 [david yuan] Merge branch 'apache:master' into master 0925a4b [david yuan] Merge pull request #12 from davidyuan1223/revert-11-fix_4186 40e80d9 [david yuan] Revert "fix_4186" c83836b [david yuan] Merge pull request #11 from davidyuan1223/fix_4186 360d183 [david yuan] Merge branch 'master' into fix_4186 b616044 [yuanfuyuan] fix_4186 e244029 [david yuan] Merge branch 'apache:master' into master bfa6cbf [davidyuan1223] Merge branch 'apache:master' into master 16237c2 [davidyuan1223] Merge branch 'apache:master' into master c48ad38 [yuanfuyuan] remove the used blank lines 55a0a43 [xiaoyuandajian] Merge pull request #10 from xiaoyuandajian/fix-#4057 cb11935 [yuan] Merge remote-tracking branch 'origin/fix-#4057' into fix-#4057 86e4e1c [yuan] fix-#4057 info: modify the shellcheck errors file in ./bin 1. "$@" is a array, we want use string to compare. so update "$@" => "$*" 2. `tty` mean execute the command, we can use $(tty) replace it 3. param $# is a number, compare number should use -gt/-lt,not >/< 4. not sure the /bin/kyuubi line 63 'exit -1' need modify? so the directory bin only have a shellcheck note in /bin/kyuubi dd39efd [袁福元] fix-#4057 info: 1. "$@" is a array, we want use string to compare. so update "$@" => "$*" 2. `tty` mean execute the command, we can use $(tty) replace it 3. param $# is a number, compare number should use -gt/-lt,not >/< Lead-authored-by: davidyuan <[email protected]> Co-authored-by: david yuan <[email protected]> Co-authored-by: yuanfuyuan <[email protected]> Co-authored-by: yuan <[email protected]> Co-authored-by: davidyuan1223 <[email protected]> Co-authored-by: xiaoyuandajian <[email protected]> Co-authored-by: 袁福元 <[email protected]> Signed-off-by: Cheng Pan <[email protected]>
pan3793
pushed a commit
that referenced
this pull request
Oct 30, 2023
… the max rows setting ### _Why are the changes needed?_ #### 1. know about this pr When we execute flink(1.17+) test case, it may throw exception when the test case is `show/stop job`, the exception desc like this ``` - execute statement - show/stop jobs *** FAILED *** org.apache.kyuubi.jdbc.hive.KyuubiSQLException: Error operating ExecuteStatement: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Could not stop job 4dece26857fab91d63fad1abd8c6bdd0 with savepoint for operation 9ed8247a-b7bd-4004-875b-61ba654ab3dd. at org.apache.flink.table.gateway.service.operation.OperationExecutor.lambda$callStopJobOperation$11(OperationExecutor.java:628) at org.apache.flink.table.gateway.service.operation.OperationExecutor.runClusterAction(OperationExecutor.java:716) at org.apache.flink.table.gateway.service.operation.OperationExecutor.callStopJobOperation(OperationExecutor.java:601) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:434) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195) at org.apache.kyuubi.engine.flink.operation.ExecuteStatement.executeStatement(ExecuteStatement.scala:64) at org.apache.kyuubi.engine.flink.operation.ExecuteStatement.runInternal(ExecuteStatement.scala:56) at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:171) at org.apache.kyuubi.session.AbstractSession.runOperation(AbstractSession.scala:101) at org.apache.kyuubi.session.AbstractSession.$anonfun$executeStatement$1(AbstractSession.scala:131) at org.apache.kyuubi.session.AbstractSession.withAcquireRelease(AbstractSession.scala:82) at org.apache.kyuubi.session.AbstractSession.executeStatement(AbstractSession.scala:128) at org.apache.kyuubi.service.AbstractBackendService.executeStatement(AbstractBackendService.scala:67) at org.apache.kyuubi.service.TFrontendService.ExecuteStatement(TFrontendService.scala:252) at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1557) at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1542) at org.apache.kyuubi.shade.org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39) at org.apache.kyuubi.shade.org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) at org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36) at org.apache.kyuubi.shade.org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1[149](https://github.com/apache/kyuubi/actions/runs/6649714451/job/18068699087?pr=5501#step:8:150)) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running. at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.flink.table.gateway.service.operation.OperationExecutor.lambda$callStopJobOperation$11(OperationExecutor.java:617) ... 22 more Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running. at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:925) at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:913) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:260) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1298) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45) at akka.dispatch.OnComplete.internal(Future.scala:299) at akka.dispatch.OnComplete.internal(Future.scala:297) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:171) ... Cause: java.lang.RuntimeException: org.apache.flink.util.SerializedThrowable:java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running. at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:925) at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:913) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:260) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ... Cause: java.lang.RuntimeException: org.apache.flink.util.SerializedThrowable:org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running. at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:143) at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:105) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:[160](https://github.com/apache/kyuubi/actions/runs/6649714451/job/18068699087?pr=5501#step:8:161)4) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:[168](https://github.com/apache/kyuubi/actions/runs/6649714451/job/18068699087?pr=5501#step:8:169)) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ... ``` #### 2. what make the test case failed? If we want know the reason about the exception, we need to understand the process of flink executing stop job, the process line like this code space show(it's source is our bad test case, we can use this test case to solve similar problems) ``` 1. sql 1.1 create table tbl_a (a int) with ('connector' = 'datagen','rows-per-second'='10') 1.2 create table tbl_b (a int) with ('connector' = 'blackhole') 1.3 insert into tbl_b select * from tbl_a 2. start job: it will get 2 tasks abount source sink 3. show job: we can get job info 4. stop job(the main error): 4.1 stop job need checkpoint 4.2 start checkpoint, it need all task state is running 4.3 checkpoint can not get all task state is running, then throw the exception ``` Actually, in a normal process, it should not throw the exception, if this happens to your job, please check your kyuubi conf `kyuubi.session.engine.flink.max.rows`, it's default value is 1000000. But in the test case, we only the the conf's value is 10. It's the reason to make the error, this conf makes when we execute a stream query, it will cancel the when the limit is reached. Because flink's datagen is a streamconnector, so we can imagine, when we execute those sql, because our conf, it will make the sink task be canceled because the query reached 10. So when we execute stop job, flink checkpoint cannot get the tasks about this job is all in running state, then flink throw this exception. #### 3. how can we solve this problem? When your job makes the same exception, please make sure your kyuubi conf `kyuubi.session.engine.flink.max.rows`'s value can it meet your streaming query needs? Then changes the conf's value. close #5531 ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No Closes #5549 from davidyuan1223/fix_flink_test_bug. Closes #5531 ce7fd79 [david yuan] Update externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala dc3a4b9 [davidyuan] fix flink on yarn test bug 86a647a [davidyuan] fix flink on yarn test bug cbd4c0c [davidyuan] fix flink on yarn test bug 8b51840 [davidyuan] add common method to get session level config bcb0cf3 [davidyuan] Merge remote-tracking branch 'origin/master' 72e7aea [david yuan] Merge branch 'apache:master' into master 57ec746 [david yuan] Merge pull request #13 from davidyuan1223/fix 56b91a3 [yuanfuyuan] fix_4186 c8eb9a2 [david yuan] Merge branch 'apache:master' into master 2beccb6 [david yuan] Merge branch 'apache:master' into master 0925a4b [david yuan] Merge pull request #12 from davidyuan1223/revert-11-fix_4186 40e80d9 [david yuan] Revert "fix_4186" c83836b [david yuan] Merge pull request #11 from davidyuan1223/fix_4186 360d183 [david yuan] Merge branch 'master' into fix_4186 b616044 [yuanfuyuan] fix_4186 e244029 [david yuan] Merge branch 'apache:master' into master bfa6cbf [davidyuan1223] Merge branch 'apache:master' into master 16237c2 [davidyuan1223] Merge branch 'apache:master' into master c48ad38 [yuanfuyuan] remove the used blank lines 55a0a43 [xiaoyuandajian] Merge pull request #10 from xiaoyuandajian/fix-#4057 cb11935 [yuan] Merge remote-tracking branch 'origin/fix-#4057' into fix-#4057 86e4e1c [yuan] fix-#4057 info: modify the shellcheck errors file in ./bin 1. "$@" is a array, we want use string to compare. so update "$@" => "$*" 2. `tty` mean execute the command, we can use $(tty) replace it 3. param $# is a number, compare number should use -gt/-lt,not >/< 4. not sure the /bin/kyuubi line 63 'exit -1' need modify? so the directory bin only have a shellcheck note in /bin/kyuubi dd39efd [袁福元] fix-#4057 info: 1. "$@" is a array, we want use string to compare. so update "$@" => "$*" 2. `tty` mean execute the command, we can use $(tty) replace it 3. param $# is a number, compare number should use -gt/-lt,not >/< Lead-authored-by: davidyuan <[email protected]> Co-authored-by: david yuan <[email protected]> Co-authored-by: yuanfuyuan <[email protected]> Co-authored-by: yuan <[email protected]> Co-authored-by: davidyuan1223 <[email protected]> Co-authored-by: xiaoyuandajian <[email protected]> Co-authored-by: 袁福元 <[email protected]> Signed-off-by: Cheng Pan <[email protected]> (cherry picked from commit 26f614a) Signed-off-by: Cheng Pan <[email protected]>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
No description provided.