From eaebb6474b4ef9645fb9b45e4ef6c79cb4f0bebe Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 27 Apr 2023 08:54:17 +0200 Subject: [PATCH] #614 Fix index handling of custom record parsers that read from the input stream in the constructor. --- .../co/absa/cobrix/cobol/reader/VarLenNestedReader.scala | 2 ++ .../co/absa/cobrix/cobol/reader/index/IndexGenerator.scala | 7 ++++--- .../co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala | 4 ++-- .../cobol/source/integration/Test5MultisegmentSpec.scala | 2 +- .../source/regression/Test12MultiRootSparseIndex.scala | 4 ++-- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala index 36174342..1b91ca24 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala @@ -175,6 +175,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], segmentIdField match { case Some(field) => IndexGenerator.sparseIndexGenerator(fileNumber, binaryData, + readerProperties.fileStartOffset, isRdwBigEndian, recordHeaderParser, recordExtractor(0L, binaryData, copybook), @@ -186,6 +187,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], segmentIdValue) case None => IndexGenerator.sparseIndexGenerator(fileNumber, binaryData, + readerProperties.fileStartOffset, isRdwBigEndian, recordHeaderParser, recordExtractor(0L, binaryData, copybook), diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala index b7a71fab..e9b04220 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala @@ -31,6 +31,7 @@ object IndexGenerator extends Logging { def sparseIndexGenerator(fileId: Int, dataStream: SimpleStream, + fileStartOffset: Long, isRdwBigEndian: Boolean, recordHeaderParser: RecordHeaderParser, recordExtractor: Option[RawRecordExtractor], @@ -41,7 +42,7 @@ object IndexGenerator extends Logging { isHierarchical: Boolean, rootSegmentId: String = ""): ArrayBuffer[SparseIndexEntry] = { val rootSegmentIds = rootSegmentId.split(',').toList - var byteIndex = 0L + var byteIndex = fileStartOffset val index = new ArrayBuffer[SparseIndexEntry] var rootRecordId: String = "" var recordsInChunk = 0 @@ -53,7 +54,7 @@ object IndexGenerator extends Logging { val needSplit = getSplitCondition(recordsPerIndexEntry, sizePerIndexEntryMB) // Add the first mandatory index entry - val indexEntry = SparseIndexEntry(dataStream.offset, -1, fileId, recordIndex) + val indexEntry = SparseIndexEntry(fileStartOffset, -1, fileId, recordIndex) index += indexEntry var endOfFileReached = false @@ -61,7 +62,7 @@ object IndexGenerator extends Logging { var record: Array[Byte] = null val (recordSize: Long, isValid, hasMoreRecords, canSplit) = recordExtractor match { case Some(extractor) => - val offset0 = extractor.offset + val offset0 = byteIndex val canSplit = extractor.canSplitHere val isValid = if (extractor.hasNext) { record = extractor.next() diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala index dce0f84b..9939b489 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala @@ -63,7 +63,7 @@ class SparseIndexSpecSpec extends AnyWordSpec { val recordExtractor = new TextRecordExtractor(RawRecordContext(0L, stream, copybook, null, null, "")) - val indexes = IndexGenerator.sparseIndexGenerator(0, stream, isRdwBigEndian = false, + val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, isRdwBigEndian = false, recordHeaderParser = recordHeaderParser, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(2), sizePerIndexEntryMB = None, copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValue) assert(indexes.length == 4) @@ -84,7 +84,7 @@ class SparseIndexSpecSpec extends AnyWordSpec { val recordExtractor = new TextFullRecordExtractor(RawRecordContext(0L, stream, copybook, null, null, "")) - val indexes = IndexGenerator.sparseIndexGenerator(0, stream, isRdwBigEndian = false, + val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, isRdwBigEndian = false, recordHeaderParser = recordHeaderParser, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(2), sizePerIndexEntryMB = None, copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValue) assert(indexes.length == 4) diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test5MultisegmentSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test5MultisegmentSpec.scala index e1d5a795..2b8201c1 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test5MultisegmentSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test5MultisegmentSpec.scala @@ -213,7 +213,7 @@ class Test5MultisegmentSpec extends AnyFunSuite with SparkTestBase { val stream = new FileStreamer("../data/test5_data/COMP.DETAILS.SEP30.DATA.dat", FileSystem.get(new Configuration())) val recordHeaderParser = RecordHeaderParserFactory.createRecordHeaderParser(Constants.RhRdwLittleEndian, 0, 0, 0, 0) - val indexes = IndexGenerator.sparseIndexGenerator(0, stream, isRdwBigEndian = false, + val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, isRdwBigEndian = false, recordHeaderParser = recordHeaderParser, recordExtractor = None, recordsPerIndexEntry = Some(10), sizePerIndexEntryMB = None, copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValue) assert(indexes.length == 88) diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test12MultiRootSparseIndex.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test12MultiRootSparseIndex.scala index 720f2b3e..2021599b 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test12MultiRootSparseIndex.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test12MultiRootSparseIndex.scala @@ -65,7 +65,7 @@ class Test12MultiRootSparseIndex extends AnyWordSpec with SparkTestBase with Bin val stream = new FileStreamer(tmpFileName, FileSystem.get(new Configuration())) val recordHeaderParser = RecordHeaderParserFactory.createRecordHeaderParser(Constants.RhRdwFixedLength, 3, 0, 0, 0) - val indexes = IndexGenerator.sparseIndexGenerator(0, stream, isRdwBigEndian = false, + val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, isRdwBigEndian = false, recordHeaderParser = recordHeaderParser, recordExtractor = None, recordsPerIndexEntry = Some(4), sizePerIndexEntryMB = None, copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValues) assert(indexes.length == 3) @@ -81,7 +81,7 @@ class Test12MultiRootSparseIndex extends AnyWordSpec with SparkTestBase with Bin val stream = new FileStreamer(tmpFileName, FileSystem.get(new Configuration())) val recordHeaderParser = RecordHeaderParserFactory.createRecordHeaderParser(Constants.RhRdwFixedLength, 3, 0, 0, 0) - val indexes = IndexGenerator.sparseIndexGenerator(0, stream, isRdwBigEndian = false, + val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, isRdwBigEndian = false, recordHeaderParser = recordHeaderParser, recordExtractor = None, recordsPerIndexEntry = Some(4), sizePerIndexEntryMB = None, copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValues) assert(indexes.length == 3)