
java.lang.AssertionError: assertion failed: Byte array does not have correct length #702

Closed
devoplib opened this issue Aug 10, 2024 · 23 comments
Labels
question Further information is requested

@devoplib

Question

First of all, thank you very much for all your work and support.
We are using Cobrix to convert mainframe files from EBCDIC to ASCII, and it works perfectly fine in Databricks.
To increase throughput, we run the same job in parallel: the same program processes a different set of files in each task instead of one task processing all of them. For example, instead of one file-load task that loads all 42 files, we run the same Cobrix conversion module 6 times with 7 files each.
We get the following error, "java.lang.AssertionError: assertion failed: Byte array does not have correct length", when writing the data from the DataFrame to a Databricks Unity Catalog table (i.e. saveAsTable) or when running actions on the DataFrame such as df.count() or df.rdd.isEmpty().

Note: When a failed task is resubmitted, it completes. It looks like some kind of memory contention on the Databricks driver, since the job never fails when run as a single task.
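As a rough illustration of the split described above (42 files spread over 6 parallel runs of 7 files each), the file list can be partitioned round-robin. This is a minimal sketch: `split_into_batches` and the file names are hypothetical, and in the actual setup each batch is passed to a separate Databricks task running the same notebook.

```python
from typing import List


def split_into_batches(files: List[str], n_batches: int) -> List[List[str]]:
    """Distribute files round-robin across n_batches lists (hypothetical helper)."""
    if n_batches <= 0:
        raise ValueError("n_batches must be positive")
    batches: List[List[str]] = [[] for _ in range(n_batches)]
    for i, name in enumerate(files):
        # File i goes to batch i mod n_batches, so batch sizes differ by at most one.
        batches[i % n_batches].append(name)
    return batches


# Hypothetical input: 42 EBCDIC files, one list per parallel run.
files = [f"file_{i:02d}.dat" for i in range(42)]
batches = split_into_batches(files, 6)
print([len(b) for b in batches])  # [7, 7, 7, 7, 7, 7]
```

Round-robin keeps the batches balanced when the file count is not an exact multiple of the number of runs.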

df.write.mode("overWrite").format("delta").saveAsTable(f"{table_name}")

Looking for any advice on where to look and how to debug the error. Any help is appreciated.

Please find the complete error below.
Py4JJavaError: An error occurred while calling o2411.saveAsTable.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 3238.0 failed 4 times, most recent failure: Lost task 22.3 in stage 3238.0 (TID 50850) (100.126.48.51 executor 27): java.lang.AssertionError: assertion failed: Byte array does not have correct length
at scala.Predef$.assert(Predef.scala:223)
at org.apache.spark.SparkContext.$anonfun$binaryRecords$2(SparkContext.scala:1603)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage24.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32)
at com.google.common.collect.Iterators$PeekingImpl.hasNext(Iterators.java:1139)
at com.databricks.photon.NativeRowBatchIterator.hasNext(NativeRowBatchIterator.java:44)
at 0xa37b947 .HasNext(external/workspace_spark_3_5/photon/jni-wrappers/jni-row-batch-iterator.cc:50)
at com.databricks.photon.JniApiImpl.hasNext(Native Method)
at com.databricks.photon.JniApi.hasNext(JniApi.scala)
at com.databricks.photon.JniExecNode.hasNext(JniExecNode.java:76)
at com.databricks.photon.BasePhotonResultHandler$$anon$1.hasNext(PhotonExec.scala:862)
at com.databricks.photon.PhotonBasicEvaluatorFactory$PhotonBasicEvaluator$$anon$1.$anonfun$hasNext$1(PhotonBasicEvaluatorFactory.scala:211)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at com.databricks.photon.PhotonResultHandler.timeit(PhotonResultHandler.scala:30)
at com.databricks.photon.PhotonResultHandler.timeit$(PhotonResultHandler.scala:28)
at com.databricks.photon.BasePhotonResultHandler.timeit(PhotonExec.scala:849)
at com.databricks.photon.PhotonBasicEvaluatorFactory$PhotonBasicEvaluator$$anon$1.hasNext(PhotonBasicEvaluatorFactory.scala:211)
at com.databricks.photon.CloseableIterator$$anon$10.hasNext(CloseableIterator.scala:211)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage46.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage46.hashAgg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage46.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:195)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:56)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3908)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3830)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3817)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3817)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1695)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1680)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1680)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4154)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4066)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4054)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:54)
Caused by: java.lang.AssertionError: assertion failed: Byte array does not have correct length
at scala.Predef$.assert(Predef.scala:223)
at org.apache.spark.SparkContext.$anonfun$binaryRecords$2(SparkContext.scala:1603)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage24.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32)
at com.google.common.collect.Iterators$PeekingImpl.hasNext(Iterators.java:1139)
at com.databricks.photon.NativeRowBatchIterator.hasNext(NativeRowBatchIterator.java:44)
at 0xa37b947 .HasNext(external/workspace_spark_3_5/photon/jni-wrappers/jni-row-batch-iterator.cc:50)
at com.databricks.photon.JniApiImpl.hasNext(Native Method)
at com.databricks.photon.JniApi.hasNext(JniApi.scala)
at com.databricks.photon.JniExecNode.hasNext(JniExecNode.java:76)
at com.databricks.photon.BasePhotonResultHandler$$anon$1.hasNext(PhotonExec.scala:862)
at com.databricks.photon.PhotonBasicEvaluatorFactory$PhotonBasicEvaluator$$anon$1.$anonfun$hasNext$1(PhotonBasicEvaluatorFactory.scala:211)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at com.databricks.photon.PhotonResultHandler.timeit(PhotonResultHandler.scala:30)
at com.databricks.photon.PhotonResultHandler.timeit$(PhotonResultHandler.scala:28)
at com.databricks.photon.BasePhotonResultHandler.timeit(PhotonExec.scala:849)
at com.databricks.photon.PhotonBasicEvaluatorFactory$PhotonBasicEvaluator$$anon$1.hasNext(PhotonBasicEvaluatorFactory.scala:211)
at com.databricks.photon.CloseableIterator$$anon$10.hasNext(CloseableIterator.scala:211)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage46.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage46.hashAgg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage46.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:195)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:56)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

@devoplib devoplib added the question Further information is requested label Aug 10, 2024
@yruslan
Collaborator

yruslan commented Aug 13, 2024

Hi @devoplib ,

Thanks for your interest in Cobrix!

I've looked at the stack trace. It seems like the issue is with the shuffle or the writer, since there are no za.co.absa.cobrix classes in the stack trace.

Could you elaborate on how you run tasks in parallel? Do you execute multiple threads within executors, or do you run multiple Spark actions in a multi-threaded fashion? Could you share example code?

@devoplib
Author

Initially, we created one task that runs a notebook which reads the EBCDIC files, converts them to ASCII, creates a DataFrame, and saves the DataFrame to Databricks Unity Catalog. Because we have many files, to increase throughput we created multiple tasks, passed a subset of the files to each one, and had each task run the same notebook. When we run a single task with all the files, we don't see any issues, but when we run the notebook in multiple tasks, we get the error. The only difference between the two runs is that in the first we send all the files to one task, while in the second we split the files across multiple tasks. For example, with 10 files we run 5 tasks and send 2 randomly chosen files to each. With multiple tasks, the job fails when running actions on the DataFrame, for example df.count().
Please let me know if you have any questions.
The notebook mainly runs df.write.mode("overWrite").format("delta").saveAsTable(f"{table_name}"), and we loaded the Cobrix bundle as a library on the cluster.

@yruslan
Collaborator

yruslan commented Aug 14, 2024

Actually, Cobrix already runs the conversion in parallel. You don't need to process each file separately; you can just use '*' in the path name:

spark.read.format("cobol").load("s3://bucket/path/path/*")

or just

spark.read.format("cobol").load("s3://bucket/path/path")

@devoplib
Author

The EBCDIC-to-ASCII conversion already runs in parallel within each table, since all files of a table share the same layout, as you mentioned above. We have a couple of tables to process, and their schemas differ, so each table needs its own run; those runs execute in parallel.
The parallel run I mentioned is loading different tables using the same Python notebook, i.e. multiple tasks whose only difference is the input files. For each table we process multiple files, and the tables are loaded in parallel.
Please let me know
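A minimal sketch of that layout, assuming each table's files sit in their own directory (a hypothetical convention): every table gets a single glob path, as suggested above, and the per-table loads are submitted concurrently. `load_table` is a hypothetical stand-in for the notebook body (the Cobrix read plus `saveAsTable`); here it only returns a string so the sketch runs without Spark.

```python
import posixpath
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def glob_paths_by_table(files: List[str]) -> Dict[str, str]:
    """Map each table (assumed: the parent directory name) to one glob path."""
    paths: Dict[str, str] = {}
    for f in files:
        parent = posixpath.dirname(f)
        table = posixpath.basename(parent)
        paths[table] = parent + "/*"
    return paths


def load_table(table: str, path: str) -> str:
    # Hypothetical stand-in for the notebook body, e.g.
    # spark.read.format("cobol").option("copybook", ...).load(path)
    # followed by df.write...saveAsTable(table).
    return f"{table} <- {path}"


files = [
    "s3://bucket/ebcdic/customers/part1.dat",
    "s3://bucket/ebcdic/customers/part2.dat",
    "s3://bucket/ebcdic/orders/part1.dat",
]
paths = glob_paths_by_table(files)

# One read per table (each table has its own copybook); tables run concurrently.
with ThreadPoolExecutor(max_workers=len(paths)) as pool:
    results = list(pool.map(lambda item: load_table(*item), sorted(paths.items())))

print(results)
```

Concurrent reads from one driver are the pattern that exhibited the failure in this thread, so before the fix discussed later this structure could still hit the assertion.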

@yruslan
Collaborator

yruslan commented Aug 15, 2024

I see, that makes sense. Thanks for the explanation. From what I can see, you are processing files properly, and there should not be any issues. I suspect this is related to the environment or an issue on the writer's side, not spark-cobol (the reader side). The stack trace does not contain any call chain that leads to the Cobrix codebase.

@devoplib
Author

Thank you very much. When we retry the task, it either completes or fails a couple of times and finally completes. Databricks says that since we are using Cobrix, the issue is with Cobrix; I know it is not the Cobrix module but want to make sure. I will check with them again, and I really appreciate your quick response.

@pinakigit

Hi @devoplib, let me know the solution if you hear anything back from Databricks. We have similar issues and think it's Databricks, but it doesn't throw any proper error.

@devoplib
Author

devoplib commented Sep 4, 2024

Definitely, I will share if we find anything related to this error.
We have been working with the Databricks team (sharing logs), and they say it is due to the external module we are using (Cobrix), but we pointed out that it runs fine after a couple of reruns. We will continue to work with the Databricks team and get additional resources to pursue the root cause of the issue.

@vinodkc

vinodkc commented Sep 16, 2024

We are getting following "java.lang.AssertionError: assertion failed: Byte array does not have correct length" when we are writing the data from dataframe to databricks UC table i.e. saveastable or when we do transformations on the dataframe like df.count() or df.rdd.isEmpty etc.

If the same assertion error occurs on df.count(), the issue may be in the read flow. The same error during saveAsTable or another write operation is a red herring, since the write follows the read.

@yruslan, during the read flow of the shuffle map task execution in the executor, the assertion failure comes from the sqlContext.sparkContext.binaryRecords object created here

I see that the recordSize passed from CobolScanners to sqlContext.sparkContext.binaryRecords is asserted in this Spark code, i.e.:

val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,
  classOf[FixedLengthBinaryInputFormat],
  classOf[LongWritable],
  classOf[BytesWritable],
  conf = conf)
br.map { case (k, v) =>
  val bytes = v.copyBytes()
  assert(bytes.length == recordLength, "Byte array does not have correct length")
  bytes
}

Is there any chance that val recordSize = reader.getRecordSize gives a different value than v.copyBytes().length?
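To make the check concrete: in the Spark 3.x source of `SparkContext.binaryRecords`, the method first stores the length in the Hadoop configuration via `conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)`; the input format then slices files using the length it reads back from that configuration, while the `map` above checks each chunk against the `recordLength` captured in the closure. The pure-Python sketch below models only that logic and is not Spark code; `read_fixed_length` and `binary_records` are hypothetical names. The assertion fires whenever the slicing length and the checked length disagree.

```python
from typing import Iterator, List


def read_fixed_length(data: bytes, slice_length: int) -> Iterator[bytes]:
    """Yield consecutive slice_length-byte chunks (models the fixed-length input format).

    Trailing bytes shorter than slice_length are ignored in this simplified model.
    """
    for offset in range(0, len(data) - slice_length + 1, slice_length):
        yield data[offset:offset + slice_length]


def binary_records(data: bytes, record_length: int, conf_record_length: int) -> List[bytes]:
    """Slice with the configured length, then check each chunk against record_length."""
    records = []
    for chunk in read_fixed_length(data, conf_record_length):
        # Mirrors Spark's assert(bytes.length == recordLength, ...); an explicit raise
        # is used so the check also runs under `python -O`.
        if len(chunk) != record_length:
            raise AssertionError("Byte array does not have correct length")
        records.append(chunk)
    return records


data = bytes(range(12))
print(len(binary_records(data, 4, 4)))  # 3: both lengths agree

try:
    binary_records(data, 4, 6)  # configuration says 6 bytes, the check expects 4
except AssertionError as err:
    print("AssertionError:", err)
```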

@devoplib, to isolate the issue, please disable Photon and try to reproduce the issue.

@pinakigit
Are you also following this approach and seeing the issue?

To increase the throughput and process the data with more speed we are running the same job in parallel i.e. processing multiple files using the same program by passing different files to each one of them instead of sending it to one task.

@pinakigit

We have tried with and without Photon, as suggested by Databricks, and it doesn't help. With autoscaling disabled, we haven't seen the issue so far; we are observing for a couple more days. We are running multiple pipelines for hundreds of files in parallel.

@vinodkc

vinodkc commented Sep 17, 2024

@pinakigit, I didn't mean that Photon is causing this. Avoiding Photon is meant to isolate the issue to the executor JVM and the CobolScanner path.
Could you please share the exception stack trace from a non-Photon executor?

@devoplib
Author

Thank you very much for looking into it.

Note: To complete the job, we retry it multiple times. Since we have many streams running in parallel, they all eventually complete after a couple of retries. The issue is that the retries consume resources and ultimately cost more. If the job ran without failures, it would finish faster and help us overall.

Please note that I tried the following two options, and it still fails on my end with the same error:
1.) Autoscaling disabled, Photon Accelerator enabled
2.) Autoscaling disabled, Photon Accelerator disabled

Here is the error:
Py4JJavaError: An error occurred while calling o648.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 351.0 failed 4 times, most recent failure: Lost task 0.3 in stage 351.0 (TID 350) (100.126.10.219 executor 3): java.lang.AssertionError: assertion failed: Byte array does not have correct length
at scala.Predef$.assert(Predef.scala:223)
at org.apache.spark.SparkContext.$anonfun$binaryRecords$2(SparkContext.scala:1614)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.hashAgg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:56)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3908)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3830)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3817)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3817)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1695)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1680)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1680)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4154)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4066)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4054)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:54)
Caused by: java.lang.AssertionError: assertion failed: Byte array does not have correct length
at scala.Predef$.assert(Predef.scala:223)
at org.apache.spark.SparkContext.$anonfun$binaryRecords$2(SparkContext.scala:1614)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.hashAgg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:56)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

@vinodkcin
Contributor

vinodkcin commented Sep 18, 2024

@devoplib, thanks for the stack trace.
Are you using variable-record-length mainframe files?
If so, could you please share how the DataFrame was created?
e.g.:

val df = spark
      .read
      .format("cobol")                                     
      .option("copybook_contents", copybook)               
      .option("record_format", "V")                        
      //.option("copybook", "data/companies_copybook.cpy") 
      //.option("generate_record_id", true)                
      .option("schema_retention_policy", "collapse_root")  
      .option("segment_field", "SEGMENT_ID")               
      .option("segment_id_level0", "C")                    
      .option("segment_id_prefix", "ID")                   
      .load("../example_data/companies_data")  
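One quick check related to this question, sketched under the assumption that each file is meant to be read as fixed-length records of the copybook's record size: a fixed-length file's size should then be an exact multiple of that size, so a non-zero remainder hints at variable-length (`record_format` "V") data or a copybook mismatch. The helper name and the sizes below are hypothetical and not part of the Cobrix API.

```python
from typing import Dict, List


def suspicious_files(sizes: Dict[str, int], record_size: int) -> List[str]:
    """Return files whose byte size is not a multiple of record_size (hypothetical check)."""
    if record_size <= 0:
        raise ValueError("record_size must be positive")
    return sorted(name for name, size in sizes.items() if size % record_size != 0)


# Hypothetical sizes in bytes; on a real system they could come from os.path.getsize
# or a storage listing.
sizes = {"a.dat": 8000, "b.dat": 8004, "c.dat": 0}
print(suspicious_files(sizes, 100))  # ['b.dat']
```

A size that divides evenly does not prove the file is fixed-length; it only rules out one obvious mismatch.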

@devoplib
Author

Please find below the code that creates the DataFrame:

code_page_name = 'cp037_extended'
# Parentheses let the method chain span multiple lines in Python.
cobolDataFrame = (
    spark.read.format("za.co.absa.cobrix.spark.cobol.source")
    .option("ebcdic_code_page", code_page_name)
    .option("schema_retention_policy", "collapse_root")
    .option("string_trimming_policy", "none")
    .option("pedantic", "true")
    .option("improved_null_detection", "false")
    .option("copybook", tmp_copybook_file)
    .load(ebcdic_table_data_file)
)

We run count(), rdd.isEmpty(), etc. as part of validation.

Please let me know if you need additional information.

@vinodkc

vinodkc commented Oct 5, 2024

@devoplib, @pinakigit, thanks for sharing the details.
It seems you're encountering a race condition when multiple Spark-Cobrix readers share the same Spark driver instance.

I tested the fix locally and then raised PR #714.
@yruslan, could you please review the PR?
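A deterministic sketch of the kind of race described here. It assumes, as a reading of Spark's `SparkContext.binaryRecords` rather than something confirmed in this thread, that the method's default `conf` argument is the driver-wide `hadoopConfiguration` and that each call writes its record length into it (`conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)`) before the configuration is copied for the read. Real threads are replaced by a hand-written interleaving so the outcome is reproducible; `start_read`, `finish_read`, and the `record.length` key are hypothetical names.

```python
from typing import Dict

RECORD_LENGTH_KEY = "record.length"  # hypothetical key used only in this model


def start_read(conf: Dict[str, int], record_length: int) -> int:
    """Step 1 of a read: store the record length in the given configuration."""
    conf[RECORD_LENGTH_KEY] = record_length
    return record_length  # the value the later length check will expect


def finish_read(conf: Dict[str, int]) -> Dict[str, int]:
    """Step 2 of a read: take the snapshot of the configuration used to slice files."""
    return dict(conf)


# Variant 1: two readers share one configuration; steps interleaved by hand.
shared: Dict[str, int] = {}
expected_a = start_read(shared, 100)  # reader A: table with 100-byte records
start_read(shared, 250)               # reader B overwrites the shared value
snapshot_a = finish_read(shared)      # A snapshots only after B's write
print(snapshot_a[RECORD_LENGTH_KEY] == expected_a)  # False: A slices 250, checks 100

# Variant 2: each reader writes into its own copy of the base configuration.
base: Dict[str, int] = {}
conf_a, conf_b = dict(base), dict(base)
expected_a = start_read(conf_a, 100)
start_read(conf_b, 250)
snapshot_a = finish_read(conf_a)
print(snapshot_a[RECORD_LENGTH_KEY] == expected_a)  # True
```

The second variant shows one way to avoid the race: give each read its own configuration. In Spark terms that would mean passing a fresh copy, e.g. `new Configuration(sc.hadoopConfiguration)`, as the optional `conf` argument of `binaryRecords`; whether PR #714 does exactly this is not stated in the thread. The model also explains why retries usually succeed: the failure needs an unlucky interleaving.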

@devoplib
Author

devoplib commented Oct 6, 2024

Excellent. Thank you very much @vinodkc.
@yruslan: Please advise.
Thank you.

@yruslan
Collaborator

yruslan commented Oct 7, 2024

@vinodkc, Thank you very much for the fix! 🚀

@yruslan
Collaborator

yruslan commented Oct 7, 2024

Merged. @devoplib, @pinakigit, please test. Will release a new version tomorrow.

@pinakigit

Sure. We are testing and will let you know the results after the batch is complete.

@devoplib
Author

devoplib commented Oct 9, 2024

Our testing was successful. @pinakigit: How did your batch testing go?
Thank you

@pinakigit

Our testing was successful for two days of batch runs. It seems the fix is working. Let us know when the new version is available.

@devoplib
Author

devoplib commented Oct 9, 2024

@pinakigit: Thank you very much for the testing.
@yruslan: Please let us know when the new version with this is available.
Thank you very much to all.

@yruslan
Collaborator

yruslan commented Oct 10, 2024

Cobrix 2.7.7 has been released to Maven. It should be visible in a couple of hours. Thanks a lot again for the fix, @vinodkc, and to @pinakigit for testing!

@yruslan yruslan closed this as completed Oct 10, 2024
Projects
None yet
Development

No branches or pull requests

5 participants