Skip to content

Commit

Permalink
#620 Fix backwards compatibility of custom record parsers.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed May 5, 2023
1 parent a7d3718 commit f24b932
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class FixedBlockRawRecordExtractor(ctx: RawRecordContext, fbParams: FixedBlockPa
private val recordSize = fbParams.recordLength.getOrElse(ctx.copybook.getRecordSize)
private val bdwSize = fbParams.blockLength.orElse(fbParams.recordsPerBlock.map(_ * recordSize))

override def offset: Long = ctx.dataStream.offset
override def offset: Long = ctx.inputStream.offset

override def hasNext: Boolean = {
if (recordQueue.isEmpty) {
Expand All @@ -36,17 +36,17 @@ class FixedBlockRawRecordExtractor(ctx: RawRecordContext, fbParams: FixedBlockPa
}

private def readNextBlock(): Unit = {
if (!ctx.dataStream.isEndOfStream) {
var bdwOffset = ctx.dataStream.offset
if (!ctx.inputStream.isEndOfStream) {
var bdwOffset = ctx.inputStream.offset

val nextBlockSize = bdwSize.getOrElse({
val bdw = ctx.dataStream.next(ctx.bdwDecoder.headerSize)
val bdw = ctx.inputStream.next(ctx.bdwDecoder.headerSize)
val blockLength = ctx.bdwDecoder.getRecordLength(bdw, bdwOffset)
bdwOffset += ctx.bdwDecoder.headerSize
blockLength
})

val blockBuffer = ctx.dataStream.next(nextBlockSize)
val blockBuffer = ctx.inputStream.next(nextBlockSize)

var blockIndex = 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import za.co.absa.cobrix.cobol.reader.stream.SimpleStream

/**
* @param startingRecordNumber A record number the input stream is pointing to (zero-based).
* @param dataStream An input stream pointing to the beginning of a file or a record in a file. The
* @param inputStream An input stream pointing to the beginning of a file or a record in a file. The
* record extractor should close the stream when the end of file is reached.
* @param headerStream A stream pointing to the beginning of the file, even if inputStream is pointing
* to a record in the middle. The record extractor should close the stream when it
Expand All @@ -32,7 +32,7 @@ import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
*/
case class RawRecordContext(
startingRecordNumber: Long,
dataStream: SimpleStream,
inputStream: SimpleStream,
headerStream: SimpleStream,
copybook: Copybook,
rdwDecoder: RecordHeaderDecoder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ class TextFullRecordExtractor(ctx: RawRecordContext) extends Serializable with R
fetchNextRecord()
}

override def offset: Long = ctx.dataStream.offset - pendingBytesSize
override def offset: Long = ctx.inputStream.offset - pendingBytesSize

// This method ensures that pendingBytes contains the specified number of bytes read from the input stream
private def ensureBytesRead(numOfBytes: Int): Unit = {
val bytesToRead = numOfBytes - pendingBytesSize
if (bytesToRead > 0) {
val newBytes = ctx.dataStream.next(bytesToRead)
val newBytes = ctx.inputStream.next(bytesToRead)
if (newBytes.length > 0) {
System.arraycopy(newBytes, 0, pendingBytes, pendingBytesSize, newBytes.length)
pendingBytesSize = pendingBytesSize + newBytes.length
Expand Down Expand Up @@ -135,7 +135,7 @@ class TextFullRecordExtractor(ctx: RawRecordContext) extends Serializable with R
System.arraycopy(pendingBytes, recordLength + i, pendingBytes, recordLength, size - i)
pendingBytesSize -= i
}
endOfStream = ctx.dataStream.isEndOfStream
endOfStream = ctx.inputStream.isEndOfStream
if (!found && !endOfStream) {
ensureBytesRead(maxRecordSize)
}
Expand All @@ -155,7 +155,7 @@ class TextFullRecordExtractor(ctx: RawRecordContext) extends Serializable with R
} else {
// Last record or a record is too large?
// In the latter case
if (pendingBytesSize <= recordSize && ctx.dataStream.isEndOfStream) {
if (pendingBytesSize <= recordSize && ctx.inputStream.isEndOfStream) {
// Last record
curRecordSize = pendingBytesSize
curPayloadSize = pendingBytesSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe
fetchNextRecord()
}

override def offset: Long = ctx.dataStream.offset - pendingBytesSize
override def offset: Long = ctx.inputStream.offset - pendingBytesSize

// This method ensures that pendingBytes contains the specified number of bytes read from the input stream
private def ensureBytesRead(numOfBytes: Int): Unit = {
val bytesToRead = numOfBytes - pendingBytesSize
if (bytesToRead > 0) {
val newBytes = ctx.dataStream.next(bytesToRead)
val newBytes = ctx.inputStream.next(bytesToRead)
if (newBytes.length > 0) {
System.arraycopy(newBytes, 0, pendingBytes, pendingBytesSize, newBytes.length)
pendingBytesSize = pendingBytesSize + newBytes.length
Expand Down Expand Up @@ -128,7 +128,7 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe
} else {
// Last record or a record is too large?
// In the latter case
if (pendingBytesSize <= recordSize && ctx.dataStream.isEndOfStream) {
if (pendingBytesSize <= recordSize && ctx.inputStream.isEndOfStream) {
// Last record
curRecordSize = pendingBytesSize
curPayloadSize = pendingBytesSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ class VarOccursRecordExtractor(ctx: RawRecordContext) extends Serializable with
private val bytes = new Array[Byte](maxRecordSize)
private var bytesSize = 0

override def hasNext: Boolean = ctx.dataStream.offset < ctx.dataStream.size
override def hasNext: Boolean = ctx.inputStream.offset < ctx.inputStream.size

override def next(): Array[Byte] = {
if (hasVarSizeOccurs) {
bytesSize = 0
util.Arrays.fill(bytes, 0.toByte)
extractVarOccursRecordBytes()
} else {
ctx.dataStream.next(maxRecordSize)
ctx.inputStream.next(maxRecordSize)
}
}

def offset: Long = ctx.dataStream.offset
def offset: Long = ctx.inputStream.offset

private def extractVarOccursRecordBytes(): Array[Byte] = {
val dependFields = scala.collection.mutable.HashMap.empty[String, Either[Int, String]]
Expand Down Expand Up @@ -140,7 +140,7 @@ class VarOccursRecordExtractor(ctx: RawRecordContext) extends Serializable with
private def ensureBytesRead(numOfBytes: Int): Unit = {
val bytesToRead = numOfBytes - bytesSize
if (bytesToRead > 0) {
val newBytes = ctx.dataStream.next(bytesToRead)
val newBytes = ctx.inputStream.next(bytesToRead)
if (newBytes.length > 0) {
System.arraycopy(newBytes, 0, bytes, bytesSize, newBytes.length)
bytesSize = numOfBytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class VariableBlockVariableRecordExtractor(ctx: RawRecordContext) extends Serial

private val recordQueue = new mutable.Queue[Array[Byte]]
private var canSplitAtCurrentOffset = true
private var recordOffset: Long = ctx.dataStream.offset
private var recordOffset: Long = ctx.inputStream.offset

override def offset: Long = recordOffset

Expand All @@ -40,12 +40,12 @@ class VariableBlockVariableRecordExtractor(ctx: RawRecordContext) extends Serial
val bdwSize = ctx.bdwDecoder.headerSize
val rdwSize = ctx.rdwDecoder.headerSize

if (!ctx.dataStream.isEndOfStream) {
val bdwOffset = ctx.dataStream.offset
val bdw = ctx.dataStream.next(bdwSize)
if (!ctx.inputStream.isEndOfStream) {
val bdwOffset = ctx.inputStream.offset
val bdw = ctx.inputStream.next(bdwSize)

val blockLength = ctx.bdwDecoder.getRecordLength(bdw, bdwOffset)
val blockBuffer = ctx.dataStream.next(blockLength)
val blockBuffer = ctx.inputStream.next(blockLength)

var blockIndex = 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ object IndexGenerator extends Logging {
val indexEntry = SparseIndexEntry(fileStartOffset, -1, fileId, recordIndex)
index += indexEntry

if (dataStream.offset != fileStartOffset && recordExtractor.isDefined) {
throw new IllegalStateException("The record extractor has returned the offset that is not the beginning of the file. " +
s"Expected: $fileStartOffset. Got: ${dataStream.offset}. File: ${dataStream.inputFileName}. " +
"Make sure 'offset()' points to the record that is going to be returned with next().")
}
recordExtractor.foreach(extractor => {
if (extractor.offset != fileStartOffset) {
throw new IllegalStateException("The record extractor has returned the offset that is not the beginning of the file. " +
s"Expected: $fileStartOffset. Got: ${extractor.offset}. File: ${dataStream.inputFileName}. " +
"Make sure 'offset()' points to the record that is going to be returned with next().")
}
})

var endOfFileReached = false
while (!endOfFileReached) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ package za.co.absa.cobrix.cobol.mock
import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor}

class RecordExtractorMock(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
override def offset: Long = ctx.dataStream.offset
override def offset: Long = ctx.inputStream.offset

override def hasNext: Boolean = !ctx.dataStream.isEndOfStream
override def hasNext: Boolean = !ctx.inputStream.isEndOfStream

override def next(): Array[Byte] = {
val header = ctx.dataStream.next(2)
val header = ctx.inputStream.next(2)
if (header.length == 2) {
ctx.dataStream.next(header.head + header(1) * 256)
ctx.inputStream.next(header.head + header(1) * 256)
} else {
Array.empty[Byte]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ package za.co.absa.cobrix.cobol.mock
import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor}

class RecordExtractorReadAhaedMock(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
ctx.dataStream.next(2)
ctx.inputStream.next(2)

override def offset: Long = ctx.dataStream.offset
override def offset: Long = ctx.inputStream.offset

override def hasNext: Boolean = !ctx.dataStream.isEndOfStream
override def hasNext: Boolean = !ctx.inputStream.isEndOfStream

override def next(): Array[Byte] = {
val header = ctx.dataStream.next(2)
val header = ctx.inputStream.next(2)
if (header.length == 2) {
ctx.dataStream.next(header.head + header(1) * 256)
ctx.inputStream.next(header.head + header(1) * 256)
} else {
Array.empty[Byte]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class CustomRecordExtractorMock(ctx: RawRecordContext) extends Serializable with

private var recordNumber = ctx.startingRecordNumber

override def offset: Long = ctx.dataStream.offset
override def offset: Long = ctx.inputStream.offset

override def hasNext: Boolean = !ctx.dataStream.isEndOfStream
override def hasNext: Boolean = !ctx.inputStream.isEndOfStream

@throws[NoSuchElementException]
override def next(): Array[Byte] = {
Expand All @@ -40,9 +40,9 @@ class CustomRecordExtractorMock(ctx: RawRecordContext) extends Serializable with
}

val rawRecord = if (recordNumber % 2 == 0) {
ctx.dataStream.next(2)
ctx.inputStream.next(2)
} else {
ctx.dataStream.next(3)
ctx.inputStream.next(3)
}

recordNumber += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,20 @@ class CustomRecordExtractorWithFileHeaderMock(ctx: RawRecordContext) extends Ser

ctx.headerStream.close()

override def offset: Long = ctx.dataStream.offset
override def offset: Long = ctx.inputStream.offset

override def hasNext: Boolean = !ctx.dataStream.isEndOfStream
override def hasNext: Boolean = !ctx.inputStream.isEndOfStream

@throws[NoSuchElementException]
override def next(): Array[Byte] = {
if (!hasNext) {
throw new NoSuchElementException
}

val rawRecord = ctx.dataStream.next(recordSize)
val rawRecord = ctx.inputStream.next(recordSize)

if (rawRecord.length != recordSize || ctx.dataStream.isEndOfStream) {
ctx.dataStream.close()
if (rawRecord.length != recordSize || ctx.inputStream.isEndOfStream) {
ctx.inputStream.close()
}

recordNumber += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class Test26CustomRecordExtractor extends AnyWordSpec with SparkTestBase with Bi

assert(actual == expected)
assert(CustomRecordExtractorMock.additionalInfo == "re info")
assert(CustomRecordExtractorMock.catchContext.headerStream != CustomRecordExtractorMock.catchContext.dataStream)
assert(CustomRecordExtractorMock.catchContext.headerStream != CustomRecordExtractorMock.catchContext.inputStream)
}
}

Expand All @@ -57,7 +57,7 @@ class Test26CustomRecordExtractor extends AnyWordSpec with SparkTestBase with Bi

assert(actual == expected)
assert(CustomRecordExtractorMock.additionalInfo == "re info")
assert(CustomRecordExtractorMock.catchContext.headerStream != CustomRecordExtractorMock.catchContext.dataStream)
assert(CustomRecordExtractorMock.catchContext.headerStream != CustomRecordExtractorMock.catchContext.inputStream)
}
}

Expand All @@ -71,7 +71,7 @@ class Test26CustomRecordExtractor extends AnyWordSpec with SparkTestBase with Bi

assert(actual == expected)
assert(CustomRecordExtractorMock.additionalInfo == "re info")
assert(CustomRecordExtractorMock.catchContext.headerStream != CustomRecordExtractorMock.catchContext.dataStream)
assert(CustomRecordExtractorMock.catchContext.headerStream != CustomRecordExtractorMock.catchContext.inputStream)
}
}

Expand Down

0 comments on commit f24b932

Please sign in to comment.