#672 Add tests for various scenarios.
yruslan committed Apr 22, 2024
1 parent 1f9bfb5 commit 6f78ed2
Showing 6 changed files with 132 additions and 10 deletions.
82 changes: 80 additions & 2 deletions README.md
@@ -32,7 +32,7 @@ Among the motivations for this project, it is possible to highlight:

- Supports REDEFINES, OCCURS and DEPENDING ON fields (e.g. unchecked unions and variable-size arrays)

- Supports nested structures and arrays (including "flattened" nested names)
- Supports nested structures and arrays

- Supports HDFS as well as local file systems

@@ -350,8 +350,18 @@ Currently, specifying multiple paths in `load()` is not supported. Use the follo
### Spark SQL schema extraction
This library also provides convenient methods to extract Spark SQL schemas and Cobol layouts from copybooks.

If you want to extract a Spark SQL schema from a copybook:
If you want to extract a Spark SQL schema from a copybook by providing the same options you provide to Spark:
```scala
import za.co.absa.cobrix.spark.cobol.schema.CobolSchema

// 'copybook' holds the copybook contents as a String
// Same options that you use for spark.read.format("cobol").option()
val options = Map("schema_retention_policy" -> "keep_original")

val cobolSchema = CobolSchema.fromSparkOptions(Seq(copybook), options)
val sparkSchema = cobolSchema.getSparkSchema.toString()

println(sparkSchema)
```
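
Since `getSparkSchema` returns a regular Spark `StructType`, the usual `StructType` helpers also apply to the result. A minimal follow-up sketch, assuming `cobolSchema` from the snippet above:
```scala
// Render the extracted schema as a DDL string using plain Spark API
val ddl = cobolSchema.getSparkSchema.toDDL

println(ddl)
```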

If you want to extract a Spark SQL schema from a copybook using the Cobol parser directly:
```scala
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
```
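
A related minimal sketch of working with the parser directly, assuming a small inline copybook defined only for illustration and the parser's `generateRecordLayoutPositions` helper:
```scala
import za.co.absa.cobrix.cobol.parser.CopybookParser

// A tiny hypothetical copybook, used only for this example
val copyBookContents: String =
  """       01  RECORD.
    |           05  ID    PIC 9(4).
    |           05  NAME  PIC X(10).
    |""".stripMargin

// Parse the copybook into the parser's internal representation
val parsedCopybook = CopybookParser.parseTree(copyBookContents)

// Print the resolved record layout (field offsets and lengths)
println(parsedCopybook.generateRecordLayoutPositions())
```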
@@ -1397,6 +1407,74 @@ When using `9` 8 refers to the number of digits the number has. Here, the size o
```
You can have decimals when using COMP-3 as well.

### Flattening schema with GROUPs and OCCURS
Flattening can be helpful when migrating data from mainframe files with fields that have OCCURS (arrays) to relational
databases that do not support nested arrays.

Cobrix has a method that can flatten the schema automatically given a DataFrame produced by `spark-cobol`.

Spark Scala example:
```scala
import za.co.absa.cobrix.spark.cobol.utils.SparkUtils

val dfFlat = SparkUtils.flattenSchema(df, useShortFieldNames = false)
```

PySpark example:
```python
from pyspark.sql import SparkSession, DataFrame, SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from py4j.java_gateway import java_import

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("subjects", ArrayType(StringType()), True)
])

# Sample data
data = [
    (1, "Alice", ["Math", "Science"]),
    (2, "Bob", ["History", "Geography"]),
    (3, "Charlie", ["English", "Math", "Physics"])
]

# Create a test DataFrame (create the SparkSession first if one is not already available)
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data, schema)

# Show the DataFrame before flattening
df.show()

# Flatten the schema using the Cobrix Scala 'SparkUtils.flattenSchema' method
sc = spark.sparkContext
java_import(sc._gateway.jvm, "za.co.absa.cobrix.spark.cobol.utils.SparkUtils")
dfFlatJvm = spark._jvm.SparkUtils.flattenSchema(df._jdf, False)
dfFlat = DataFrame(dfFlatJvm, SQLContext(sc))

# Show the DataFrame after flattening
dfFlat.show(truncate=False)
dfFlat.printSchema()
```

The output looks like this:
```
# Before flattening
+---+-------+------------------------+
|id |name |subjects |
+---+-------+------------------------+
|1 |Alice |[Math, Science] |
|2 |Bob |[History, Geography] |
|3 |Charlie|[English, Math, Physics]|
+---+-------+------------------------+
# After flattening
+---+-------+----------+----------+----------+
|id |name |subjects_0|subjects_1|subjects_2|
+---+-------+----------+----------+----------+
|1 |Alice |Math |Science |null |
|2 |Bob |History |Geography |null |
|3 |Charlie|English |Math |Physics |
+---+-------+----------+----------+----------+
```
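
Flattened output like this can be loaded straight into a relational target. A minimal end-to-end sketch in Scala, with the copybook path, data path, and JDBC connection details as placeholder assumptions:
```scala
import org.apache.spark.sql.SparkSession
import za.co.absa.cobrix.spark.cobol.utils.SparkUtils

val spark = SparkSession.builder().appName("cobol-to-jdbc").getOrCreate()

// Read the mainframe file with spark-cobol (paths are placeholders)
val df = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")
  .load("/path/to/data")

// Flatten GROUPs and OCCURS into plain top-level columns
val dfFlat = SparkUtils.flattenSchema(df, useShortFieldNames = false)

// Write the flat DataFrame using Spark's standard JDBC sink (connection details are placeholders)
dfFlat.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/testdb")
  .option("dbtable", "flattened_records")
  .option("user", "db_user")
  .option("password", "db_password")
  .mode("overwrite")
  .save()
```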

## Summary of all available options

##### File reading options
@@ -137,4 +137,4 @@ object CobolSchema {
case None => CodePage.getCodePageByName(codePageName)
}
}
}
}
@@ -123,8 +123,6 @@ class StringDecodersSpec extends AnyWordSpec {

val actual = decodeEbcdicString(bytes, KeepAll, new CodePage500, improvedNullDetection = false)

println(actual)

assert(actual == expected)
}

@@ -341,4 +341,4 @@ object CobolSchema {

CobolSchema.fromBaseReader(CobolReaderSchema.fromReaderParameters(copyBookContents, readerParameters))
}
}
}
@@ -112,4 +112,4 @@ class DefaultSource
copybookContent, getReaderProperties(parameters, defaultHdfsBlockSize)
)
}
}
}
@@ -16,7 +16,7 @@

package za.co.absa.cobrix.spark.cobol

import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructType}
import org.apache.spark.sql.types.{ArrayType, IntegerType, LongType, StringType, StructType}
import org.scalatest.wordspec.AnyWordSpec
import org.slf4j.{Logger, LoggerFactory}
import za.co.absa.cobrix.cobol.parser.CopybookParser
@@ -483,8 +483,6 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {

val sparkSchema = cobolSchema.getSparkSchema

sparkSchema.printTreeString()

assert(sparkSchema.fields.length == 3)
assert(sparkSchema.fields.head.name == "HEADER")
assert(sparkSchema.fields.head.dataType == StringType)
@@ -502,6 +500,54 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
assert(seg1.fields(2).name == "SEG3")
assert(seg1.fields(2).dataType.isInstanceOf[ArrayType])
}

"return a schema for a multi-segment copybook" in {
val copybook: String =
""" 01 RECORD.
| 05 HEADER PIC X(5).
| 05 SEGMENT-ID PIC X(2).
| 05 SEG1.
| 10 FIELD1 PIC 9(7).
| 05 SEG2 REDEFINES SEG1.
| 10 FIELD3 PIC X(7).
| 05 SEG3 REDEFINES SEG1.
| 10 FIELD4 PIC S9(7).
|""".stripMargin

val cobolSchema = CobolSchema.fromSparkOptions(Seq(copybook),
Map(
"segment_field" -> "SEGMENT-ID",
"redefine-segment-id-map:0" -> "SEG1 => 01",
"redefine-segment-id-map:1" -> "SEG2 => 02",
"redefine-segment-id-map:2" -> "SEG3 => 03",
"segment_id_level0" -> "TEST",
"generate_record_id" -> "true"
)
)

val sparkSchema = cobolSchema.getSparkSchema

assert(sparkSchema.fields.length == 9)
assert(sparkSchema.fields.head.name == "File_Id")
assert(sparkSchema.fields.head.dataType == IntegerType)
assert(sparkSchema.fields(1).name == "Record_Id")
assert(sparkSchema.fields(1).dataType == LongType)
assert(sparkSchema.fields(2).name == "Record_Byte_Length")
assert(sparkSchema.fields(2).dataType == IntegerType)
assert(sparkSchema.fields(3).name == "Seg_Id0")
assert(sparkSchema.fields(3).dataType == StringType)
assert(sparkSchema.fields(4).name == "HEADER")
assert(sparkSchema.fields(4).dataType == StringType)
assert(sparkSchema.fields(5).name == "SEGMENT_ID")
assert(sparkSchema.fields(5).dataType == StringType)
assert(sparkSchema.fields(6).name == "SEG1")
assert(sparkSchema.fields(6).dataType.isInstanceOf[StructType])
assert(sparkSchema.fields(7).name == "SEG2")
assert(sparkSchema.fields(7).dataType.isInstanceOf[StructType])
assert(sparkSchema.fields(8).name == "SEG3")
assert(sparkSchema.fields(8).dataType.isInstanceOf[StructType])
}
}

}
