#607 Add support for minimum and maximum record length for custom record extractors.
yruslan committed Apr 18, 2023
1 parent d85005f commit 843aaa0
Showing 5 changed files with 88 additions and 12 deletions.
@@ -99,6 +99,8 @@ class VRLRecordReader(cobolSchema: Copybook,
       case None =>
         cachedValue = None
         recordFetched = true
+      case Some(data) if data.length < minimumRecordLength || data.length > maximumRecordLength =>
+        recordFetched = false
       case Some(data) =>
         val segmentId = getSegmentId(data)
         val segmentIdStr = segmentId.getOrElse("")
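The added guard case skips any record whose length falls outside the configured bounds before it is parsed. A minimal, self-contained sketch of the same idea, with hypothetical bounds and a plain iterator standing in for the reader's record stream (this is not the actual VRLRecordReader fetch loop):

```scala
// Sketch of length-based record filtering; bounds and data are hypothetical.
object RecordLengthFilterSketch {
  val minimumRecordLength = 3
  val maximumRecordLength = 5

  // Skip records outside [minimumRecordLength, maximumRecordLength],
  // returning the first one that fits (or None at end of stream).
  def nextValidRecord(records: Iterator[Array[Byte]]): Option[Array[Byte]] =
    records.find(data =>
      data.length >= minimumRecordLength && data.length <= maximumRecordLength)

  def main(args: Array[String]): Unit = {
    val records = Iterator("AB", "ABCD", "ABCDEFGH").map(_.getBytes)
    println(nextValidRecord(records).map(new String(_))) // Some(ABCD)
  }
}
```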
@@ -93,12 +93,12 @@ class ParametersParsingSpec extends AnyFunSuite {
     )
     val params = new Parameters(myMap)
 
-    val fieldCodaPageMap = CobolParametersParser.getFieldCodepageMap(params)
+    val fieldCodePageMap = CobolParametersParser.getFieldCodepageMap(params)
 
-    assert(fieldCodaPageMap.size == 3)
-    assert(fieldCodaPageMap("field1") == "cp1256")
-    assert(fieldCodaPageMap("field_2") == "us-ascii")
-    assert(fieldCodaPageMap("field_3") == "us-ascii")
+    assert(fieldCodePageMap.size == 3)
+    assert(fieldCodePageMap("field1") == "cp1256")
+    assert(fieldCodePageMap("field_2") == "us-ascii")
+    assert(fieldCodePageMap("field_3") == "us-ascii")
   }
 
 }
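For context, the map under test associates fields with code pages (e.g. `field1 -> cp1256`). A hedged sketch of how such a map could be derived from options of the assumed form `field_code_page:<codepage> -> "comma, separated, fields"`; the real `CobolParametersParser.getFieldCodepageMap` may be structured differently:

```scala
// Hypothetical reconstruction of a field-to-codepage option parser,
// inferred from the assertions above; not the actual Cobrix implementation.
object FieldCodepageMapSketch {
  def getFieldCodepageMap(options: Map[String, String]): Map[String, String] =
    options.collect {
      case (key, fields) if key.startsWith("field_code_page:") =>
        val codePage = key.stripPrefix("field_code_page:").trim
        fields.split(',').map(field => field.trim -> codePage)
    }.flatten.toMap

  def main(args: Array[String]): Unit = {
    val m = getFieldCodepageMap(Map(
      "field_code_page:cp1256" -> "field1",           // assumed option shape
      "field_code_page:us-ascii" -> "field_2, field_3"
    ))
    println(m) // Map(field1 -> cp1256, field_2 -> us-ascii, field_3 -> us-ascii)
  }
}
```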
@@ -98,4 +98,24 @@ class Test11CustomRDWParser extends AnyFunSuite with SparkTestBase {
     assert(Test10CustomRDWParser.additionalInfo == "rhp info")
   }
 
+  test(s"Integration test on $exampleName with minimum record length") {
+    Test10CustomRDWParser.additionalInfo = ""
+
+    val df = spark
+      .read
+      .format("cobol")
+      .option("copybook", inputCopybookPath)
+      .option("record_format", "V")
+      .option("generate_record_id", "true")
+      .option("schema_retention_policy", "collapse_root")
+      .option("record_header_parser", "za.co.absa.cobrix.spark.cobol.source.utils.Test10CustomRDWParser")
+      .option("rhp_additional_info", "rhp info")
+      .option("improved_null_detection", "false")
+      .option("minimum_record_length", 61)
+      .load(inputDataPath)
+      .drop("Record_Byte_Length")
+
+    assert(df.count() == 328)
+  }
+
 }
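The test above drives `minimum_record_length` through a full Spark read. A hypothetical read combining it with `maximum_record_length` (the option exercised by the Test26 suite below) might look like the following; the copybook and data paths are placeholders, not files from this repository:

```scala
// Hypothetical usage of both new options together.
import org.apache.spark.sql.SparkSession

object MinMaxRecordLengthExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    val df = spark
      .read
      .format("cobol")
      .option("copybook", "/path/to/copybook.cob")  // placeholder path
      .option("record_format", "V")
      .option("minimum_record_length", 10)   // records shorter than 10 bytes are skipped
      .option("maximum_record_length", 100)  // records longer than 100 bytes are skipped
      .load("/path/to/data")                 // placeholder path

    df.show()
  }
}
```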
@@ -35,19 +35,45 @@ class Test26CustomRecordExtractor extends AnyWordSpec with SparkTestBase with BinaryFileFixture {

"Custom record extractor" should {
"apply the extractor to a binary data" in {
val expected = """[{"A":"AA"},{"A":"BBB"},{"A":"CC"},{"A":"DDD"},{"A":"EE"},{"A":"FFF"}]"""
val expected = """[{"A":"AA"},{"A":"BBB"},{"A":"CC"},{"A":"DDD"},{"A":"EE"},{"A":"FFF"}]"""

withTempBinFile("custom_re", ".dat", data.getBytes) { tmpFileName =>
val df = getDataFrame(tmpFileName)
withTempBinFile("custom_re", ".dat", data.getBytes) { tmpFileName =>
val df = getDataFrame(tmpFileName)

val actual = df.toJSON.collect().mkString("[", ",", "]")
val actual = df.toJSON.collect().mkString("[", ",", "]")

assert(actual == expected)
assert(CustomRecordExtractorMock.additionalInfo == "re info")
}
assert(actual == expected)
assert(CustomRecordExtractorMock.additionalInfo == "re info")
}
}

"filter out records that are bigger than the specified size" in {
val expected = """[{"A":"BBB"},{"A":"DDD"},{"A":"FFF"}]"""

withTempBinFile("custom_re", ".dat", data.getBytes) { tmpFileName =>
val df = getDataFrame(tmpFileName, Map("minimum_record_length" -> "3"))

val actual = df.toJSON.collect().mkString("[", ",", "]")

assert(actual == expected)
assert(CustomRecordExtractorMock.additionalInfo == "re info")
}
}

"filter out records that are smaller than the specified size" in {
val expected = """[{"A":"AA"},{"A":"CC"},{"A":"EE"}]"""

withTempBinFile("custom_re", ".dat", data.getBytes) { tmpFileName =>
val df = getDataFrame(tmpFileName, Map("maximum_record_length" -> "2"))

val actual = df.toJSON.collect().mkString("[", ",", "]")

assert(actual == expected)
assert(CustomRecordExtractorMock.additionalInfo == "re info")
}
}
}

"Custom record extractor options are not compatible with" when {
"record_length" in {
intercept[IllegalArgumentException] {
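The two length-filter tests added above pin down the semantics: with records AA, BBB, CC, DDD, EE, FFF, `minimum_record_length = 3` drops the two-byte records and `maximum_record_length = 2` drops the three-byte ones. The same predicates restated in plain Scala, for illustration only:

```scala
// Plain-Scala restatement of the filtering the tests above verify.
object LengthFilterDemo {
  def main(args: Array[String]): Unit = {
    val records = Seq("AA", "BBB", "CC", "DDD", "EE", "FFF")

    // minimum_record_length = 3: keep records at least 3 bytes long
    println(records.filter(_.length >= 3)) // List(BBB, DDD, FFF)

    // maximum_record_length = 2: keep records at most 2 bytes long
    println(records.filter(_.length <= 2)) // List(AA, CC, EE)
  }
}
```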
@@ -239,4 +239,32 @@ class Test01AsciiTextFiles extends AnyFunSuite with SparkTestBase with BinaryFileFixture {

     assertEqualsMultiline(actual, expected)
   }
 
+  test("Throw an exception if an ASCII charset is specified for an EBCDIC file") {
+    val ex = intercept[IllegalArgumentException] {
+      spark
+        .read
+        .format("cobol")
+        .option("copybook_contents", copybook)
+        .option("encoding", "ebcdic")
+        .option("ascii_charset", "us-ascii")
+        .load("dummy")
+    }
+
+    assert(ex.getMessage.contains("Option 'ascii_charset' cannot be used when 'encoding = ebcdic'"))
+  }
+
+  test("Throw an exception if an EBCDIC code page is specified for an ASCII file") {
+    val ex = intercept[IllegalArgumentException] {
+      spark
+        .read
+        .format("cobol")
+        .option("copybook_contents", copybook)
+        .option("encoding", "ascii")
+        .option("ebcdic_code_page", "cp037")
+        .load("dummy")
+    }
+
+    assert(ex.getMessage.contains("Option 'ebcdic_code_page' cannot be used when 'encoding = ascii'"))
+  }
 }
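The expected messages suggest a straightforward compatibility check over the reader options. A minimal sketch of such a validation, assuming a plain options map; it is not Cobrix's actual parameter parser:

```scala
// Sketch of an encoding/option compatibility check; the real
// CobolParametersParser validation may be structured differently.
import scala.util.Try

object OptionValidationSketch {
  def validate(options: Map[String, String]): Unit = {
    val encoding = options.getOrElse("encoding", "ebcdic")
    if (encoding == "ebcdic" && options.contains("ascii_charset"))
      throw new IllegalArgumentException(
        "Option 'ascii_charset' cannot be used when 'encoding = ebcdic'")
    if (encoding == "ascii" && options.contains("ebcdic_code_page"))
      throw new IllegalArgumentException(
        "Option 'ebcdic_code_page' cannot be used when 'encoding = ascii'")
  }

  def main(args: Array[String]): Unit = {
    validate(Map("encoding" -> "ascii")) // passes silently
    println(Try(validate(Map("encoding" -> "ascii", "ebcdic_code_page" -> "cp037"))))
    // Failure(java.lang.IllegalArgumentException: Option 'ebcdic_code_page' ...)
  }
}
```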
