Skip to content

Commit

Permalink
#397 Optimize the text record extractor for performance.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Jul 12, 2021
1 parent 8602bfb commit 11ebb9f
Showing 1 changed file with 52 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,50 @@ import java.util
*
* Record extractors are used in situations where the size of records in a file is not fixed and can be
* determined neither from the copybook nor from record headers.
*
* Empty lines (ones that contain only LF / CRLF) are skipped.
*
* The implementation is optimized for performance, so it might not be obviously readable.
* Hopefully, the comments will help anyone reading this.
*/
class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
// Maximum possible record size is the size of the copybook record + maximum size of the delimiter (2 characters for CRLF).
private val maxRecordSize = ctx.copybook.getRecordSize + 2

// This is the buffer to keep the part of the stream that will be split by records.
// The size of the array is always the maximum record size. The number of bytes that contain useful payload is specified
// in pendingBytesSize.
private val pendingBytes = new Array[Byte](maxRecordSize)
private var pendingBytesSize = 0
private var recordBytes: Option[Array[Byte]] = None

// If true, curRecordSize and curPayloadSize point to a record, otherwise the next record needs to be found
private var isRawRecordFound = false
// The number of bytes from pendingBytes that correspond to a record, including line break character(s)
private var curRecordSize = 0
private var lastFooterSize = 1
// The number of bytes from pendingBytes that correspond to a record, without line break character(s)
private var curPayloadSize = 0
// The number of bytes the line breaking character has taken for the last record. Can only be 1 (LF) or 2 (CR LF).
private var lastLineBreakSize = 1

override def hasNext: Boolean = {
if (recordBytes.isEmpty) {
if (!isRawRecordFound) {
ensureBytesRead(maxRecordSize)
fetchNextRecord()
findNextRecord()
}

recordBytes.get.length > 0
curRecordSize > 0
}

override def next(): Array[Byte] = {
if (!hasNext) {
throw new NoSuchElementException
}
val bytesToReturn = recordBytes.get
curRecordSize = 0
recordBytes = None
bytesToReturn
fetchNextRecord()
}

override def offset: Long = ctx.inputStream.offset - pendingBytesSize - curRecordSize

// This method ensures that pendingBytes contains the specified number of bytes read from the input stream
private def ensureBytesRead(numOfBytes: Int): Unit = {
val bytesToRead = numOfBytes - pendingBytesSize
if (bytesToRead > 0) {
Expand All @@ -64,6 +78,7 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe
}
}

// This method skips empty lines by ignoring lines that begin with CR / LF
private def skipEmptyLines(): Unit = {
var i = 0
while (i < pendingBytesSize && (pendingBytes(i) == 0x0D || pendingBytes(i) == 0x0A)) {
Expand All @@ -75,6 +90,9 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe
}
}

// This method finds the location of the end of the next record by searching for line ending characters
// The data in pendingBytes is expected to be the length of maxRecordSize, or can be smaller for the last
// record in the file
private def findNextNonEmptyRecord(): (Int, Int) = {
var recordLength = 0
var recordPayload = 0
Expand All @@ -95,36 +113,49 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe
(recordLength, recordPayload)
}

private def fetchNextRecord(): Unit = {
// This method finds the location of the end of the next record and adjusts curRecordSize and curPayloadSize
// so that the next record can be fetched. Skips empty lines.
private def findNextRecord(): Unit = {
skipEmptyLines()

var (recordLength, recordPayload) = findNextNonEmptyRecord()
val (recordLength, recordPayload) = findNextNonEmptyRecord()

recordBytes = if (recordLength > 0) {
if (recordLength > 0) {
curRecordSize = recordLength
Some(pendingBytes.take(recordPayload))
curPayloadSize = recordPayload
} else {
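// No line break was found: this is either the last record or a record that is too large.
// In the latter case, a record worth of data minus the line break size is returned.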
if (ctx.inputStream.isEndOfStream) {
// Last record
recordLength = pendingBytesSize
recordPayload = pendingBytesSize
curRecordSize = pendingBytesSize
curPayloadSize = pendingBytesSize
} else {
// This is an error situation - no line breaks between records.
// Return a record worth of data minus line break.
recordLength = pendingBytesSize - lastFooterSize
recordPayload = pendingBytesSize - lastFooterSize
curRecordSize = pendingBytesSize - lastLineBreakSize
curPayloadSize = pendingBytesSize - lastLineBreakSize
}
curRecordSize = recordLength
Some(pendingBytes.take(recordLength))
}

advanceArray(recordLength)
isRawRecordFound = true

lastLineBreakSize = recordLength - recordPayload
}

lastFooterSize = recordLength - recordPayload
// This method extracts the current record from the buffer array.
// It should only be called when curRecordSize and curPayloadSize are set properly.
private def fetchNextRecord(): Array[Byte] = {
// Copy only the payload part of the record, leaving out the trailing line break character(s).
val bytes = pendingBytes.take(curPayloadSize)
// Drop the whole record, line break included, from the front of the buffer.
advanceArray(curRecordSize)
// Reset the state so that hasNext searches for the next record again.
isRawRecordFound = false
curPayloadSize = 0
curRecordSize = 0
bytes
}

// This method shifts the internal buffer pendingBytes to the left by the size of the record.
// It also fills the rest of the array with 0x0 character.
private def advanceArray(recordLength: Int): Unit = {
if (pendingBytesSize > recordLength) {
System.arraycopy(pendingBytes, recordLength, pendingBytes, 0, pendingBytesSize - recordLength)
Expand Down

0 comments on commit 11ebb9f

Please sign in to comment.