From 10d188a0e7ec5f94c96fa55bb7c87c6c2fc4cbf4 Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Fri, 9 Jul 2021 08:07:09 +0200
Subject: [PATCH] #397 Skip empty lines when reading record sequence text files

---
 .../extractors/raw/TextRecordExtractor.scala  | 103 ++++++++-----
 .../cobol/reader/index/IndexGenerator.scala   |   4 +-
 .../regression/Test13AsciiCrLfText.scala      | 138 ++++++++++++++++++
 3 files changed, 208 insertions(+), 37 deletions(-)
 create mode 100644 spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test13AsciiCrLfText.scala

diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextRecordExtractor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextRecordExtractor.scala
index 4ba5774e..b125608a 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextRecordExtractor.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextRecordExtractor.scala
@@ -26,78 +26,111 @@ import java.util
  */
 class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
   private val maxRecordSize = ctx.copybook.getRecordSize + 2
-  private val bytes = new Array[Byte](maxRecordSize)
-  private var bytesSize = 0
+  private val pendingBytes = new Array[Byte](maxRecordSize)
+  private var pendingBytesSize = 0
+  private var recordBytes: Option[Array[Byte]] = None
+  private var curRecordSize = 0
   private var lastFooterSize = 1

-  override def hasNext: Boolean = !ctx.inputStream.isEndOfStream || bytesSize > 0
+  override def hasNext: Boolean = {
+    if (recordBytes.isEmpty) {
+      ensureBytesRead(maxRecordSize)
+      fetchNextRecord()
+    }
+
+    recordBytes.get.length > 0
+  }

   override def next(): Array[Byte] = {
     if (!hasNext) {
       throw new NoSuchElementException
     }
-    ensureBytesRead(maxRecordSize)
-    findEol()
+    val bytesToReturn = recordBytes.get
+    curRecordSize = 0
+    recordBytes = None
+    bytesToReturn
   }

-  override def offset: Long = ctx.inputStream.offset - bytesSize
+  override def offset: Long = ctx.inputStream.offset - pendingBytesSize - curRecordSize
+
+  private def ensureBytesRead(numOfBytes: Int): Unit = {
+    val bytesToRead = numOfBytes - pendingBytesSize
+    if (bytesToRead > 0) {
+      val newBytes = ctx.inputStream.next(bytesToRead)
+      if (newBytes.length > 0) {
+        System.arraycopy(newBytes, 0, pendingBytes, pendingBytesSize, newBytes.length)
+        pendingBytesSize = pendingBytesSize + newBytes.length
+      }
+    }
+  }

-  private def findEol(): Array[Byte] = {
+  private def skipEmptyLines(): Unit = {
+    var i = 0
+    while (i < pendingBytesSize && (pendingBytes(i) == 0x0D || pendingBytes(i) == 0x0A)) {
+      i += 1
+    }
+    if (i > 0) {
+      advanceArray(i)
+      ensureBytesRead(maxRecordSize)
+    }
+  }
+
+  private def findNextNonEmptyRecord(): (Int, Int) = {
     var recordLength = 0
     var recordPayload = 0
-
     var i = 0
-    while (recordLength == 0 && i < bytesSize) {
-      if (bytes(i) == 0x0D) {
-        if (i + 1 < maxRecordSize && bytes(i + 1) == 0x0A) {
+
+    while (recordLength == 0 && i < pendingBytesSize) {
+      if (pendingBytes(i) == 0x0D) {
+        if (i + 1 < maxRecordSize && pendingBytes(i + 1) == 0x0A) {
           recordLength = i + 2
           recordPayload = i
         }
-      } else if (bytes(i) == 0x0A) {
+      } else if (pendingBytes(i) == 0x0A) {
         recordLength = i + 1
         recordPayload = i
       }
       i += 1
     }
+    (recordLength, recordPayload)
+  }

-    val record = if (recordLength > 0) {
-      bytes.take(recordPayload)
+  private def fetchNextRecord(): Unit = {
+    skipEmptyLines()
+
+    var (recordLength, recordPayload) = findNextNonEmptyRecord()
+
+    recordBytes = if (recordLength > 0) {
+      curRecordSize = recordLength
+      Some(pendingBytes.take(recordPayload))
     } else {
       // Last record or a record is too large?
       // In the latter case, return a truncated record.
       if (ctx.inputStream.isEndOfStream) {
         // Last record
-        recordLength = bytesSize
-        recordPayload = bytesSize
+        recordLength = pendingBytesSize
+        recordPayload = pendingBytesSize
       } else {
         // This is an error situation - no line breaks between records
         // Return a record worth of data minus line break.
-        recordLength = bytesSize - lastFooterSize
-        recordPayload = bytesSize - lastFooterSize
+        recordLength = pendingBytesSize - lastFooterSize
+        recordPayload = pendingBytesSize - lastFooterSize
       }
-      bytes.take(recordLength)
-    }
-
-    if (bytesSize > recordLength) {
-      System.arraycopy(bytes, recordLength, bytes, 0, bytesSize - recordLength)
+      curRecordSize = recordLength
+      Some(pendingBytes.take(recordLength))
     }
-    bytesSize -= recordLength

-    util.Arrays.fill(bytes, bytesSize, maxRecordSize, 0.toByte)
+    advanceArray(recordLength)

     lastFooterSize = recordLength - recordPayload
-
-    record
   }

-  private def ensureBytesRead(numOfBytes: Int): Unit = {
-    val bytesToRead = numOfBytes - bytesSize
-    if (bytesToRead > 0) {
-      val newBytes = ctx.inputStream.next(bytesToRead)
-      if (newBytes.length > 0) {
-        System.arraycopy(newBytes, 0, bytes, bytesSize, newBytes.length)
-        bytesSize = numOfBytes
-      }
+  private def advanceArray(recordLength: Int): Unit = {
+    if (pendingBytesSize > recordLength) {
+      System.arraycopy(pendingBytes, recordLength, pendingBytes, 0, pendingBytesSize - recordLength)
     }
+    pendingBytesSize -= recordLength
+
+    util.Arrays.fill(pendingBytes, pendingBytesSize, maxRecordSize, 0.toByte)
   }
 }
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala
index 0d446661..e843b05c 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala
@@ -80,10 +80,10 @@ object IndexGenerator {
         record = dataStream.next(recordMetadata.recordLength)
       }
       val recordSize = dataStream.offset - byteIndex
-      val hasMoreRecords = recordSize > 0
+      val hasMoreRecords = recordSize > 0 && !dataStream.isEndOfStream
       (recordSize, recordMetadata.isValid, hasMoreRecords)
     }
-    if (dataStream.isEndOfStream || !hasMoreRecords) {
+    if (!hasMoreRecords) {
       endOfFileReached = true
     } else {
       if (isValid) {
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test13AsciiCrLfText.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test13AsciiCrLfText.scala
new file mode 100644
index 00000000..e8a87f86
--- /dev/null
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test13AsciiCrLfText.scala
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.source.regression
+
+import org.scalatest.WordSpec
+import org.slf4j.{Logger, LoggerFactory}
+import za.co.absa.cobrix.spark.cobol.source.base.{SimpleComparisonBase, SparkTestBase}
+import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
+
+class Test13AsciiCrLfText extends WordSpec with SparkTestBase with BinaryFileFixture with SimpleComparisonBase {
+
+  private implicit val logger: Logger = LoggerFactory.getLogger(this.getClass)
+
+  private val copybook =
+    """         01  ENTITY.
+           05  A    PIC X(2).
+    """
+
+  val binFileContents: Array[Byte] = Array[Byte](
+    // 0
+    0x66.toByte, 0x64.toByte, 0x0D.toByte, 0x0A.toByte,
+    // 1
+    0x68.toByte, 0x64.toByte, 0x0D.toByte, 0x0A.toByte,
+    // 2 - empty line
+    0x0D.toByte, 0x0A.toByte,
+    // 3
+    0x73.toByte, 0x64.toByte, 0x0D.toByte, 0x0A.toByte,
+    // 4 - empty line
+    0x0D.toByte, 0x0A.toByte
+  )
+
+  val emptyFileContents: Array[Byte] = Array[Byte](
+    // 0 - empty line
+    0x0D.toByte, 0x0A.toByte,
+    // 1 - empty line
+    0x0D.toByte, 0x0A.toByte
+  )
+
+  "Test ASCII CRLF text file" should {
+    "correctly identify empty lines when read as a text file" in {
+      withTempBinFile("crlf", ".dat", binFileContents) { tmpFileName =>
+        val df = spark
+          .read
+          .format("cobol")
+          .option("copybook_contents", copybook)
+          .option("pedantic", "true")
+          .option("is_text", "true")
+          .option("encoding", "ascii")
+          .option("schema_retention_policy", "collapse_root")
+          .load(tmpFileName)
+
+        val expected = """[{"A":"fd"},{"A":"hd"},{"A":"sd"}]"""
+
+        val count = df.count()
+        val actual = df.toJSON.collect().mkString("[", ",", "]")
+
+        assert(count == 3)
+        assertEqualsMultiline(actual, expected)
+      }
+    }
+
+    "correctly identify empty lines when read as a record sequence" in {
+      withTempBinFile("crlf", ".dat", binFileContents) { tmpFileName =>
+        val df = spark
+          .read
+          .format("cobol")
+          .option("copybook_contents", copybook)
+          .option("pedantic", "true")
+          .option("is_record_sequence", "true")
+          .option("is_text", "true")
+          .option("encoding", "ascii")
+          .option("schema_retention_policy", "collapse_root")
+          .load(tmpFileName)
+
+        val expected = """[{"A":"fd"},{"A":"hd"},{"A":"sd"}]"""
+
+        val count = df.count()
+        val actual = df.toJSON.collect().mkString("[", ",", "]")
+
+        assert(count == 3)
+        assertEqualsMultiline(actual, expected)
+      }
+    }
+  }
+
+  "Test empty ASCII CRLF text file" should {
+    "correctly identify empty lines when read as a text file" in {
+      withTempBinFile("crlf_empty", ".dat", emptyFileContents) { tmpFileName =>
+        val df = spark
+          .read
+          .format("cobol")
+          .option("copybook_contents", copybook)
+          .option("pedantic", "true")
+          .option("is_text", "true")
+          .option("encoding", "ascii")
+          .option("schema_retention_policy", "collapse_root")
+          .load(tmpFileName)
+
+        val count = df.count()
+
+        assert(count == 0)
+      }
+    }
+
+    "correctly identify empty lines when read as a record sequence" in {
+      withTempBinFile("crlf_empty", ".dat", emptyFileContents) { tmpFileName =>
+        val df = spark
+          .read
+          .format("cobol")
+          .option("copybook_contents", copybook)
+          .option("pedantic", "true")
+          .option("is_record_sequence", "true")
+          .option("is_text", "true")
+          .option("encoding", "ascii")
+          .option("schema_retention_policy", "collapse_root")
+          .load(tmpFileName)
+
+        val count = df.count()
+
+        assert(count == 0)
+      }
+    }
+  }
+}
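
Note (not part of the patch): a minimal usage sketch of the behaviour exercised by the new regression test. It assumes an existing SparkSession `spark`, a copybook string `copybook`, and a placeholder input path; the option names are the ones used in Test13AsciiCrLfText. With this change, blank CR/LF lines in an ASCII text file are skipped instead of being returned as empty records.

    // Hypothetical example: reading an ASCII CRLF text file as a record sequence
    // with spark-cobol; empty lines are now skipped by TextRecordExtractor.
    val df = spark
      .read
      .format("cobol")
      .option("copybook_contents", copybook)              // copybook assumed to be defined
      .option("is_record_sequence", "true")               // record-sequence reading, the path changed here
      .option("is_text", "true")                          // ASCII text records delimited by CR/LF
      .option("encoding", "ascii")
      .option("schema_retention_policy", "collapse_root")
      .load("/path/to/ascii_crlf_file.dat")               // placeholder path

    df.show()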