
Commit

#484 Initial implementation of text record extractor with partial records disallowed.
yruslan committed Mar 29, 2022
1 parent df9d579 commit f8f7c8b
Showing 10 changed files with 366 additions and 19 deletions.
@@ -25,7 +25,7 @@ import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC}
import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordHeaderParserFactory}
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{FixedBlock, VariableBlock}
import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser}
import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedBlockParameters, FixedBlockRawRecordExtractor, RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextRecordExtractor, VarOccursRecordExtractor, VariableBlockVariableRecordExtractor}
import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedBlockParameters, FixedBlockRawRecordExtractor, RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextFullRecordExtractor, TextRecordExtractor, VarOccursRecordExtractor, VariableBlockVariableRecordExtractor}
import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler
import za.co.absa.cobrix.cobol.reader.index.IndexGenerator
import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
@@ -75,8 +75,10 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
readerProperties.recordExtractor match {
case Some(recordExtractorClass) =>
Some(RawRecordExtractorFactory.createRecordHeaderParser(recordExtractorClass, reParams))
case None if readerProperties.isText =>
case None if readerProperties.isText && readerProperties.allowPartialRecords =>
Some(new TextRecordExtractor(reParams))
case None if readerProperties.isText =>
Some(new TextFullRecordExtractor(reParams))
case None if readerProperties.recordFormat == FixedBlock =>
val fbParams = FixedBlockParameters(readerProperties.recordLength, bdwOpt.get.blockLength, bdwOpt.get.recordsPerBlock)
FixedBlockParameters.validate(fbParams)
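
The dispatch above keeps the pre-existing TextRecordExtractor when partial records are allowed and routes the new default through the TextFullRecordExtractor added below. A hedged sketch of the observable difference, with invented data; the partial-records behavior is inferred from the extractor names and docs, not shown in this diff:

// Illustration only (not part of this commit). Assume recordSize = 3,
// hence maxRecordSize = 8, and the input text "AAAAAAAAAA\nBBB\n".
//
// TextFullRecordExtractor (partial records disallowed):
//   no line break falls within the 8-byte look-ahead buffer, so recordSize
//   bytes ("AAA") are returned and the rest of the malformed line is skipped;
//   the next record starts cleanly at "BBB".
//
// TextRecordExtractor (partial records allowed):
//   the over-long line is consumed in record-sized chunks, so the remainder
//   of the "A" run is parsed as additional (partial) records.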
@@ -0,0 +1,194 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.reader.extractors.raw

import java.util

/**
* This is an implementation of a record extractor for ASCII text files.
*
* Record extractors are used in situations where the size of records in a file is not fixed and cannot be
* determined either from the copybook or from record headers.
*
* Empty lines (ones that contain only LF / CRLF) are skipped.
*
* The implementation is optimized for performance, so it might not be obviously readable.
* Hopefully, the comments will help anyone reading it.
*/
class TextFullRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
private val recordSize = ctx.copybook.getRecordSize

// The maximum possible record size is twice the copybook record size plus the maximum size of the delimiter (2 characters for CRLF).
private val maxRecordSize = recordSize * 2 + 2
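// For example, a copybook record size of 100 gives a 202-byte buffer: enough
// look-ahead to hold a line of up to twice the copybook size plus a CRLF delimiter.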

// This is the buffer to keep the part of the stream that will be split by records.
// The size of the array is always the maximum record size. The number of bytes that contain useful payload is specified
// in pendingBytesSize.
private val pendingBytes = new Array[Byte](maxRecordSize)
private var pendingBytesSize = 0

// If true, curRecordSize and curPayloadSize point to a record, otherwise the next record needs to be found
private var isRawRecordFound = false
// The number of bytes from pendingBytes that correspond to a record, including line break character(s)
private var curRecordSize = 0
// The number of bytes from pendingBytes that correspond to a record, without line break character(s)
private var curPayloadSize = 0

override def hasNext: Boolean = {
if (!isRawRecordFound) {
ensureBytesRead(maxRecordSize)
findNextRecord()
}

curRecordSize > 0
}

override def next(): Array[Byte] = {
if (!hasNext) {
throw new NoSuchElementException
}
fetchNextRecord()
}

override def offset: Long = ctx.inputStream.offset - pendingBytesSize

// This method ensures that pendingBytes contains the specified number of bytes read from the input stream
private def ensureBytesRead(numOfBytes: Int): Unit = {
val bytesToRead = numOfBytes - pendingBytesSize
if (bytesToRead > 0) {
val newBytes = ctx.inputStream.next(bytesToRead)
if (newBytes.length > 0) {
System.arraycopy(newBytes, 0, pendingBytes, pendingBytesSize, newBytes.length)
pendingBytesSize = pendingBytesSize + newBytes.length
}
}
}

// This method skips empty lines by consuming CR / LF characters at the beginning of the buffer
private def skipEmptyLines(): Unit = {
var i = 0
while (i < pendingBytesSize && (pendingBytes(i) == 0x0D || pendingBytes(i) == 0x0A)) {
i += 1
}
if (i > 0) {
advanceArray(i)
ensureBytesRead(maxRecordSize)
}
}

// This method finds the location of the end of the next record by searching for line ending characters.
// The data in pendingBytes is expected to be maxRecordSize bytes long, but can be smaller for the last
// record in the file.
private def findNextNonEmptyRecord(): (Int, Int) = {
var recordLength = 0
var recordPayload = 0
var i = 0

while (recordLength == 0 && i < pendingBytesSize) {
if (pendingBytes(i) == 0x0D) {
if (i + 1 < maxRecordSize && pendingBytes(i + 1) == 0x0A) {
recordLength = i + 2
recordPayload = i
}
} else if (pendingBytes(i) == 0x0A) {
recordLength = i + 1
recordPayload = i
}
i += 1
}
(recordLength, recordPayload)
}

private def findNextLineBreak(recordLength: Int): Unit = {
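// Used only in the error path (no line break was found within the look-ahead buffer).
// Discards bytes beyond the first recordLength bytes until a line break is found,
// so that the next record starts at a real line boundary.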
var found = false
var endOfStream = false

while (!found && !endOfStream) {
val start = recordLength
val size = pendingBytesSize - recordLength
var i = 0

while (!found && i < size) {
if (pendingBytes(start + i) == 0x0D || pendingBytes(start + i) == 0x0A) {
found = true
} else {
i += 1
}
}
if (i > 0) {
System.arraycopy(pendingBytes, recordLength + i, pendingBytes, recordLength, size - i)
pendingBytesSize -= i
}
endOfStream = ctx.inputStream.isEndOfStream
if (!found && !endOfStream) {
ensureBytesRead(maxRecordSize)
}
}
}

// This method finds the location of the end of the next record and adjusts curRecordSize and curPayloadSize
// so that the next record can be fetched. Skips empty lines.
private def findNextRecord(): Unit = {
skipEmptyLines()

val (recordLength, recordPayload) = findNextNonEmptyRecord()

if (recordLength > 0) {
curRecordSize = recordLength
curPayloadSize = recordPayload
} else {
// Is this the last record, or is the record too large (no line break within the look-ahead buffer)?
if (pendingBytesSize <= recordSize && ctx.inputStream.isEndOfStream) {
// Last record
curRecordSize = pendingBytesSize
curPayloadSize = pendingBytesSize
} else {
// This is an error situation - no line break between records.
// Return a record's worth of data.
curRecordSize = recordSize
curPayloadSize = recordSize

findNextLineBreak(recordSize)
}
}

isRawRecordFound = true
}
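
// Worked example for findNextRecord() (recordSize = 3, maxRecordSize = 8, data invented):
//   buffer "AB\nXYZ\n." -> delimiter found: curRecordSize = 3, curPayloadSize = 2
//   buffer "AB" at EOF  -> last record: curRecordSize = curPayloadSize = 2
//   buffer "ABCDEFGH"   -> no delimiter in the buffer: curRecordSize = recordSize, and the
//                          rest of the line is skipped via findNextLineBreak()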

// This method extracts the current record from the buffer array.
// It should only be called when curRecordSize and curPayloadSize are set properly.
private def fetchNextRecord(): Array[Byte] = {
val bytes = pendingBytes.take(curPayloadSize)
advanceArray(curRecordSize)
isRawRecordFound = false
curPayloadSize = 0
curRecordSize = 0
bytes
}

// This method shifts the internal buffer pendingBytes to the left by the size of the record.
// It also fills the vacated tail of the array with 0x0 bytes.
private def advanceArray(recordLength: Int): Unit = {
if (pendingBytesSize > recordLength) {
System.arraycopy(pendingBytes, recordLength, pendingBytes, 0, pendingBytesSize - recordLength)
}
pendingBytesSize -= recordLength

util.Arrays.fill(pendingBytes, pendingBytesSize, maxRecordSize, 0.toByte)
}
}
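
A hedged usage sketch of the new extractor in isolation, wired the same way as the SparseIndexSpecSpec test below; the copybook and input data are invented for illustration:

import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.cobol.parser.encoding.ASCII
import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, TextFullRecordExtractor}
import za.co.absa.cobrix.cobol.reader.memorystream.TestStringStream

// A copybook with a single 3-byte field, so recordSize = 3 and maxRecordSize = 8.
val copybook = CopybookParser.parse(
  """        01  R.
    |            05  A  PIC X(3).
    |""".stripMargin, ASCII)

val stream = new TestStringStream("AAAA\n\nBBB\n")
val extractor = new TextFullRecordExtractor(RawRecordContext(0L, stream, copybook, null, null, ""))

while (extractor.hasNext) {
  // Prints "AAAA", then "BBB": the empty line is skipped, and "AAAA" is returned
  // whole because its line break falls within the look-ahead buffer.
  println(new String(extractor.next()))
}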
@@ -24,7 +24,7 @@ import java.util
* Record extractors are used in situations where the size of records in a file is not fixed and cannot be
* determined either from the copybook or from record headers.
*
* Empty likes (ones that contain only LF / CRLF) are skipped.
* Empty lines (ones that contain only LF / CRLF) are skipped.
*
* The implementation is optimized for performance, so it might not be obviously readable.
* Hopefully, the comments will help anyone reading it.
@@ -44,6 +44,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param variableLengthParams VariableLengthParameters containing the specifications for the consumption of variable-length Cobol records.
* @param schemaRetentionPolicy A copybook usually has a root group struct element that acts like a rowtag in XML. This can be retained in Spark schema or can be collapsed
* @param stringTrimmingPolicy Specify if and how strings should be trimmed when parsed
* @param allowPartialRecords If true, partial ASCII records can be parsed (for example, when the LF character is missing)
* @param multisegmentParams Parameters for reading multisegment mainframe files
* @param improvedNullDetection If true, string values that contain only zero bytes (0x0) will be considered null.
* @param commentPolicy A comment truncation policy
@@ -72,6 +73,7 @@ case class CobolParameters(
variableLengthParams: Option[VariableLengthParameters],
schemaRetentionPolicy: SchemaRetentionPolicy,
stringTrimmingPolicy: StringTrimmingPolicy,
allowPartialRecords: Boolean,
multisegmentParams: Option[MultisegmentParameters],
commentPolicy: CommentPolicy,
improvedNullDetection: Boolean,
@@ -54,6 +54,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param generateRecordId If true, a record id field will be prepended to each record.
* @param schemaPolicy Specifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook.
* @param stringTrimmingPolicy Specifies if and how strings should be trimmed when parsed.
* @param allowPartialRecords If true, partial ASCII records can be parsed (for example, when the LF character is missing)
* @param multisegment Parameters specific to reading multisegment files
* @param commentPolicy A comment truncation policy
* @param improvedNullDetection If true, string values that contain only zero bytes (0x0) will be considered null.
@@ -93,6 +94,7 @@ case class ReaderParameters(
generateRecordId: Boolean = false,
schemaPolicy: SchemaRetentionPolicy = SchemaRetentionPolicy.KeepOriginal,
stringTrimmingPolicy: StringTrimmingPolicy = StringTrimmingPolicy.TrimBoth,
allowPartialRecords: Boolean = false,
multisegment: Option[MultisegmentParameters] = None,
commentPolicy: CommentPolicy = CommentPolicy(),
improvedNullDetection: Boolean = false,
@@ -23,7 +23,7 @@ import za.co.absa.cobrix.cobol.parser.common.Constants
import za.co.absa.cobrix.cobol.parser.encoding.ASCII
import za.co.absa.cobrix.cobol.parser.headerparsers.RecordHeaderParserFactory
import za.co.absa.cobrix.cobol.reader.memorystream.TestStringStream
import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, TextRecordExtractor}
import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, TextFullRecordExtractor, TextRecordExtractor}
import za.co.absa.cobrix.cobol.reader.index.IndexGenerator


@@ -41,22 +41,22 @@ class SparseIndexSpecSpec extends WordSpec {
"""

"sparseIndexGenerator()" should {
"Generate a sparse index for ASCII text data" in {
val copybook = CopybookParser.parse(copybookContents, ASCII)

val textFileContent: String =
Seq("1Tes 0123456789",
"2Test 012345",
"1None Data¡3 ",
"2 on Data 4",
"1Tes 0123456789",
"2Test 012345",
"1None Data¡3 ",
"2 on Data 4").mkString("\n")

val segmentIdField = copybook.getFieldByName("T").asInstanceOf[Primitive]
val segmentIdRootValue = "1"
val copybook = CopybookParser.parse(copybookContents, ASCII)

val textFileContent: String =
Seq("1Tes 0123456789",
"2Test 012345",
"1None Data¡3 ",
"2 on Data 4",
"1Tes 0123456789",
"2Test 012345",
"1None Data¡3 ",
"2 on Data 4").mkString("\n")

val segmentIdField = copybook.getFieldByName("T").asInstanceOf[Primitive]
val segmentIdRootValue = "1"

"Generate a sparse index for ASCII text data with partial records allowed" in {
val stream = new TestStringStream(textFileContent)

val recordHeaderParser = RecordHeaderParserFactory.createRecordHeaderParser(Constants.RhRdwLittleEndian, 0, 0, 0, 0)
@@ -76,6 +76,27 @@ class SparseIndexSpecSpec extends WordSpec {
assert(indexes(3).offsetFrom == 90)
assert(indexes(3).offsetTo == -1)
}

"Generate a sparse index for ASCII text data with partial records not allowed" in {
val stream = new TestStringStream(textFileContent)

val recordHeaderParser = RecordHeaderParserFactory.createRecordHeaderParser(Constants.RhRdwLittleEndian, 0, 0, 0, 0)

val recordExtractor = new TextFullRecordExtractor(RawRecordContext(0L, stream, copybook, null, null, ""))

val indexes = IndexGenerator.sparseIndexGenerator(0, stream, isRdwBigEndian = false,
recordHeaderParser = recordHeaderParser, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(2), sizePerIndexEntryMB = None,
copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValue)
assert(indexes.length == 4)
assert(indexes.head.offsetFrom == 0)
assert(indexes.head.offsetTo == 30)
assert(indexes(1).offsetFrom == 30)
assert(indexes(1).offsetTo == 60)
assert(indexes(2).offsetFrom == 60)
assert(indexes(2).offsetTo == 90)
assert(indexes(3).offsetFrom == 90)
assert(indexes(3).offsetTo == -1)
}
}


@@ -76,6 +76,7 @@ object CobolParametersParser extends Logging {
val PARAM_FLOATING_POINT_FORMAT = "floating_point_format"
val PARAM_VARIABLE_SIZE_OCCURS = "variable_size_occurs"
val PARAM_IMPROVED_NULL_DETECTION = "improved_null_detection"
val PARAM_ALLOW_PARTIAL_RECORDS = "allow_partial_records"

// Parameters for multisegment variable length files
val PARAM_RECORD_FORMAT = "record_format"
@@ -248,6 +249,7 @@ object CobolParametersParser extends Logging {
parseVariableLengthParameters(params, recordFormat),
schemaRetentionPolicy,
stringTrimmingPolicy,
params.getOrElse(PARAM_ALLOW_PARTIAL_RECORDS, "false").toBoolean,
parseMultisegmentParameters(params),
parseCommentTruncationPolicy(params),
params.getOrElse(PARAM_IMPROVED_NULL_DETECTION, "false").toBoolean,
Expand All @@ -257,6 +259,7 @@ object CobolParametersParser extends Logging {
getOccursMappings(params.getOrElse(PARAM_OCCURS_MAPPINGS, "{}")),
getDebuggingFieldsPolicy(params),
params.getOrElse(PARAM_DEBUG_IGNORE_FILE_SIZE, "false").toBoolean

)
validateSparkCobolOptions(params, recordFormat)
cobolParameters
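
For context, a hedged sketch of how the new option is passed from the Spark side (paths are illustrative; record_format = D is Cobrix's format code for ASCII text files):

val df = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")   // illustrative path
  .option("record_format", "D")                  // ASCII text records
  .option("allow_partial_records", "false")      // the new default: whole lines only
  .load("/path/to/ascii/data")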
@@ -196,6 +196,7 @@ class DefaultSource
generateRecordId = varLenParams.generateRecordId,
schemaPolicy = parameters.schemaRetentionPolicy,
stringTrimmingPolicy = parameters.stringTrimmingPolicy,
allowPartialRecords = parameters.allowPartialRecords,
parameters.multisegmentParams,
parameters.commentPolicy,
parameters.improvedNullDetection,
@@ -140,6 +140,7 @@ class Test13AsciiCrLfText extends WordSpec with SparkTestBase with BinaryFileFix

val expected = """[{"A":"AA"},{"A":"BB"},{"A":"CC"}]"""

df.show
val count = df.count()
val actual = df.toJSON.collect().mkString("[", ",", "]")
