#614 Fix index handling of custom record parsers that read from the input stream in the constructor.
yruslan committed Apr 27, 2023
1 parent 186494e commit eaebb64
Showing 5 changed files with 11 additions and 8 deletions.
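
For context, the failure mode addressed here occurs when a custom record extractor consumes bytes from the input stream inside its constructor, for example to skip a file header. By the time the sparse index is generated, the stream offset is already past the start of the data, so neither dataStream.offset nor extractor.offset can be trusted as the offset of the first index entry. Below is a minimal sketch of such an extractor; the class name, header size, record layout, and import path are illustrative assumptions, not part of this commit.

// Hypothetical extractor; package path assumed for Cobrix 2.6.x.
import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor}

class HeaderSkippingExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
  // Constructor-time read: the underlying stream advances by 128 bytes before
  // IndexGenerator.sparseIndexGenerator() is ever called, which is why the index
  // must be seeded with fileStartOffset rather than the current stream offset.
  private val fileHeader: Array[Byte] = ctx.inputStream.next(128)

  // Offset of the record that next() would return.
  override def offset: Long = ctx.inputStream.offset

  override def hasNext: Boolean = !ctx.inputStream.isEndOfStream

  // Fixed 100-byte records, purely for illustration.
  override def next(): Array[Byte] = ctx.inputStream.next(100)
}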
@@ -175,6 +175,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
     segmentIdField match {
       case Some(field) => IndexGenerator.sparseIndexGenerator(fileNumber,
         binaryData,
+        readerProperties.fileStartOffset,
         isRdwBigEndian,
         recordHeaderParser,
         recordExtractor(0L, binaryData, copybook),
@@ -186,6 +187,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
           segmentIdValue)
       case None => IndexGenerator.sparseIndexGenerator(fileNumber,
         binaryData,
+        readerProperties.fileStartOffset,
         isRdwBigEndian,
         recordHeaderParser,
         recordExtractor(0L, binaryData, copybook),
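
The readerProperties.fileStartOffset value threaded through above presumably comes from the reader options. A hedged usage sketch, assuming the standard Cobrix option names (copybook, record_format, file_start_offset) and placeholder paths:

import org.apache.spark.sql.SparkSession

// Illustrative only: application name, paths, and option values are placeholders.
val spark = SparkSession.builder().appName("cobrix-start-offset-example").getOrCreate()

val df = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")
  .option("record_format", "V")         // variable-length records
  .option("file_start_offset", "100")   // bytes skipped at the start of each file
  .load("/path/to/data")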
@@ -31,6 +31,7 @@ object IndexGenerator extends Logging {
 
   def sparseIndexGenerator(fileId: Int,
                            dataStream: SimpleStream,
+                           fileStartOffset: Long,
                            isRdwBigEndian: Boolean,
                            recordHeaderParser: RecordHeaderParser,
                            recordExtractor: Option[RawRecordExtractor],
@@ -41,7 +42,7 @@
                            isHierarchical: Boolean,
                            rootSegmentId: String = ""): ArrayBuffer[SparseIndexEntry] = {
     val rootSegmentIds = rootSegmentId.split(',').toList
-    var byteIndex = 0L
+    var byteIndex = fileStartOffset
     val index = new ArrayBuffer[SparseIndexEntry]
     var rootRecordId: String = ""
     var recordsInChunk = 0
@@ -53,15 +54,15 @@
     val needSplit = getSplitCondition(recordsPerIndexEntry, sizePerIndexEntryMB)
 
     // Add the first mandatory index entry
-    val indexEntry = SparseIndexEntry(dataStream.offset, -1, fileId, recordIndex)
+    val indexEntry = SparseIndexEntry(fileStartOffset, -1, fileId, recordIndex)
     index += indexEntry
 
     var endOfFileReached = false
     while (!endOfFileReached) {
       var record: Array[Byte] = null
       val (recordSize: Long, isValid, hasMoreRecords, canSplit) = recordExtractor match {
         case Some(extractor) =>
-          val offset0 = extractor.offset
+          val offset0 = byteIndex
          val canSplit = extractor.canSplitHere
          val isValid = if (extractor.hasNext) {
            record = extractor.next()
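
For reference, a call to the updated generator with a non-zero start offset, mirroring the test call sites in this commit (the values are illustrative, and stream, recordHeaderParser, recordExtractor, copybook, segmentIdField, and segmentIdRootValue are assumed to be set up as in those tests):

// Sketch only: 100L stands for a configured file start offset. The first index
// entry and byteIndex now both start from this value instead of the possibly
// already advanced stream or extractor offset.
val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 100L, isRdwBigEndian = false,
  recordHeaderParser = recordHeaderParser, recordExtractor = Some(recordExtractor),
  recordsPerIndexEntry = Some(2), sizePerIndexEntryMB = None,
  copybook = Some(copybook), segmentField = Some(segmentIdField),
  isHierarchical = true, rootSegmentId = segmentIdRootValue)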
@@ -63,7 +63,7 @@ class SparseIndexSpecSpec extends AnyWordSpec {
 
      val recordExtractor = new TextRecordExtractor(RawRecordContext(0L, stream, copybook, null, null, ""))
 
-     val indexes = IndexGenerator.sparseIndexGenerator(0, stream, isRdwBigEndian = false,
+     val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, isRdwBigEndian = false,
        recordHeaderParser = recordHeaderParser, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(2), sizePerIndexEntryMB = None,
        copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValue)
      assert(indexes.length == 4)
@@ -84,7 +84,7 @@
 
      val recordExtractor = new TextFullRecordExtractor(RawRecordContext(0L, stream, copybook, null, null, ""))
 
-     val indexes = IndexGenerator.sparseIndexGenerator(0, stream, isRdwBigEndian = false,
+     val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, isRdwBigEndian = false,
        recordHeaderParser = recordHeaderParser, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(2), sizePerIndexEntryMB = None,
        copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValue)
      assert(indexes.length == 4)
@@ -213,7 +213,7 @@ class Test5MultisegmentSpec extends AnyFunSuite with SparkTestBase {
     val stream = new FileStreamer("../data/test5_data/COMP.DETAILS.SEP30.DATA.dat", FileSystem.get(new Configuration()))
 
     val recordHeaderParser = RecordHeaderParserFactory.createRecordHeaderParser(Constants.RhRdwLittleEndian, 0, 0, 0, 0)
-    val indexes = IndexGenerator.sparseIndexGenerator(0, stream, isRdwBigEndian = false,
+    val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, isRdwBigEndian = false,
       recordHeaderParser = recordHeaderParser, recordExtractor = None, recordsPerIndexEntry = Some(10), sizePerIndexEntryMB = None,
       copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValue)
     assert(indexes.length == 88)
@@ -65,7 +65,7 @@ class Test12MultiRootSparseIndex extends AnyWordSpec with SparkTestBase with Bin
      val stream = new FileStreamer(tmpFileName, FileSystem.get(new Configuration()))
 
      val recordHeaderParser = RecordHeaderParserFactory.createRecordHeaderParser(Constants.RhRdwFixedLength, 3, 0, 0, 0)
-     val indexes = IndexGenerator.sparseIndexGenerator(0, stream, isRdwBigEndian = false,
+     val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, isRdwBigEndian = false,
        recordHeaderParser = recordHeaderParser, recordExtractor = None, recordsPerIndexEntry = Some(4), sizePerIndexEntryMB = None,
        copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValues)
      assert(indexes.length == 3)
@@ -81,7 +81,7 @@
      val stream = new FileStreamer(tmpFileName, FileSystem.get(new Configuration()))
 
      val recordHeaderParser = RecordHeaderParserFactory.createRecordHeaderParser(Constants.RhRdwFixedLength, 3, 0, 0, 0)
-     val indexes = IndexGenerator.sparseIndexGenerator(0, stream, isRdwBigEndian = false,
+     val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, isRdwBigEndian = false,
        recordHeaderParser = recordHeaderParser, recordExtractor = None, recordsPerIndexEntry = Some(4), sizePerIndexEntryMB = None,
        copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValues)
      assert(indexes.length == 3)
