#710 Move record length parser out of the reader since the record extractor is used instead.
yruslan committed Sep 26, 2024
1 parent 084fb7f commit 5f3ba6f
Showing 5 changed files with 58 additions and 126 deletions.
VarLenNestedReader.scala
@@ -20,7 +20,7 @@ import za.co.absa.cobrix.cobol.internal.Logging
import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.parser.common.Constants
import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordHeaderParserFactory}
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{FixedBlock, FixedLength, VariableBlock}
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{FixedBlock, FixedLength, VariableBlock, VariableLength}
import za.co.absa.cobrix.cobol.reader.extractors.raw._
import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler
import za.co.absa.cobrix.cobol.reader.index.IndexGenerator
@@ -77,6 +77,8 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
Some(new TextFullRecordExtractor(reParams))
case None if readerProperties.recordFormat == FixedLength && (readerProperties.lengthFieldExpression.nonEmpty || readerProperties.lengthFieldMap.nonEmpty) =>
Some(new FixedWithRecordLengthExprRawRecordExtractor(reParams, readerProperties))
case None if readerProperties.recordFormat == VariableLength && (readerProperties.lengthFieldExpression.nonEmpty || readerProperties.lengthFieldMap.nonEmpty) =>
Some(new FixedWithRecordLengthExprRawRecordExtractor(reParams, readerProperties))
case None if readerProperties.recordFormat == FixedBlock =>
val fbParams = FixedBlockParameters(readerProperties.recordLength, bdwOpt.get.blockLength, bdwOpt.get.recordsPerBlock)
FixedBlockParameters.validate(fbParams)
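The routing change in this hunk is the core of the commit: a VariableLength format with a record length field expression or mapping now selects FixedWithRecordLengthExprRawRecordExtractor, the same extractor the FixedLength case above already used, so record-length parsing happens in exactly one place. A minimal, self-contained sketch of that decision, with stubbed types (the real method also handles text records, fixed blocks, and custom extractors):

object ExtractorRoutingSketch {
  sealed trait RecordFormat
  case object FixedLength extends RecordFormat
  case object VariableLength extends RecordFormat
  case object FixedBlock extends RecordFormat

  // Assumed subset of ReaderParameters: only the fields this decision needs.
  final case class Props(recordFormat: RecordFormat,
                         lengthFieldExpression: Option[String],
                         lengthFieldMap: Map[String, Int])

  // Mirrors the two 'case None' guards above: either format takes the
  // length-expression extractor when a length field or map is configured.
  def usesLengthExprExtractor(props: Props): Boolean =
    (props.recordFormat == FixedLength || props.recordFormat == VariableLength) &&
      (props.lengthFieldExpression.nonEmpty || props.lengthFieldMap.nonEmpty)

  def main(args: Array[String]): Unit = {
    println(usesLengthExprExtractor(Props(VariableLength, Some("LEN"), Map.empty))) // true
    println(usesLengthExprExtractor(Props(FixedBlock, None, Map.empty)))            // false
  }
}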
VRLRecordReader.scala
@@ -51,14 +51,7 @@ class VRLRecordReader(cobolSchema: Copybook,
private var byteIndex = startingFileOffset
private var recordIndex = startRecordId - 1

final private val copyBookRecordSize = cobolSchema.getRecordSize
final private val (recordLengthField, lengthFieldExpr) = ReaderParametersValidator.getEitherFieldAndExpression(readerProperties.lengthFieldExpression, readerProperties.lengthFieldMap, cobolSchema)
final private val lengthField = recordLengthField.map(_.field)
final private val lengthMap = recordLengthField.map(_.valueMap).getOrElse(Map.empty)
final private val isLengthMapEmpty = lengthMap.isEmpty
final private val segmentIdField = ReaderParametersValidator.getSegmentIdField(readerProperties.multisegment, cobolSchema)
final private val recordLengthAdjustment = readerProperties.rdwAdjustment
final private val useRdw = lengthField.isEmpty && lengthFieldExpr.isEmpty
final private val minimumRecordLength = readerProperties.minimumRecordLength
final private val maximumRecordLength = readerProperties.maximumRecordLength

@@ -90,13 +83,7 @@ class VRLRecordReader(cobolSchema: Copybook,
None
}
case None =>
if (useRdw) {
fetchRecordUsingRdwHeaders()
} else if (lengthField.nonEmpty) {
fetchRecordUsingRecordLengthField()
} else {
fetchRecordUsingRecordLengthFieldExpression(lengthFieldExpr.get)
}
fetchRecordUsingRdwHeaders()
}

binaryData match {
@@ -117,110 +104,6 @@

def getRecordIndex: Long = recordIndex

private def fetchRecordUsingRecordLengthField(): Option[Array[Byte]] = {
if (lengthField.isEmpty) {
throw new IllegalStateException(s"For variable length reader either RDW record headers or record length field should be provided.")
}

val lengthFieldBlock = lengthField.get.binaryProperties.offset + lengthField.get.binaryProperties.actualSize

val binaryDataStart = dataStream.next(readerProperties.startOffset + lengthFieldBlock)

byteIndex += readerProperties.startOffset + lengthFieldBlock

if (binaryDataStart.length < readerProperties.startOffset + lengthFieldBlock) {
return None
}

val recordLength = lengthField match {
case Some(lengthAST) => getRecordLengthFromField(lengthAST, binaryDataStart)
case None => copyBookRecordSize
}

val restOfDataLength = recordLength - lengthFieldBlock + readerProperties.endOffset

byteIndex += restOfDataLength

if (restOfDataLength > 0) {
Some(binaryDataStart ++ dataStream.next(restOfDataLength))
} else {
Some(binaryDataStart)
}
}

final private def getRecordLengthFromField(lengthAST: Primitive, binaryDataStart: Array[Byte]): Int = {
val length = if (isLengthMapEmpty) {
cobolSchema.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
case i: Int => i
case l: Long => l.toInt
case s: String => s.toInt
case null => throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${getBytesAsHexString(binaryDataStart)}).")
case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.")
}
} else {
cobolSchema.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
case i: Int => getRecordLengthFromMapping(i.toString)
case l: Long => getRecordLengthFromMapping(l.toString)
case s: String => getRecordLengthFromMapping(s)
case null => throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${getBytesAsHexString(binaryDataStart)}).")
case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.")
}
}
length + recordLengthAdjustment
}

final private def getRecordLengthFromMapping(v: String): Int = {
lengthMap.get(v) match {
case Some(len) => len
case None => throw new IllegalStateException(s"Record length value '$v' is not mapped to a record length.")
}
}

final private def getBytesAsHexString(bytes: Array[Byte]): String = {
bytes.map("%02X" format _).mkString
}

private def fetchRecordUsingRecordLengthFieldExpression(expr: RecordLengthExpression): Option[Array[Byte]] = {
val lengthFieldBlock = expr.requiredBytesToread
val evaluator = expr.evaluator

val binaryDataStart = dataStream.next(readerProperties.startOffset + lengthFieldBlock)

byteIndex += readerProperties.startOffset + lengthFieldBlock

if (binaryDataStart.length < readerProperties.startOffset + lengthFieldBlock) {
return None
}

expr.fields.foreach{
case (name, field) =>
val obj = cobolSchema.extractPrimitiveField(field, binaryDataStart, readerProperties.startOffset)
try {
obj match {
case i: Int => evaluator.setValue(name, i)
case l: Long => evaluator.setValue(name, l.toInt)
case s: String => evaluator.setValue(name, s.toInt)
case _ => throw new IllegalStateException(s"Record length value of the field ${field.name} must be an integral type.")
}
} catch {
case ex: NumberFormatException =>
throw new IllegalStateException(s"Encountered an invalid value of the record length field. Cannot parse '$obj' as an integer in: ${field.name} = '$obj'.", ex)
}
}

val recordLength = evaluator.eval()

val restOfDataLength = recordLength - lengthFieldBlock + readerProperties.endOffset

byteIndex += restOfDataLength

if (restOfDataLength > 0) {
Some(binaryDataStart ++ dataStream.next(restOfDataLength))
} else {
Some(binaryDataStart)
}
}

private def fetchRecordUsingRdwHeaders(): Option[Array[Byte]] = {
val rdwHeaderBlock = recordHeaderParser.getHeaderLength

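The deleted block above is the point of the commit: fetchRecordUsingRecordLengthField, getRecordLengthFromField, getRecordLengthFromMapping and fetchRecordUsingRecordLengthFieldExpression all leave VRLRecordReader, whose 'case None' branch now only parses RDW headers; the equivalent length-field logic lives in FixedWithRecordLengthExprRawRecordExtractor. For orientation, a toy RDW parse, assuming a big-endian two-byte length that excludes the 4-byte header itself (in Cobrix, byte order and header inclusion are configurable via is_rdw_big_endian, is_rdw_part_of_record_length and rdw_adjustment):

object RdwSketch {
  // An RDW (record descriptor word) is a 4-byte prefix; under the assumption
  // above, bytes 0-1 hold the length of the payload that follows and
  // bytes 2-3 are zero.
  def payloadLength(header: Array[Byte]): Int = {
    require(header.length == 4, "an RDW is exactly 4 bytes")
    ((header(0) & 0xFF) << 8) | (header(1) & 0xFF)
  }

  def main(args: Array[String]): Unit = {
    // 0x000C 0x0000 -> a 12-byte record follows this header
    println(payloadLength(Array[Byte](0x00, 0x0C, 0x00, 0x00))) // 12
  }
}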
VRLRecordReaderSpec.scala
@@ -20,7 +20,7 @@ import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.cobrix.cobol.mock.{ByteStreamMock, RecordExtractorMock, RecordHeadersParserMock}
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordHeaderParserRDW}
import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor}
import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedWithRecordLengthExprRawRecordExtractor, RawRecordContext, RawRecordExtractor}
import za.co.absa.cobrix.cobol.reader.parameters.{MultisegmentParameters, ReaderParameters}

class VRLRecordReaderSpec extends AnyWordSpec {
@@ -127,10 +127,17 @@ class VRLRecordReaderSpec extends AnyWordSpec {
0x00, 0x07, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8
).map(_.toByte)

val streamH = new ByteStreamMock(records)
val streamD = new ByteStreamMock(records)
val context = RawRecordContext(0, streamH, streamD, CopybookParser.parseSimple(copybookWithFieldLength), null, null, "")

val readerParameters = ReaderParameters(lengthFieldExpression = Some("LEN"))

val reader = getUseCase(
copybook = copybookWithFieldLength,
records = records,
lengthFieldExpression = Some("LEN"))
lengthFieldExpression = Some("LEN"),
recordExtractor = Some(new FixedWithRecordLengthExprRawRecordExtractor(context, readerParameters)))

assert(reader.hasNext)
val (segment1, record1) = reader.next()
@@ -163,10 +170,17 @@ class VRLRecordReaderSpec extends AnyWordSpec {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xF1, 0xF5, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8
).map(_.toByte)

val streamH = new ByteStreamMock(records)
val streamD = new ByteStreamMock(records)
val context = RawRecordContext(0, streamH, streamD, CopybookParser.parseSimple(copybookWithFieldLength), null, null, "")

val readerParameters = ReaderParameters(lengthFieldExpression = Some("LEN"))

val reader = getUseCase(
copybook = copybookWithFieldLength,
records = records,
lengthFieldExpression = Some("LEN"))
lengthFieldExpression = Some("LEN"),
recordExtractor = Some(new FixedWithRecordLengthExprRawRecordExtractor(context, readerParameters)))

assert(reader.hasNext)
val (segment1, record1) = reader.next()
@@ -195,12 +209,18 @@ class VRLRecordReaderSpec extends AnyWordSpec {
"""

val records = Array[Byte](0x00)
val streamH = new ByteStreamMock(records)
val streamD = new ByteStreamMock(records)
val context = RawRecordContext(0, streamH, streamD, CopybookParser.parseSimple(copybookWithFieldLength), null, null, "")

val readerParameters = ReaderParameters(lengthFieldExpression = Some("LEN"))

val ex = intercept[IllegalStateException] {
getUseCase(
copybook = copybookWithFieldLength,
records = records,
lengthFieldExpression = Some("LEN"))
lengthFieldExpression = Some("LEN"),
recordExtractor = Some(new FixedWithRecordLengthExprRawRecordExtractor(context, readerParameters)))
}

assert(ex.getMessage == "The record length field LEN must be an integral type or a value mapping must be specified.")
@@ -220,10 +240,17 @@ class VRLRecordReaderSpec extends AnyWordSpec {
0x00, 0x08, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8
).map(_.toByte)

val streamH = new ByteStreamMock(records)
val streamD = new ByteStreamMock(records)
val context = RawRecordContext(0, streamH, streamD, CopybookParser.parseSimple(copybookWithFieldLength), null, null, "")

val readerParameters = ReaderParameters(lengthFieldExpression = Some("LEN - 1"))

val reader = getUseCase(
copybook = copybookWithFieldLength,
records = records,
lengthFieldExpression = Some("LEN - 1"))
lengthFieldExpression = Some("LEN - 1"),
recordExtractor = Some(new FixedWithRecordLengthExprRawRecordExtractor(context, readerParameters)))

assert(reader.hasNext)
val (segment1, record1) = reader.next()
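Because getUseCase can no longer rely on the reader's internal length-field parsing, each of the updated tests builds a RawRecordContext over two ByteStreamMock instances plus a ReaderParameters and passes the extractor in explicitly, including the expression case ("LEN - 1"). A hypothetical helper, not part of the commit, that could sit inside VRLRecordReaderSpec and bundle the repeated wiring using the spec's own imports and mocks:

import za.co.absa.cobrix.cobol.mock.ByteStreamMock
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedWithRecordLengthExprRawRecordExtractor, RawRecordContext, RawRecordExtractor}
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters

def lengthExprExtractor(copybook: String, records: Array[Byte], expr: String): RawRecordExtractor = {
  // Same construction the four tests repeat: two independent streams over the
  // same bytes, the parsed copybook, and no header/record-extractor parsers.
  val context = RawRecordContext(0, new ByteStreamMock(records), new ByteStreamMock(records),
    CopybookParser.parseSimple(copybook), null, null, "")
  new FixedWithRecordLengthExprRawRecordExtractor(context,
    ReaderParameters(lengthFieldExpression = Some(expr)))
}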
Test37RecordLengthMappingSpec.scala
@@ -66,6 +66,26 @@ class Test37RecordLengthMappingSpec extends AnyWordSpec with SparkTestBase with
}
}

"work for simple mappings with format=V" in {
withTempBinFile("record_length_mapping", ".tmp", dataSimple) { tempFile =>
val expected = """{"SEG_ID":"A","TEXT":"123"},{"SEG_ID":"B","TEXT":"123456"},{"SEG_ID":"C","TEXT":"1234567"}"""

val df = spark.read
.format("cobol")
.option("copybook_contents", copybook)
.option("record_format", "V")
.option("record_length_field", "SEG-ID")
.option("input_split_records", "2")
.option("pedantic", "true")
.option("record_length_map", """{"A":4,"B":7,"C":8}""")
.load(tempFile)

val actual = df.orderBy("SEG_ID").toJSON.collect().mkString(",")

assert(actual == expected)
}
}
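This new test exercises the VariableLength path added in VarLenNestedReader above: record_format "V" with record_length_field SEG-ID and a record_length_map instead of RDW headers. dataSimple is defined elsewhere in this spec; assuming it encodes the three EBCDIC records implied by the expected JSON, the map {"A":4,"B":7,"C":8} resolves each record's total length (SEG-ID byte plus TEXT) from its first byte:

// Assumed layout of dataSimple ('A' = 0xC1, '1' = 0xF1, ... in EBCDIC):
val assumedDataSimple: Array[Byte] = Array(
  0xC1, 0xF1, 0xF2, 0xF3,                          // "A123"     -> "A" maps to length 4
  0xC2, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6,        // "B123456"  -> "B" maps to length 7
  0xC3, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7   // "C1234567" -> "C" maps to length 8
).map(_.toByte)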

"work for numeric mappings" in {
withTempBinFile("record_length_mapping", ".tmp", dataNumeric) { tempFile =>
val expected = """{"SEG_ID":"1","TEXT":"123"},{"SEG_ID":"2","TEXT":"123456"},{"SEG_ID":"3","TEXT":"1234567"}"""
Test26FixLengthWithIdGeneration.scala
@@ -56,7 +56,7 @@ class Test26FixLengthWithIdGeneration extends AnyWordSpec with SparkTestBase wit
// A4
0xC1.toByte, 0xF3.toByte, 0xF4.toByte,
// A5
0xC1.toByte, 0xF3.toByte, 0xF5.toByte,
0xC1.toByte, 0xF3.toByte, 0xF5.toByte
)

val binFileContentsLengthExpr: Array[Byte] = Array[Byte](
@@ -77,7 +77,7 @@ class Test26FixLengthWithIdGeneration extends AnyWordSpec with SparkTestBase wit
// A4
0xC1.toByte, 0xF2.toByte, 0xF4.toByte,
// A5
0xC1.toByte, 0xF2.toByte, 0xF5.toByte,
0xC1.toByte, 0xF2.toByte, 0xF5.toByte
)

val expected: String =
