Commit

#574 Add support for reading data with fields having multiple code pages.
yruslan committed Feb 13, 2023
1 parent 14e8ec2 commit b76804c
Showing 15 changed files with 214 additions and 18 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -1380,6 +1380,7 @@ You can have decimals when using COMP-3 as well.
| .option("ebcdic_code_page", "common") | Specifies a code page for EBCDIC encoding. Currently supported values: `common` (default), `common_extended`, `cp037`, `cp037_extended`, `cp838`, `cp870`, `cp875`, `cp1025`, `cp1047`. `*_extended` code pages support non-printable characters that convert to ASCII codes below 32. |
| .option("ebcdic_code_page_class", "full.class.specifier") | Specifies a user provided class for a custom code page to UNICODE conversion. |
| .option("ascii_charset", "US-ASCII") | Specifies a charset to use to decode ASCII data. The value can be any charset supported by `java.nio.charset`: `US-ASCII` (default), `UTF-8`, `ISO-8859-1`, etc. |
| .option("field_code_page:cp825", "field1, field2") | Specifies the code page for the selected fields. You can add more than one such option for multiple code page overrides (see the usage sketch after this table). |
| .option("is_utf16_big_endian", "true") | Specifies if UTF-16 encoded strings (`National` / `PIC N` format) are big-endian (default). |
| .option("floating_point_format", "IBM") | Specifies a floating-point format. Available options: `IBM` (default), `IEEE754`, `IBM_little_endian`, `IEEE754_little_endian`. |
| .option("variable_size_occurs", "false") | If `false` (default) fields that have `OCCURS 0 TO 100 TIMES DEPENDING ON` clauses always have the same size corresponding to the maximum array size (e.g. 100 in this example). If set to `true` the size of the field will shrink for each field that has less actual elements. |
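
Here is a minimal usage sketch of the new per-field code page overrides; the copybook path, data path, and field names are hypothetical, and the code pages come from the supported list above:

```scala
// A sketch, not a definitive example: paths and field names are made up.
val df = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")         // hypothetical copybook path
  .option("ebcdic_code_page", "common")                // default code page for all other fields
  .option("field_code_page:cp037", "FIELD-1, FIELD-2") // decode these two fields with cp037
  .option("field_code_page:cp870", "FIELD-3")          // a second override option for another field
  .load("/path/to/data")                               // hypothetical data path
```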
@@ -1587,6 +1588,7 @@ A: Update hadoop dll to version 3.2.2 or newer.
## Changelog
- #### 2.6.4 will be released soon.
- [#576](https://github.com/AbsaOSS/cobrix/issues/576) Added the ability to create DataFrames from RDDs plus a copybook using `.Cobrix.fromRdd()` extension (look for 'Cobrix.fromRdd' for examples).
- [#574](https://github.com/AbsaOSS/cobrix/issues/574) Added the ability to read data files with fields encoded in multiple code pages via `.option("field_code_page:cp037", "FIELD-1,FIELD_2")`.

- #### 2.6.3 released 1 February 2023.
- [#550](https://github.com/AbsaOSS/cobrix/issues/550) Added `.option("detailed_metadata", true)` option that adds many additional metadata fields (PIC, USAGE, etc) to the generated Spark schema.
@@ -141,7 +141,8 @@ object CopybookParser extends Logging {
floatingPointFormat: FloatingPointFormat = FloatingPointFormat.IBM,
nonTerminals: Seq[String] = Nil,
occursHandlers: Map[String, Map[String, Int]] = Map(),
debugFieldsPolicy: DebugFieldsPolicy = DebugFieldsPolicy.NoDebug): Copybook = {
debugFieldsPolicy: DebugFieldsPolicy = DebugFieldsPolicy.NoDebug,
fieldCodePageMap: Map[String, String] = Map.empty[String, String]): Copybook = {
parseTree(dataEncoding,
copyBookContents,
dropGroupFillers,
@@ -159,7 +160,8 @@ object CopybookParser extends Logging {
floatingPointFormat,
nonTerminals,
occursHandlers,
debugFieldsPolicy)
debugFieldsPolicy,
fieldCodePageMap)
}

/**
@@ -199,7 +201,8 @@ object CopybookParser extends Logging {
floatingPointFormat: FloatingPointFormat = FloatingPointFormat.IBM,
nonTerminals: Seq[String] = Nil,
occursHandlers: Map[String, Map[String, Int]] = Map(),
debugFieldsPolicy: DebugFieldsPolicy = DebugFieldsPolicy.NoDebug): Copybook = {
debugFieldsPolicy: DebugFieldsPolicy = DebugFieldsPolicy.NoDebug,
fieldCodePageMap: Map[String, String] = Map.empty[String, String]): Copybook = {
parseTree(EBCDIC,
copyBookContents,
dropGroupFillers,
@@ -217,7 +220,8 @@ object CopybookParser extends Logging {
floatingPointFormat,
nonTerminals,
occursHandlers,
debugFieldsPolicy)
debugFieldsPolicy,
fieldCodePageMap)
}

/**
@@ -260,9 +264,10 @@ object CopybookParser extends Logging {
floatingPointFormat: FloatingPointFormat,
nonTerminals: Seq[String],
occursHandlers: Map[String, Map[String, Int]],
debugFieldsPolicy: DebugFieldsPolicy): Copybook = {
debugFieldsPolicy: DebugFieldsPolicy,
fieldCodePageMap: Map[String, String]): Copybook = {

val schemaANTLR: CopybookAST = ANTLRParser.parse(copyBookContents, enc, stringTrimmingPolicy, commentPolicy, strictSignOverpunch, improvedNullDetection, ebcdicCodePage, asciiCharset, isUtf16BigEndian, floatingPointFormat)
val schemaANTLR: CopybookAST = ANTLRParser.parse(copyBookContents, enc, stringTrimmingPolicy, commentPolicy, strictSignOverpunch, improvedNullDetection, ebcdicCodePage, asciiCharset, isUtf16BigEndian, floatingPointFormat, fieldCodePageMap)

val nonTerms: Set[String] = (for (id <- nonTerminals)
yield transformIdentifier(id)
@@ -60,8 +60,9 @@ object ANTLRParser extends Logging {
ebcdicCodePage: CodePage,
asciiCharset: Charset,
isUtf16BigEndian: Boolean,
floatingPointFormat: FloatingPointFormat): CopybookAST = {
val visitor = new ParserVisitor(enc, stringTrimmingPolicy, ebcdicCodePage, asciiCharset, isUtf16BigEndian, floatingPointFormat, strictSignOverpunch, improvedNullDetection)
floatingPointFormat: FloatingPointFormat,
fieldCodePageMap: Map[String, String]): CopybookAST = {
val visitor = new ParserVisitor(enc, stringTrimmingPolicy, ebcdicCodePage, asciiCharset, isUtf16BigEndian, floatingPointFormat, strictSignOverpunch, improvedNullDetection, fieldCodePageMap)

val strippedContents = filterSpecialCharacters(copyBookContents).split("\\r?\\n").map(
line =>
@@ -18,6 +18,7 @@ package za.co.absa.cobrix.cobol.parser.antlr

import java.nio.charset.Charset
import org.antlr.v4.runtime.{ParserRuleContext, RuleContext}
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.cobol.parser.CopybookParser.CopybookAST
import za.co.absa.cobrix.cobol.parser.ast.datatype._
@@ -26,7 +27,7 @@ import za.co.absa.cobrix.cobol.parser.common.Constants
import za.co.absa.cobrix.cobol.parser.decoders.DecoderSelector
import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat
import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
import za.co.absa.cobrix.cobol.parser.encoding.{Encoding, UTF16}
import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC, Encoding, UTF16}
import za.co.absa.cobrix.cobol.parser.exceptions.SyntaxErrorException
import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy
import za.co.absa.cobrix.cobol.parser.position.{Left, Position, Right}
@@ -46,7 +47,8 @@ class ParserVisitor(enc: Encoding,
isUtf16BigEndian: Boolean,
floatingPointFormat: FloatingPointFormat,
strictSignOverpunch: Boolean,
improvedNullDetection: Boolean) extends copybookParserBaseVisitor[Expr] {
improvedNullDetection: Boolean,
fieldCodePageMap: Map[String, String]) extends copybookParserBaseVisitor[Expr] {
/* expressions */
case class IdentifierExpr(value: String, originalValue: String) extends Expr
case class OccursExpr(m: Int, M: Option[Int], dep: Option[String]) extends Expr
@@ -810,6 +812,21 @@ class ParserVisitor(enc: Encoding,

checkBounds(ctx.pic(0), pic)

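// Per-field code page override: if this field's lowercased name appears in
// fieldCodePageMap, decode it with that code page (for EBCDIC) or charset
// (for ASCII) instead of the file-level default.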
val identifierLowercase = identifier.toLowerCase()
val effectiveEbcdicCodePage = if (enc == EBCDIC && fieldCodePageMap.contains(identifierLowercase)) {
val codePageStr = fieldCodePageMap(identifierLowercase)
CodePage.getCodePageByName(codePageStr)
} else {
ebcdicCodePage
}

val effectiveAsciiCharset = if (enc == ASCII && fieldCodePageMap.contains(identifierLowercase)) {
val charsetStr = fieldCodePageMap(identifierLowercase)
Charset.forName(charsetStr)
} else {
asciiCharset
}

val prim = Primitive(
section,
identifier,
@@ -824,7 +841,7 @@ class ParserVisitor(enc: Encoding,
Map(),
isDependee = false,
identifier.toUpperCase() == Constants.FILLER,
DecoderSelector.getDecoder(pic.value, stringTrimmingPolicy, ebcdicCodePage, asciiCharset, isUtf16BigEndian, floatingPointFormat, strictSignOverpunch, improvedNullDetection)
DecoderSelector.getDecoder(pic.value, stringTrimmingPolicy, effectiveEbcdicCodePage, effectiveAsciiCharset, isUtf16BigEndian, floatingPointFormat, strictSignOverpunch, improvedNullDetection)
) (Some(parent))

parent.children.append(prim)
@@ -124,7 +124,8 @@ class FixedLenNestedReader[T: ClassTag](copyBookContents: Seq[String],
floatingPointFormat,
nonTerminals,
occursMappings,
readerProperties.debugFieldsPolicy)
readerProperties.debugFieldsPolicy,
readerProperties.fieldCodePage)
else
Copybook.merge(
copyBookContents.map(
@@ -145,7 +146,8 @@ class FixedLenNestedReader[T: ClassTag](copyBookContents: Seq[String],
floatingPointFormat,
nonTerminals,
occursMappings,
readerProperties.debugFieldsPolicy)
readerProperties.debugFieldsPolicy,
readerProperties.fieldCodePage)
)
)
new CobolSchema(schema, schemaRetentionPolicy, "", false, detailedMetadata = readerProperties.detailedMetadata)
@@ -222,7 +222,8 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
readerProperties.floatingPointFormat,
readerProperties.nonTerminals,
readerProperties.occursMappings,
readerProperties.debugFieldsPolicy)
readerProperties.debugFieldsPolicy,
readerProperties.fieldCodePage)
else
Copybook.merge(copyBookContents.map(cpb =>
CopybookParser.parseTree(encoding,
@@ -242,7 +243,8 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
readerProperties.floatingPointFormat,
nonTerminals = readerProperties.nonTerminals,
readerProperties.occursMappings,
readerProperties.debugFieldsPolicy)
readerProperties.debugFieldsPolicy,
readerProperties.fieldCodePage)
))
val segIdFieldCount = readerProperties.multisegment.map(p => p.segmentLevelIds.size).getOrElse(0)
val segmentIdPrefix = readerProperties.multisegment.map(p => p.segmentIdPrefix).getOrElse("")
@@ -36,6 +36,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param ebcdicCodePage Specifies what code page to use for EBCDIC to ASCII/Unicode conversions
* @param ebcdicCodePageClass An optional custom code page conversion class provided by a user
* @param asciiCharset A charset for ASCII data
* @param fieldCodePage Specifies a mapping between field names and the code pages used to decode them
* @param isUtf16BigEndian If true UTF-16 is considered big-endian.
* @param floatingPointFormat A format of floating-point numbers
* @param recordStartOffset A number of bytes to skip at the beginning of the record before parsing a record according to a copybook
@@ -67,6 +68,7 @@ case class CobolParameters(
ebcdicCodePage: String,
ebcdicCodePageClass: Option[String],
asciiCharset: String,
fieldCodePage: Map[String, String],
isUtf16BigEndian: Boolean,
floatingPointFormat: FloatingPointFormat,
recordStartOffset: Int,
@@ -34,6 +34,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param ebcdicCodePage Specifies what code page to use for EBCDIC to ASCII/Unicode conversions
* @param ebcdicCodePageClass An optional custom code page conversion class provided by a user
* @param asciiCharset A charset for ASCII data
* @param fieldCodePage Specifies a mapping between field names and the code pages used to decode them
* @param isUtf16BigEndian If true UTF-16 strings are considered big-endian.
* @param floatingPointFormat A format of floating-point numbers
* @param variableSizeOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements
@@ -75,6 +76,7 @@ case class ReaderParameters(
ebcdicCodePage: String = "common",
ebcdicCodePageClass: Option[String] = None,
asciiCharset: String = "",
fieldCodePage: Map[String, String] = Map.empty[String, String],
isUtf16BigEndian: Boolean = true,
floatingPointFormat: FloatingPointFormat = FloatingPointFormat.IBM,
variableSizeOccurs: Boolean = false,
@@ -39,7 +39,8 @@ class DataSizeSpec extends AnyFunSuite {
isUtf16BigEndian = true,
floatingPointFormat = FloatingPointFormat.IBM,
strictSignOverpunch = true,
improvedNullDetection = false)
improvedNullDetection = false,
fieldCodePageMap = Map.empty)

val charStream = CharStreams.fromString("01 RECORD.\n 05 ABC PIC " + pic + ".")
val lexer = new copybookLexer(charStream)
@@ -39,7 +39,8 @@ class PicValidationSpec extends AnyFunSuite {
isUtf16BigEndian = true,
FloatingPointFormat.IBM,
strictSignOverpunch = true,
improvedNullDetection = false)
improvedNullDetection = false,
fieldCodePageMap = Map.empty)

val charStream = CharStreams.fromString("01 RECORD.\n 05 ABC PIC " + pic + ".")
val lexer = new copybookLexer(charStream)
@@ -43,6 +43,10 @@ object RddReaderParams {
(PARAM_ENCODING -> "ascii") +
(PARAM_ASCII_CHARSET -> "utf8")

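// Field-level code page overrides are not supported by the text RDD reader,
// which decodes whole records with the single charset forced above, so fail
// fast with a clear error.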
if (options.keys.exists(_.startsWith(PARAM_FIELD_CODE_PAGE_PREFIX))) {
throw new IllegalArgumentException(s"Code page mapping using '$PARAM_FIELD_CODE_PAGE_PREFIX*' is not supported by the text RDD reader. Use forBinary() instead.")
}

forBinary(textOptions)
}

@@ -77,7 +77,8 @@ class SparkCobolOptionsBuilder(copybookContent: String)(implicit spark: SparkSes
floatingPointFormat = readerParams.floatingPointFormat,
nonTerminals = readerParams.nonTerminals,
occursHandlers = readerParams.occursMappings,
debugFieldsPolicy = readerParams.debugFieldsPolicy
debugFieldsPolicy = readerParams.debugFieldsPolicy,
fieldCodePageMap = readerParams.fieldCodePage
)

val cobolSchema = new CobolSchema(parsedCopybook,
@@ -81,6 +81,7 @@ object CobolParametersParser extends Logging {
val PARAM_STRICT_SIGN_OVERPUNCHING = "strict_sign_overpunching"
val PARAM_IMPROVED_NULL_DETECTION = "improved_null_detection"
val PARAM_ALLOW_PARTIAL_RECORDS = "allow_partial_records"
val PARAM_FIELD_CODE_PAGE_PREFIX = "field_code_page:"

// Parameters for multisegment variable length files
val PARAM_RECORD_FORMAT = "record_format"
@@ -242,6 +243,7 @@ object CobolParametersParser extends Logging {
ebcdicCodePageName,
ebcdicCodePageClass,
asciiCharset,
getFieldCodepageMap(params),
params.getOrElse(PARAM_IS_UTF16_BIG_ENDIAN, "true").toBoolean,
getFloatingPointFormat(params),
params.getOrElse(PARAM_RECORD_START_OFFSET, "0").toInt,
@@ -294,6 +296,34 @@ object CobolParametersParser extends Logging {
isEbcdic
}

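/** Builds the field-to-code-page map from options of the form
  * `field_code_page:<code page>` -> `<comma-separated field names>`.
  * For example, "field_code_page:cp1256" -> "FIELD1" yields
  * Map("field1" -> "cp1256"); field names are trimmed, lowercased,
  * and normalized via transformIdentifier (see the test below).
  */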
def getFieldCodepageMap(parameters: Parameters): Map[String, String] = {
val entries = parameters
.getMap
.keys
.filter(_.startsWith(PARAM_FIELD_CODE_PAGE_PREFIX))

entries.flatMap { key =>
val idx = key.indexOf(':')
if (idx >= 0) {
val codePage = key.substring(idx + 1).trim
val fieldsStr = parameters.get(key).get

if (codePage.isEmpty) {
logger.warn(s"Incorrect code page name for the option '$key' -> '$fieldsStr'.")
Array.empty[(String, String)]
} else {
val fields = fieldsStr.split(',').map(fld =>
CopybookParser.transformIdentifier(fld.trim.toLowerCase)
).filter(_.nonEmpty)

fields.map(field => (field, codePage))
}
} else {
Array.empty[(String, String)]
}
}.toMap
}

def getReaderProperties(parameters: CobolParameters, defaultBlockSize: Option[Int]): ReaderParameters = {
val varLenParams: VariableLengthParameters = parameters.variableLengthParams
.getOrElse(
@@ -331,6 +361,7 @@ object CobolParametersParser extends Logging {
ebcdicCodePage = parameters.ebcdicCodePage,
ebcdicCodePageClass = parameters.ebcdicCodePageClass,
asciiCharset = parameters.asciiCharset,
fieldCodePage = parameters.fieldCodePage,
isUtf16BigEndian = parameters.isUtf16BigEndian,
floatingPointFormat = parameters.floatingPointFormat,
variableSizeOccurs = parameters.variableSizeOccurs,
@@ -84,4 +84,21 @@ class ParametersParsingSpec extends AnyFunSuite {
assert(ex.getMessage == "Duplicate child 'EMPLOYEE' for parents DEPT_ROOT and COMPANY_ROOT specified for 'segment-children' option.")
}

test("Test getFieldCodepageMap() generating a proper map") {
val myMap = Map("copybook" -> "something",
"field_code_page" -> "dummy1",
"field_code_page:" -> "dummy2",
"field_code_page:cp1256" -> "FIELD1",
"field_code_page:us-ascii" -> " FIELD-2 , FIELD_3 "
)
val params = new Parameters(myMap)

val fieldCodePageMap = CobolParametersParser.getFieldCodepageMap(params)

assert(fieldCodePageMap.size == 3)
assert(fieldCodePageMap("field1") == "cp1256")
assert(fieldCodePageMap("field_2") == "us-ascii")
assert(fieldCodePageMap("field_3") == "us-ascii")
}

}