Skip to content

Commit

Permalink
#614 Generate an error when the custom record extractor does not conform to the expectations.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Apr 28, 2023
1 parent 73184d4 commit 83d94e9
Showing 1 changed file with 14 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,36 @@ object IndexGenerator extends Logging {
def sparseIndexGenerator(fileId: Int,
dataStream: SimpleStream,
fileStartOffset: Long,
isRdwBigEndian: Boolean,
recordHeaderParser: RecordHeaderParser,
recordExtractor: Option[RawRecordExtractor],
recordsPerIndexEntry: Option[Int] = None,
sizePerIndexEntryMB: Option[Int] = None,
copybook: Option[Copybook] = None,
segmentField: Option[Primitive] = None,
recordsPerIndexEntry: Option[Int],
sizePerIndexEntryMB: Option[Int],
copybook: Option[Copybook],
segmentField: Option[Primitive],
isHierarchical: Boolean,
rootSegmentId: String = ""): ArrayBuffer[SparseIndexEntry] = {
val rootSegmentIds = rootSegmentId.split(',').toList


var byteIndex = fileStartOffset
val index = new ArrayBuffer[SparseIndexEntry]
var rootRecordId: String = ""
var recordsInChunk = 0
var bytesInChunk = 0L
var bytesInChunk = fileStartOffset
var recordIndex = 0
var recordsInChunk = 0

val rootSegmentIds = rootSegmentId.split(',').toList
var rootRecordId: String = ""
val isReallyHierarchical = copybook.nonEmpty && segmentField.nonEmpty && isHierarchical
val isSplitBySize = recordsPerIndexEntry.isEmpty && sizePerIndexEntryMB.nonEmpty

val needSplit = getSplitCondition(recordsPerIndexEntry, sizePerIndexEntryMB)

// Add the first mandatory index entry
val index = new ArrayBuffer[SparseIndexEntry]
val indexEntry = SparseIndexEntry(fileStartOffset, -1, fileId, recordIndex)
index += indexEntry

if (dataStream.offset != fileStartOffset && recordExtractor.isDefined) {
logger.warn("The record extractor has returned the offset that is not the beginning of the file. " +
throw new IllegalStateException("The record extractor has returned the offset that is not the beginning of the file. " +
s"Expected: $fileStartOffset. Got: ${dataStream.offset}. File: ${dataStream.inputFileName}. " +
"It will be assumed that the offset is shifted by 1 record, but if you have record id inconsistency, " +
"please fix the record extractor.")
recordIndex += 1
"Make sure 'offset()' points to the record that is going to be returned with next().")
}

var endOfFileReached = false
Expand Down

0 comments on commit 83d94e9

Please sign in to comment.