Added several new features in this PR #408

Merged
merged 10 commits on Aug 2, 2021
48 changes: 27 additions & 21 deletions README.md
@@ -267,7 +267,7 @@ You can collect the uber jar of `spark-cobol` either at

Then run `spark-shell` or `spark-submit`, adding the fat jar via the `--jars` option.
```sh
$ spark-shell --jars spark-cobol-assembly-2.2.4-SNAPSHOT.jar
$ spark-shell --jars spark-cobol-assembly-2.3.0-SNAPSHOT.jar
```

## Other Features
@@ -326,11 +326,15 @@ The space used by the headers should not be mentioned in the copybook if this op
### Schema collapsing

Mainframe data often contain only one root GROUP. In such cases, the GROUP can be considered similar to an XML row tag.
Cobrix allows to collapse the GROUP and expand it's records. To turn this on use the following option:
Cobrix allows you either to collapse or to retain the root GROUP. To choose the behavior, use one of the following options:

```
```scala
.option("schema_retention_policy", "collapse_root")
```
or
```scala
.option("schema_retention_policy", "keep_original")
```

Let's look at an example. Let's say we have a copybook that looks like this:
```cobol
@@ -341,7 +345,17 @@
10 COMPANY-ID-NUM PIC 9(5) COMP-3.
```

Normally Spark schema for such a copybook will look like this:
When `schema_retention_policy` is set to `collapse_root` (the default), the root group is collapsed and the schema looks
like this (note that the RECORD field is not part of the schema):
```
root
|-- ID: integer (nullable = true)
|-- COMPANY: struct (nullable = true)
| |-- SHORT_NAME: string (nullable = true)
| |-- COMPANY_ID_NUM: integer (nullable = true)
```

But when `schema_retention_policy` is set to `keep_original`, the schema looks like this (note that the RECORD field is part of the schema):

```
root
|-- RECORD: struct (nullable = true)
| |-- ID: integer (nullable = true)
| |-- COMPANY: struct (nullable = true)
| | |-- SHORT_NAME: string (nullable = true)
| | |-- COMPANY_ID_NUM: integer (nullable = true)
```

But when "schema_retention_policy" is set to "collapse_root" the root group will be collapsed and the schema will look
like this (note the RECORD field is no longer present):
```
root
|-- ID: integer (nullable = true)
|-- COMPANY: struct (nullable = true)
| |-- SHORT_NAME: string (nullable = true)
| |-- COMPANY_ID_NUM: integer (nullable = true)
```

You can experiment with this feature using the built-in example in `za.co.absa.cobrix.spark.cobol.examples.CobolSparkExample`
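For context, a minimal end-to-end sketch of the two policies (the paths and session setup below are placeholder assumptions, not taken from this repository):

```scala
import org.apache.spark.sql.SparkSession

// Placeholder session; in spark-shell a `spark` session already exists.
val spark = SparkSession.builder().appName("cobrix-schema-demo").getOrCreate()

// collapse_root (the new default): fields of the root GROUP appear at the top level.
val dfCollapsed = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")        // placeholder path
  .option("schema_retention_policy", "collapse_root")
  .load("/path/to/data")                              // placeholder path

// keep_original: the root GROUP (e.g. RECORD) is retained as a struct column.
val dfOriginal = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")
  .option("schema_retention_policy", "keep_original")
  .load("/path/to/data")

dfCollapsed.printSchema()
dfOriginal.printSchema()
```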


@@ -541,7 +545,6 @@ Working example 1:
.option("copybook_contents", copybook)
.option("is_text", "true")
.option("encoding", "ascii")
.option("schema_retention_policy", "collapse_root")
.load(tmpFileName)
````

@@ -631,7 +634,6 @@ val df = spark
.read
.format("cobol")
.option("copybook_contents", copybook)
.option("schema_retention_policy", "collapse_root")
.option("is_record_sequence", "true")
.option("segment_field", "SEGMENT_ID")
.option("segment_id_level0", "C")
@@ -740,7 +742,6 @@ val df = spark
.read
.format("cobol")
.option("copybook", "/path/to/thecopybook")
.option("schema_retention_policy", "collapse_root") // Collapses the root group, returning its fields at the top level of the schema
.option("is_record_sequence", "true")
.load("examples/multisegment_data")
```
@@ -806,7 +807,6 @@ val df = spark
.read
.format("cobol")
.option("copybook", "/path/to/thecopybook")
.option("schema_retention_policy", "collapse_root")
.option("is_record_sequence", "true")

// Specifies a field containing a segment id
@@ -960,7 +960,6 @@ val df = spark
.read
.format("cobol")
.option("copybook_contents", copybook)
.option("schema_retention_policy", "collapse_root")
.option("is_record_sequence", "true")
.option("segment_field", "SEGMENT_ID")
.option("segment_id_level0", "C")
@@ -1119,6 +1118,7 @@ Again, the full example is available at

| Option (usage example) | Description |
| ------------------------------------------ |:----------------------------------------------------------------------------- |
| .option("schema_retention_policy", "collapse_root") | When `collapse_root` (default), the root-level record is removed from the Spark schema. When `keep_original`, the root-level GROUP is kept in the Spark schema. |
| .option("drop_group_fillers", "false") | If `true`, all GROUP FILLERs will be dropped from the output schema. If `false` (default), such fields will be retained. |
| .option("drop_value_fillers", "false") | If `true` (default), all non-GROUP FILLERs will be dropped from the output schema. If `false`, such fields will be retained. |
| .option("non_terminals", "GROUP1,GROUP2") | Specifies groups to also be added to the schema as string fields. When this option is specified, the reader will add one extra data field after each matching group containing the string data for the group. |
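The FILLER and non-terminal options above can be combined in a single read; a sketch under assumed inputs (the copybook path, data path, and group name `GROUP1` are placeholders):

```scala
// Illustrative only: paths and the group name are placeholders.
val df = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")
  .option("schema_retention_policy", "collapse_root") // default since 2.3.0
  .option("drop_group_fillers", "true")  // drop GROUP FILLERs from the schema
  .option("drop_value_fillers", "true")  // drop non-GROUP FILLERs (the default)
  .option("non_terminals", "GROUP1")     // add a string column with GROUP1's raw data
  .load("/path/to/data")

df.printSchema()
```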
@@ -1196,7 +1196,6 @@ For fixed record length tests:
.read
.format("cobol")
.option("copybook_contents", copybook)
.option("schema_retention_policy", "collapse_root")
.load(args(0))

df.write.mode(SaveMode.Overwrite).parquet(args(1))
@@ -1214,7 +1213,6 @@ For multisegment variable lengths tests:
.format("cobol")
.option("copybook_contents", copybook)
.option("generate_record_id", true)
.option("schema_retention_policy", "collapse_root")
.option("is_record_sequence", "true")
.option("segment_field", "SEGMENT_ID")
.option("segment_id_level0", "C")
@@ -1261,6 +1259,14 @@ For multisegment variable lengths tests:
![](performance/images/exp3_multiseg_wide_records_throughput.svg) ![](performance/images/exp3_multiseg_wide_mb_throughput.svg)

## Changelog
- #### 2.3.0 to be released soon.
- [#405](https://github.com/AbsaOSS/cobrix/issues/405) Fix extracting records that contain redefines of the top level GROUPs.
- [#406](https://github.com/AbsaOSS/cobrix/issues/406) Use 'collapse_root' retention policy by default. This is a breaking
  change; to restore the original behavior, add `.option("schema_retention_policy", "keep_original")`.
- [#407](https://github.com/AbsaOSS/cobrix/issues/407) The layout positions summary generated by the parser now contains level
numbers for root level GROUPs. This is a breaking change if you have unit tests that depend on the formatting of the layout
positions output.

- #### 2.2.3 released 14 July 2021.
- [#397](https://github.com/AbsaOSS/cobrix/issues/397) Fix skipping of empty lines when reading ASCII files with `is_record_sequence = true`
- [#394](https://github.com/AbsaOSS/cobrix/issues/394) Added an ability to specify multiple paths to read data from (Use `.option("paths", inputPaths.mkString(","))`).
2 changes: 1 addition & 1 deletion cobol-converters/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>za.co.absa.cobrix</groupId>
<artifactId>cobrix_2.11</artifactId>
<version>2.2.4-SNAPSHOT</version>
<version>2.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion cobol-parser/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>za.co.absa.cobrix</groupId>
<artifactId>cobrix_2.11</artifactId>
<version>2.2.4-SNAPSHOT</version>
<version>2.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

@@ -211,7 +211,7 @@ class Copybook(val ast: CopybookAST) extends Serializable {
}
}

def generateGroupLayoutPositions(group: Group, path: String = " "): String = {
def generateGroupLayoutPositions(group: Group, path: String = ""): String = {
val fieldStrings = for (field <- group.children) yield {
fieldCounter += 1
val isRedefines = if (field.redefines.nonEmpty) "R" else ""
@@ -220,28 +220,28 @@

field match {
case grp: Group =>
val fieldCounterPart = alignRight(s"$fieldCounter", 5)
val modifiers = s"$isRedefinedByStr$isRedefines$isArray"
val groupStr = generateGroupLayoutPositions(grp, path + " ")
val start = grp.binaryProperties.offset + 1
val length = grp.binaryProperties.actualSize
val end = start + length - 1
val namePart = alignLeft(s"$path${grp.level} ${grp.name}", 39)
val picturePart = alignLeft(modifiers, 11)
val fieldCounterPart = alignRight(s"$fieldCounter", 5)
val startPart = alignRight(s"$start", 7)
val fieldEndPart = alignRight(s"$end", 7)
val fieldLengthPart = alignRight(s"$length", 7)
val groupDescription = s"$namePart$picturePart$fieldCounterPart$startPart$fieldEndPart$fieldLengthPart\n"
groupDescription + groupStr
case s: Primitive =>
val fieldCounterPart = alignRight(s"$fieldCounter", 5)
val isDependeeStr = if (s.isDependee) "D" else ""
val modifiers = s"$isDependeeStr$isRedefinedByStr$isRedefines$isArray"
val start = s.binaryProperties.offset + 1
val length = s.binaryProperties.actualSize
val end = start + length - 1
val namePart = alignLeft(s"$path${s.level} ${s.name}", 39)
val picturePart = alignLeft(modifiers, 11)
val fieldCounterPart = alignRight(s"$fieldCounter", 5)
val startPart = alignRight(s"$start", 7)
val fieldEndPart = alignRight(s"$end", 7)
val fieldLengthPart = alignRight(s"$length", 7)
@@ -251,19 +251,9 @@
fieldStrings.mkString("\n")
}

val strings = for (grp <- ast.children) yield {
val start = grp.binaryProperties.offset + 1
val length = grp.binaryProperties.actualSize
val end = start + length - 1
val groupStr = generateGroupLayoutPositions(grp.asInstanceOf[Group])
val namePart = alignLeft(s"${grp.name}", 55)
val fieldStartPart = alignRight(s"$start", 7)
val fieldEndPart = alignRight(s"$end", 7)
val fieldLengthPart = alignRight(s"$length", 7)
s"$namePart$fieldStartPart$fieldEndPart$fieldLengthPart\n$groupStr"
}
val layout = generateGroupLayoutPositions(ast)
val header = "-------- FIELD LEVEL/NAME --------- --ATTRIBS-- FLD START END LENGTH\n\n"
header + strings.mkString("\n")
header + layout
}

def dropRoot(): Copybook = {
@@ -313,10 +303,14 @@


object Copybook {
def merge(copybooks: Iterable[Copybook]): Copybook = {
def merge(copybooks: Seq[Copybook]): Copybook = {
if (copybooks.isEmpty)
throw new RuntimeException("Cannot merge an empty iterable of copybooks.")

if (copybooks.size == 1) {
return copybooks.head
}

// make sure all segments are the same level
val rootLevels: Set[Int] = copybooks.flatMap(cb => cb.ast.children.map({
case x: Group => x.level
@@ -354,6 +348,7 @@ object Copybook {
case x: Group => x.copy(redefines = None, isRedefined = true)(Some(newRoot))
case x: Primitive => x.copy(redefines = None, isRedefined = true)(Some(newRoot))
})

newRoot.children ++= copybooks.head.ast.children.tail.map({
case x: Group => x.copy(redefines = Option(targetName), isRedefined = false)(Some(newRoot))
case x: Primitive => x.copy(redefines = Option(targetName), isRedefined = false)(Some(newRoot))
@@ -175,7 +175,9 @@ object RecordExtractors {

val records = for (record <- ast.children) yield {
val (size, values) = getGroupValues(nextOffset, record.asInstanceOf[Group])
nextOffset += size
if (!record.isRedefined) {
nextOffset += size
}
values
}

@@ -17,11 +17,13 @@
package za.co.absa.cobrix.cobol.parser.copybooks

import org.scalatest.FunSuite
import org.slf4j.{Logger, LoggerFactory}
import za.co.absa.cobrix.cobol.parser.ast.Group
import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser}
import za.co.absa.cobrix.cobol.testutils.SimpleComparisonBase


class CopybooksOperationsSpec extends FunSuite {
class CopybooksOperationsSpec extends FunSuite with SimpleComparisonBase {
private implicit val logger: Logger = LoggerFactory.getLogger(this.getClass)

test("Test drop root from copybook") {
val copyBookContents1: String =
@@ -39,26 +41,26 @@ class CopybooksOperationsSpec extends FunSuite {
assert(copybook1.getRecordSize == 30)
assert(copybookDR1.getRecordSize == 30)

assert(copybook1.generateRecordLayoutPositions ==
assertEqualsMultiline(copybook1.generateRecordLayoutPositions,
"""-------- FIELD LEVEL/NAME --------- --ATTRIBS-- FLD START END LENGTH
|
|RECORD_COPYBOOK_1 1 30 30
| 5 GROUP_1 6 1 30 30
| 6 FIELD_1 2 1 10 10
| 6 FILLER 3 11 15 5
| 6 GROUP_2 6 16 30 15
| 10 NESTED_FIELD_1 5 16 25 10
| 10 FILLER 6 26 30 5"""
|1 RECORD_COPYBOOK_1 1 1 30 30
| 5 GROUP_1 2 1 30 30
| 6 FIELD_1 3 1 10 10
| 6 FILLER 4 11 15 5
| 6 GROUP_2 5 16 30 15
| 10 NESTED_FIELD_1 6 16 25 10
| 10 FILLER 7 26 30 5"""
.stripMargin.replace("\r\n", "\n"))
assert(copybookDR1.generateRecordLayoutPositions ==
assertEqualsMultiline(copybookDR1.generateRecordLayoutPositions,
"""-------- FIELD LEVEL/NAME --------- --ATTRIBS-- FLD START END LENGTH
|
|GROUP_1 1 30 30
| 6 FIELD_1 1 1 10 10
| 6 FILLER 2 11 15 5
| 6 GROUP_2 5 16 30 15
| 10 NESTED_FIELD_1 4 16 25 10
| 10 FILLER 5 26 30 5"""
|5 GROUP_1 1 1 30 30
| 6 FIELD_1 2 1 10 10
| 6 FILLER 3 11 15 5
| 6 GROUP_2 4 16 30 15
| 10 NESTED_FIELD_1 5 16 25 10
| 10 FILLER 6 26 30 5"""
.stripMargin.replace("\r\n", "\n"))

val exception = intercept[RuntimeException] {
@@ -92,7 +94,7 @@ class CopybooksOperationsSpec extends FunSuite {
|""".stripMargin.replace("\r\n", "\n")
val copybook1 = CopybookParser.parseTree(copyBookContents1)
val exception2 = intercept[RuntimeException] {
copybook1.dropRoot()
copybook1.dropRoot()
}
assert(exception2.getMessage.contains("Cannot drop the root of a copybook with more than one root segment"))
}
@@ -123,33 +125,31 @@
assert(copybookR1.getRecordSize == 30)
assert(copybookR2.getRecordSize == 60)

assert(copybookR1.generateRecordLayoutPositions ==
assertEqualsMultiline(copybookR1.generateRecordLayoutPositions(),
"""-------- FIELD LEVEL/NAME --------- --ATTRIBS-- FLD START END LENGTH
|
|GROUP_1A 1 30 30
| 6 FIELD_1 1 1 10 10
| 6 FILLER 2 11 15 5
| 6 GROUP_2A 5 16 30 15
| 10 NESTED_FIELD_1 4 16 25 10
| 10 FILLER 5 26 30 5"""
|5 GROUP_1A 1 1 30 30
| 6 FIELD_1 2 1 10 10
| 6 FILLER 3 11 15 5
| 6 GROUP_2A 4 16 30 15
| 10 NESTED_FIELD_1 5 16 25 10
| 10 FILLER 6 26 30 5"""
.stripMargin.replace("\r\n", "\n"))
assert(copybookR2.generateRecordLayoutPositions ==
assertEqualsMultiline(copybookR2.generateRecordLayoutPositions(),
"""-------- FIELD LEVEL/NAME --------- --ATTRIBS-- FLD START END LENGTH
|
|GROUP_1B 1 60 60
| 6 FIELD_1 1 1 20 20
| 6 FILLER 2 21 30 10
| 6 GROUP_2B 5 31 60 30
| 10 NESTED_FIELD_1 4 31 50 20
| 10 FILLER 5 51 60 10"""
|5 GROUP_1B 1 1 60 60
| 6 FIELD_1 2 1 20 20
| 6 FILLER 3 21 30 10
| 6 GROUP_2B 4 31 60 30
| 10 NESTED_FIELD_1 5 31 50 20
| 10 FILLER 6 51 60 10"""
.stripMargin.replace("\r\n", "\n"))

val exception1 = intercept[RuntimeException] {
copybook1.restrictTo("GROUP-1A.FIELD-1")
}
assert(exception1.getMessage.contains("Can only restrict the copybook to a group element"))


}

}