Commit 10d188a

#397 Skip empty lines when reading record sequence text files

yruslan committed Jul 13, 2021
1 parent 050be67 commit 10d188a

Showing 3 changed files with 208 additions and 37 deletions.
@@ -26,78 +26,111 @@ import java.util
  */
 class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
   private val maxRecordSize = ctx.copybook.getRecordSize + 2
-  private val bytes = new Array[Byte](maxRecordSize)
-  private var bytesSize = 0
+  private val pendingBytes = new Array[Byte](maxRecordSize)
+  private var pendingBytesSize = 0
+  private var recordBytes: Option[Array[Byte]] = None
+  private var curRecordSize = 0
   private var lastFooterSize = 1
 
-  override def hasNext: Boolean = !ctx.inputStream.isEndOfStream || bytesSize > 0
+  override def hasNext: Boolean = {
+    if (recordBytes.isEmpty) {
+      ensureBytesRead(maxRecordSize)
+      fetchNextRecord()
+    }
+
+    recordBytes.get.length > 0
+  }
 
   override def next(): Array[Byte] = {
     if (!hasNext) {
       throw new NoSuchElementException
     }
-    ensureBytesRead(maxRecordSize)
-    findEol()
+    val bytesToReturn = recordBytes.get
+    curRecordSize = 0
+    recordBytes = None
+    bytesToReturn
   }
 
-  override def offset: Long = ctx.inputStream.offset - bytesSize
+  override def offset: Long = ctx.inputStream.offset - pendingBytesSize - curRecordSize
 
+  private def ensureBytesRead(numOfBytes: Int): Unit = {
+    val bytesToRead = numOfBytes - pendingBytesSize
+    if (bytesToRead > 0) {
+      val newBytes = ctx.inputStream.next(bytesToRead)
+      if (newBytes.length > 0) {
+        System.arraycopy(newBytes, 0, pendingBytes, pendingBytesSize, newBytes.length)
+        pendingBytesSize = pendingBytesSize + newBytes.length
+      }
+    }
+  }
+
+  private def skipEmptyLines(): Unit = {
+    var i = 0
+    while (i < pendingBytesSize && (pendingBytes(i) == 0x0D || pendingBytes(i) == 0x0A)) {
+      i += 1
+    }
+    if (i > 0) {
+      advanceArray(i)
+      ensureBytesRead(maxRecordSize)
+    }
+  }
+
-  private def findEol(): Array[Byte] = {
+  private def findNextNonEmptyRecord(): (Int, Int) = {
     var recordLength = 0
     var recordPayload = 0
 
     var i = 0
-    while (recordLength == 0 && i < bytesSize) {
-      if (bytes(i) == 0x0D) {
-        if (i + 1 < maxRecordSize && bytes(i + 1) == 0x0A) {
+    while (recordLength == 0 && i < pendingBytesSize) {
+      if (pendingBytes(i) == 0x0D) {
+        if (i + 1 < maxRecordSize && pendingBytes(i + 1) == 0x0A) {
           recordLength = i + 2
           recordPayload = i
         }
-      } else if (bytes(i) == 0x0A) {
+      } else if (pendingBytes(i) == 0x0A) {
         recordLength = i + 1
         recordPayload = i
       }
       i += 1
     }
+    (recordLength, recordPayload)
+  }
+
+  private def fetchNextRecord(): Unit = {
+    skipEmptyLines()
 
-    val record = if (recordLength > 0) {
-      bytes.take(recordPayload)
+    var (recordLength, recordPayload) = findNextNonEmptyRecord()
+
+    recordBytes = if (recordLength > 0) {
+      curRecordSize = recordLength
+      Some(pendingBytes.take(recordPayload))
     } else {
       // Last record or a record is too large?
       // In the latter case
       if (ctx.inputStream.isEndOfStream) {
         // Last record
-        recordLength = bytesSize
-        recordPayload = bytesSize
+        recordLength = pendingBytesSize
+        recordPayload = pendingBytesSize
       } else {
         // This is an error situation - no line breaks between records.
         // Return a record worth of data minus line break.
-        recordLength = bytesSize - lastFooterSize
-        recordPayload = bytesSize - lastFooterSize
+        recordLength = pendingBytesSize - lastFooterSize
+        recordPayload = pendingBytesSize - lastFooterSize
       }
-      bytes.take(recordLength)
+      curRecordSize = recordLength
+      Some(pendingBytes.take(recordLength))
     }
 
-    if (bytesSize > recordLength) {
-      System.arraycopy(bytes, recordLength, bytes, 0, bytesSize - recordLength)
-    }
-    bytesSize -= recordLength
-
-    util.Arrays.fill(bytes, bytesSize, maxRecordSize, 0.toByte)
+    advanceArray(recordLength)
 
     lastFooterSize = recordLength - recordPayload
-
-    record
   }
 
-  private def ensureBytesRead(numOfBytes: Int): Unit = {
-    val bytesToRead = numOfBytes - bytesSize
-    if (bytesToRead > 0) {
-      val newBytes = ctx.inputStream.next(bytesToRead)
-      if (newBytes.length > 0) {
-        System.arraycopy(newBytes, 0, bytes, bytesSize, newBytes.length)
-        bytesSize = numOfBytes
-      }
-    }
-  }
+  private def advanceArray(recordLength: Int): Unit = {
+    if (pendingBytesSize > recordLength) {
+      System.arraycopy(pendingBytes, recordLength, pendingBytes, 0, pendingBytesSize - recordLength)
+    }
+    pendingBytesSize -= recordLength
+
+    util.Arrays.fill(pendingBytes, pendingBytesSize, maxRecordSize, 0.toByte)
+  }
 }
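
The core of the change: instead of returning whatever findEol() found, the extractor now skips leading CR/LF bytes (skipEmptyLines) before locating the next record, so line breaks that carry no payload no longer surface as empty records. A minimal, self-contained sketch of that splitting behavior (illustrative names only, not code from this commit):

object SkipEmptyLinesSketch {
  // Split a buffer into records, treating LF and CRLF as separators and
  // dropping separators that are not preceded by payload bytes.
  def splitRecords(data: Array[Byte]): Seq[Array[Byte]] = {
    val records = scala.collection.mutable.ArrayBuffer.empty[Array[Byte]]
    var start = 0
    var i = 0
    while (i < data.length) {
      if (data(i) == 0x0D || data(i) == 0x0A) {
        if (i > start) {
          records += data.slice(start, i)  // non-empty payload before the break
        }
        if (data(i) == 0x0D && i + 1 < data.length && data(i + 1) == 0x0A) {
          i += 1                           // consume the LF of a CRLF pair
        }
        start = i + 1
      }
      i += 1
    }
    if (start < data.length) {
      records += data.slice(start, data.length)  // trailing record without a line break
    }
    records.toSeq
  }

  def main(args: Array[String]): Unit = {
    val bytes = "fd\r\nhd\r\n\r\nsd\r\n\r\n".getBytes("US-ASCII")
    val records = splitRecords(bytes).map(new String(_, "US-ASCII"))
    println(records.mkString(", "))  // prints: fd, hd, sd
  }
}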
@@ -80,10 +80,10 @@ object IndexGenerator {
       record = dataStream.next(recordMetadata.recordLength)
     }
     val recordSize = dataStream.offset - byteIndex
-    val hasMoreRecords = recordSize > 0
+    val hasMoreRecords = recordSize > 0 && !dataStream.isEndOfStream
     (recordSize, recordMetadata.isValid, hasMoreRecords)
   }
-  if (dataStream.isEndOfStream || !hasMoreRecords) {
+  if (!hasMoreRecords) {
     endOfFileReached = true
   } else {
     if (isValid) {
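
A note on this hunk: with empty lines now skipped by the extractor, a positive recordSize alone no longer guarantees that another record follows, because the remaining bytes may be nothing but line breaks. Folding the end-of-stream check into hasMoreRecords, rather than testing dataStream.isEndOfStream after the fact, appears intended to stop the sparse index from emitting an entry past the last real record.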
@@ -0,0 +1,138 @@
/*
 * Copyright 2018 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package za.co.absa.cobrix.spark.cobol.source.regression

import org.scalatest.WordSpec
import org.slf4j.{Logger, LoggerFactory}
import za.co.absa.cobrix.spark.cobol.source.base.{SimpleComparisonBase, SparkTestBase}
import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture

class Test13AsciiCrLfText extends WordSpec with SparkTestBase with BinaryFileFixture with SimpleComparisonBase {

  private implicit val logger: Logger = LoggerFactory.getLogger(this.getClass)

  private val copybook =
    """      01  ENTITY.
             05  A  PIC X(2).
    """

  // Each record is two ASCII characters followed by a CRLF line break.
  val binFileContents: Array[Byte] = Array[Byte](
    // 0 - "fd"
    0x66.toByte, 0x64.toByte, 0x0D.toByte, 0x0A.toByte,
    // 1 - "hd"
    0x68.toByte, 0x64.toByte, 0x0D.toByte, 0x0A.toByte,
    // 2 - empty line
    0x0D.toByte, 0x0A.toByte,
    // 3 - "sd"
    0x73.toByte, 0x64.toByte, 0x0D.toByte, 0x0A.toByte,
    // 4 - empty line
    0x0D.toByte, 0x0A.toByte
  )

  val emptyFileContents: Array[Byte] = Array[Byte](
    // 0 - empty line
    0x0D.toByte, 0x0A.toByte,
    // 1 - empty line
    0x0D.toByte, 0x0A.toByte
  )

  "Test ASCII CRLF text file" should {
    "correctly identify empty lines when read as a text file" in {
      withTempBinFile("crlf", ".dat", binFileContents) { tmpFileName =>
        val df = spark
          .read
          .format("cobol")
          .option("copybook_contents", copybook)
          .option("pedantic", "true")
          .option("is_text", "true")
          .option("encoding", "ascii")
          .option("schema_retention_policy", "collapse_root")
          .load(tmpFileName)

        val expected = """[{"A":"fd"},{"A":"hd"},{"A":"sd"}]"""

        val count = df.count()
        val actual = df.toJSON.collect().mkString("[", ",", "]")

        assert(count == 3)
        assertEqualsMultiline(actual, expected)
      }
    }

    "correctly identify empty lines when read as a record sequence" in {
      withTempBinFile("crlf", ".dat", binFileContents) { tmpFileName =>
        val df = spark
          .read
          .format("cobol")
          .option("copybook_contents", copybook)
          .option("pedantic", "true")
          .option("is_record_sequence", "true")
          .option("is_text", "true")
          .option("encoding", "ascii")
          .option("schema_retention_policy", "collapse_root")
          .load(tmpFileName)

        val expected = """[{"A":"fd"},{"A":"hd"},{"A":"sd"}]"""

        val count = df.count()
        val actual = df.toJSON.collect().mkString("[", ",", "]")

        assert(count == 3)
        assertEqualsMultiline(actual, expected)
      }
    }
  }

  "Test empty ASCII CRLF text file" should {
    "correctly identify empty lines when read as a text file" in {
      withTempBinFile("crlf_empty", ".dat", emptyFileContents) { tmpFileName =>
        val df = spark
          .read
          .format("cobol")
          .option("copybook_contents", copybook)
          .option("pedantic", "true")
          .option("is_text", "true")
          .option("encoding", "ascii")
          .option("schema_retention_policy", "collapse_root")
          .load(tmpFileName)

        val count = df.count()

        assert(count == 0)
      }
    }

    "correctly identify empty lines when read as a record sequence" in {
      withTempBinFile("crlf_empty", ".dat", emptyFileContents) { tmpFileName =>
        val df = spark
          .read
          .format("cobol")
          .option("copybook_contents", copybook)
          .option("pedantic", "true")
          .option("is_record_sequence", "true")
          .option("is_text", "true")
          .option("encoding", "ascii")
          .option("schema_retention_policy", "collapse_root")
          .load(tmpFileName)

        val count = df.count()

        assert(count == 0)
      }
    }
  }
}
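
For context, the reader options these tests exercise can be used directly; a hypothetical standalone usage (the file path and SparkSession setup are illustrative, not part of this commit):

import org.apache.spark.sql.SparkSession

object ReadCrlfTextExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("crlf-example").getOrCreate()

    val copybook =
      """      01  ENTITY.
               05  A  PIC X(2).
      """

    val df = spark.read
      .format("cobol")
      .option("copybook_contents", copybook)
      .option("is_record_sequence", "true")  // variable-length text records
      .option("is_text", "true")             // records are delimited by line breaks
      .option("encoding", "ascii")
      .option("schema_retention_policy", "collapse_root")
      .load("/path/to/crlf_text.dat")        // illustrative path

    df.show()  // with this fix, empty lines produce no rows
  }
}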
