From 11ebb9f4f2eaa02517ea14409bd00de3b2ab5671 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 12 Jul 2021 08:47:31 +0200 Subject: [PATCH] #397 Optimize the text record extractor for performance. --- .../extractors/raw/TextRecordExtractor.scala | 73 +++++++++++++------ 1 file changed, 52 insertions(+), 21 deletions(-) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextRecordExtractor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextRecordExtractor.scala index b125608a..51f080e1 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextRecordExtractor.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextRecordExtractor.scala @@ -23,36 +23,50 @@ import java.util * * Record extractors are used for in situations where the size of records in a file is not fixed and cannot be * determined neither from the copybook nor from record headers. + * + * Empty lines (ones that contain only LF / CRLF) are skipped. + * + * The implementation is optimized for performance, so it might not be obviously readable. + * Hopefully, comments will help anyone reading this. */ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor { + // Maximum possible record size is the size of the copybook record + maximum size of the delimiter (2 characters for CRLF). private val maxRecordSize = ctx.copybook.getRecordSize + 2 + + // This is the buffer to keep the part of the stream that will be split by records. + // The size of the array is always the maximum record size. The number of bytes that contain useful payload is specified + // in pendingBytesSize. 
private val pendingBytes = new Array[Byte](maxRecordSize) private var pendingBytesSize = 0 - private var recordBytes: Option[Array[Byte]] = None + + // If true, curRecordSize and curPayloadSize point to a record, otherwise the next record needs to be found + private var isRawRecordFound = false + // The number of bytes from pendingBytes that correspond to a record, including line break character(s) private var curRecordSize = 0 - private var lastFooterSize = 1 + // The number of bytes from pendingBytes that correspond to a record, without line break character(s) + private var curPayloadSize = 0 + // The number of bytes the line breaking character has taken for the last record. Can only be 1 (LF) or 2 (CR LF). + private var lastLineBreakSize = 1 override def hasNext: Boolean = { - if (recordBytes.isEmpty) { + if (!isRawRecordFound) { ensureBytesRead(maxRecordSize) - fetchNextRecord() + findNextRecord() } - recordBytes.get.length > 0 + curRecordSize > 0 } override def next(): Array[Byte] = { if (!hasNext) { throw new NoSuchElementException } - val bytesToReturn = recordBytes.get - curRecordSize = 0 - recordBytes = None - bytesToReturn + fetchNextRecord() } override def offset: Long = ctx.inputStream.offset - pendingBytesSize - curRecordSize + // This method ensures that pendingBytes contains the specified number of bytes read from the input stream private def ensureBytesRead(numOfBytes: Int): Unit = { val bytesToRead = numOfBytes - pendingBytesSize if (bytesToRead > 0) { @@ -64,6 +78,7 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe } } + // This method skips empty lines, by ignoring lines that begin from CR / LF private def skipEmptyLines(): Unit = { var i = 0 while (i < pendingBytesSize && (pendingBytes(i) == 0x0D || pendingBytes(i) == 0x0A)) { @@ -75,6 +90,9 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe } } + // This method finds the location of the end of the next record by searching for 
line ending characters + // The data in pendingBytes is expected to be the length of maxRecordSize, or can be smaller for the last + // record in the file private def findNextNonEmptyRecord(): (Int, Int) = { var recordLength = 0 var recordPayload = 0 @@ -95,36 +113,49 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe (recordLength, recordPayload) } - private def fetchNextRecord(): Unit = { + // This method finds the location of the end of the next record and adjusts curRecordSize and curPayloadSize + // so that the next record can be fetched. Skips empty lines. + private def findNextRecord(): Unit = { skipEmptyLines() - var (recordLength, recordPayload) = findNextNonEmptyRecord() + val (recordLength, recordPayload) = findNextNonEmptyRecord() - recordBytes = if (recordLength > 0) { + if (recordLength > 0) { curRecordSize = recordLength - Some(pendingBytes.take(recordPayload)) + curPayloadSize = recordPayload } else { // Last record or a record is too large? // In the latter case if (ctx.inputStream.isEndOfStream) { // Last record - recordLength = pendingBytesSize - recordPayload = pendingBytesSize + curRecordSize = pendingBytesSize + curPayloadSize = pendingBytesSize } else { // This is an errors situation - no line breaks between records // Return a record worth of data minus line break. - recordLength = pendingBytesSize - lastFooterSize - recordPayload = pendingBytesSize - lastFooterSize + curRecordSize = pendingBytesSize - lastLineBreakSize + curPayloadSize = pendingBytesSize - lastLineBreakSize } - curRecordSize = recordLength - Some(pendingBytes.take(recordLength)) } - advanceArray(recordLength) + isRawRecordFound = true + + lastLineBreakSize = recordLength - recordPayload + } - lastFooterSize = recordLength - recordPayload + // This method extracts the current record from the buffer array. + // It should only be called when curRecordSize and curPayloadSize are set properly. 
+ private def fetchNextRecord(): Array[Byte] = { + val bytes = pendingBytes.take(curPayloadSize) + advanceArray(curRecordSize) + isRawRecordFound = false + curPayloadSize = 0 + curRecordSize = 0 + bytes } + // This method shifts the internal buffer pendingBytes to the left by the size of the record. + // It also fills the rest of the array with 0x0 character. private def advanceArray(recordLength: Int): Unit = { if (pendingBytesSize > recordLength) { System.arraycopy(pendingBytes, recordLength, pendingBytes, 0, pendingBytesSize - recordLength)