From 3fc3df261a885c710ff63153fba2836a49b1d771 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 28 Mar 2023 11:46:08 +0200 Subject: [PATCH] #593 Add the ability to generate 'Record_Bytes' field that contains raw data of each record parsed. --- .../cobol/parser/common/Constants.scala | 1 + .../cobol/reader/FixedLenNestedReader.scala | 2 +- .../cobol/reader/VarLenNestedReader.scala | 2 +- .../extractors/record/RecordExtractors.scala | 59 ++-- .../iterator/FixedLenNestedRowIterator.scala | 1 + .../iterator/VarLenNestedIterator.scala | 1 + .../reader/parameters/CobolParameters.scala | 2 + .../reader/parameters/ReaderParameters.scala | 2 + .../cobol/reader/schema/CobolSchema.scala | 2 + .../TestDataGen19WideTransactions.scala | 138 ++++++++++ .../builder/SparkCobolOptionsBuilder.scala | 5 +- .../parameters/CobolParametersParser.scala | 7 + .../spark/cobol/schema/CobolSchema.scala | 15 +- .../cobol/CobolSchemaHierarchicalSpec.scala | 2 +- .../cobrix/spark/cobol/CobolSchemaSpec.scala | 137 +++++++++- .../cobol/source/base/SparkSchemaSpec.scala | 2 +- .../cobol/source/base/SparkTestBase.scala | 1 + .../source/base/impl/DummyCobolSchema.scala | 2 +- .../integration/Test17HierarchicalSpec.scala | 19 ++ .../Test35GeneratedRecordBytes.scala | 256 ++++++++++++++++++ 20 files changed, 605 insertions(+), 51 deletions(-) create mode 100644 examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen19WideTransactions.scala create mode 100644 spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test35GeneratedRecordBytes.scala diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala index 38f015b9..950dec28 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala @@ -63,6 +63,7 @@ object Constants { val fileIdField = "File_Id" val recordIdField = "Record_Id" val recordByteLength = "Record_Byte_Length" + val recordBytes = "Record_Bytes" // Non-terminals val nonTerminalsPostfix = "_NT" diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/FixedLenNestedReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/FixedLenNestedReader.scala index 67407a47..34b00821 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/FixedLenNestedReader.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/FixedLenNestedReader.scala @@ -150,6 +150,6 @@ class FixedLenNestedReader[T: ClassTag](copyBookContents: Seq[String], readerProperties.fieldCodePage) ) ) - new CobolSchema(schema, schemaRetentionPolicy, "", false, metadataPolicy = readerProperties.metadataPolicy) + new CobolSchema(schema, schemaRetentionPolicy, "", false, readerProperties.generateRecordBytes, metadataPolicy = readerProperties.metadataPolicy) } } diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala index 59bd4ed7..36174342 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala @@ -248,7 +248,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], )) val segIdFieldCount = 
readerProperties.multisegment.map(p => p.segmentLevelIds.size).getOrElse(0) val segmentIdPrefix = readerProperties.multisegment.map(p => p.segmentIdPrefix).getOrElse("") - new CobolSchema(schema, readerProperties.schemaPolicy, readerProperties.inputFileNameColumn, readerProperties.generateRecordId, segIdFieldCount, segmentIdPrefix) + new CobolSchema(schema, readerProperties.schemaPolicy, readerProperties.inputFileNameColumn, readerProperties.generateRecordId, readerProperties.generateRecordBytes, segIdFieldCount, segmentIdPrefix) } private def checkInputArgumentsValidity(): Unit = { diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala index f15779a2..7a37d665 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala @@ -35,7 +35,8 @@ object RecordExtractors { * @param offsetBytes The offset to the beginning of the record (in bits). * @param policy A schema retention policy to be applied to the extracted record. * @param variableLengthOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements. - * @param generateRecordId If true a record id field will be added as the first field of the record. + * @param generateRecordId If true, a record id field will be added as the first field of the record. + * @param generateRecordBytes If true, a record bytes field will be added at the beginning of each record. * @param segmentLevelIds Segment ids to put to the extracted record if id generation it turned on. * @param fileId A file id to be put to the extractor record if generateRecordId == true. * @param recordId The record id to be saved to the record id field. @@ -53,6 +54,7 @@ object RecordExtractors { policy: SchemaRetentionPolicy = SchemaRetentionPolicy.KeepOriginal, variableLengthOccurs: Boolean = false, generateRecordId: Boolean = false, + generateRecordBytes: Boolean = false, segmentLevelIds: List[String] = Nil, fileId: Int = 0, recordId: Long = 0, @@ -197,7 +199,7 @@ object RecordExtractors { policy } - applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, segmentLevelIds, fileId, recordId, data.length, generateInputFileField, inputFileName, handler) + applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes, segmentLevelIds, fileId, recordId, data.length, data, generateInputFileField, inputFileName, handler) } /** @@ -417,7 +419,7 @@ object RecordExtractors { policy } - applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, Nil, fileId, recordId, recordLength, generateInputFileField, inputFileName, handler) + applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes = false, Nil, fileId, recordId, recordLength, Array.empty[Byte], generateInputFileField = generateInputFileField, inputFileName, handler) } /** @@ -436,6 +438,7 @@ object RecordExtractors { * @param ast The parsed copybook * @param records The array of [[T]] object for each Group of the copybook * @param generateRecordId If true a record id field will be added as the first field of the record. 
+ * @param generateRecordBytes If true a record bytes field will be added at the beginning of the record. * @param fileId The file id to be saved to the file id field * @param recordId The record id to be saved to the record id field * @param recordByteLength The length of the record @@ -448,43 +451,39 @@ object RecordExtractors { records: List[T], policy: SchemaRetentionPolicy, generateRecordId: Boolean, + generateRecordBytes: Boolean, segmentLevelIds: List[String], fileId: Int, recordId: Long, recordByteLength: Int, + recordBytes: Array[Byte], generateInputFileField: Boolean, inputFileName: String, handler: RecordHandler[T] ): Seq[Any] = { - (generateRecordId, generateInputFileField) match { - case (false, false) if policy == SchemaRetentionPolicy.CollapseRoot => + val generatedFields = new ListBuffer[Any] + + if (generateRecordId) { + generatedFields.append(fileId, recordId, recordByteLength) + } + + if (generateRecordBytes) { + generatedFields.append(recordBytes) + } + + if (generateInputFileField) { + generatedFields.append(inputFileName) + } + + segmentLevelIds.foreach(generatedFields.append(_)) + + policy match { + case SchemaRetentionPolicy.CollapseRoot => // If the policy for schema retention is root collapsing, expand root fields - segmentLevelIds ++ records.flatMap(record => handler.toSeq(record)) - case (false, false) => + generatedFields ++ records.flatMap(record => handler.toSeq(record)) + case SchemaRetentionPolicy.KeepOriginal => // Return rows as the original sequence of groups - segmentLevelIds ++ records - case (true, false) if policy == SchemaRetentionPolicy.CollapseRoot => - // If the policy for schema retention is root collapsing, expand root fields - // and add fileId and recordId - val expandedRows = records.flatMap(record => handler.toSeq(record)) - fileId +: recordId +: recordByteLength +: (segmentLevelIds ++ expandedRows) - case (true, false) => - // Add recordId as the first field - fileId +: recordId +: recordByteLength +: (segmentLevelIds ++ records) - case (false, true) if policy == SchemaRetentionPolicy.CollapseRoot => - // If the policy for schema retention is root collapsing, expand root fields + adding the file name field - (segmentLevelIds :+ inputFileName) ++ records.flatMap(record => handler.toSeq(record)) - case (false, true) => - // Return rows as the original sequence of groups + adding the file name field - (segmentLevelIds :+ inputFileName) ++ records - case (true, true) if policy == SchemaRetentionPolicy.CollapseRoot => - // If the policy for schema retention is root collapsing, expand root fields - // and add fileId and recordId + adding the file name field - val expandedRows = records.flatMap(record => handler.toSeq(record)) - fileId +: recordId +: recordByteLength +: inputFileName +: (segmentLevelIds ++ expandedRows) - case (true, true) => - // Add recordId as the first field + adding the file name field - fileId +: recordId +: recordByteLength +: inputFileName +: (segmentLevelIds ++ records) + generatedFields ++ records } } } diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala index 6a9fae68..cbf9e4b2 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala @@ -91,6 +91,7 @@ class FixedLenNestedRowIterator[T: ClassTag]( 
offset, policy, readerProperties.variableSizeOccurs, + generateRecordBytes = readerProperties.generateRecordBytes, activeSegmentRedefine = activeSegmentRedefine, handler = handler ) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala index d0de99d1..49e72a80 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala @@ -98,6 +98,7 @@ final class VarLenNestedIterator[T: ClassTag](cobolSchema: Copybook, readerProperties.schemaPolicy, readerProperties.variableSizeOccurs, readerProperties.generateRecordId, + readerProperties.generateRecordBytes, segmentLevelIds, fileId, rawRecordIterator.getRecordIndex, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala index 4fc35fab..b9c16907 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala @@ -44,6 +44,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten * @param recordLength Specifies the length of the record disregarding the copybook record size. Implied the file has fixed record length. * @param variableLengthParams VariableLengthParameters containing the specifications for the consumption of variable-length Cobol records. * @param variableSizeOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements + * @param generateRecordBytes Generate 'record_bytes' field containing raw bytes of the original record * @param schemaRetentionPolicy A copybook usually has a root group struct element that acts like a rowtag in XML. This can be retained in Spark schema or can be collapsed * @param stringTrimmingPolicy Specify if and how strings should be trimmed when parsed * @param allowPartialRecords If true, partial ASCII records can be parsed (in cases when LF character is missing for example) @@ -76,6 +77,7 @@ case class CobolParameters( recordLength: Option[Int], variableLengthParams: Option[VariableLengthParameters], variableSizeOccurs: Boolean, + generateRecordBytes: Boolean, schemaRetentionPolicy: SchemaRetentionPolicy, stringTrimmingPolicy: StringTrimmingPolicy, allowPartialRecords: Boolean, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala index 8e5bc46d..be74abce 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala @@ -53,6 +53,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten * @param fileStartOffset A number of bytes to skip at the beginning of each file * @param fileEndOffset A number of bytes to skip at the end of each file * @param generateRecordId If true, a record id field will be prepended to each record. 
+ * @param generateRecordBytes Generate 'record_bytes' field containing raw bytes of the original record * @param schemaPolicy Specifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook. * @param stringTrimmingPolicy Specifies if and how strings should be trimmed when parsed. * @param allowPartialRecords If true, partial ASCII records can be parsed (in cases when LF character is missing for example) @@ -96,6 +97,7 @@ case class ReaderParameters( fileStartOffset: Int = 0, fileEndOffset: Int = 0, generateRecordId: Boolean = false, + generateRecordBytes: Boolean = false, schemaPolicy: SchemaRetentionPolicy = SchemaRetentionPolicy.KeepOriginal, stringTrimmingPolicy: StringTrimmingPolicy = StringTrimmingPolicy.TrimBoth, allowPartialRecords: Boolean = false, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala index 34e980bc..d434034a 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala @@ -30,6 +30,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten * @param copybook A parsed copybook. * @param policy Specifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook. * @param generateRecordId If true, a record id field will be prepended to the beginning of the schema. + * @param generateRecordBytes If true, a record bytes field will be appended to the beginning of the schema. * @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema. * @param generateSegIdFieldsCnt A number of segment ID levels to generate * @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used) @@ -39,6 +40,7 @@ class CobolSchema(val copybook: Copybook, val policy: SchemaRetentionPolicy, val inputFileNameField: String, val generateRecordId: Boolean, + val generateRecordBytes: Boolean, val generateSegIdFieldsCnt: Int = 0, val segmentIdProvidedPrefix: String = "", val metadataPolicy: MetadataPolicy = MetadataPolicy.Basic) extends Serializable { diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen19WideTransactions.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen19WideTransactions.scala new file mode 100644 index 00000000..79e2eed5 --- /dev/null +++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen19WideTransactions.scala @@ -0,0 +1,138 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.example.spark.cobol.examples.parser.generators + +import com.example.spark.cobol.examples.parser.generators.TestDataGen17Hierarchical.{rand, roles} +import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company} +import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._ + +import java.io.{BufferedOutputStream, FileOutputStream} +import scala.collection.mutable +import scala.util.Random + +/** + * This is a test data generator. The copybook for it is listed below. + */ +object TestDataGen19WideTransactions { + + // seed=100 is used for the integration test + val rand: Random = new Random(102) + + val numberOfRecordsToGenerate = rand.nextInt(900000) + 1000000 + //val numberOfRecordsToGenerate = rand.nextInt(90000) + 10000 + + /* + 01 RECORD. + 05 ACCOUNT-ID PIC X(15). + 05 CURRENCY PIC X(3). + 05 BALANCE PIC 9(12)V99. + 05 COMPANY-NAME PIC X(15). + 05 COMPANY-ID PIC X(10). + 05 DEPT-NAME PIC X(22). + 05 FIRST-NAME PIC X(16). + 05 LAST-NAME PIC X(16). + 05 AGE PIC 9(3). + 05 MARRIAGE-STATUS PIC X(1). + 05 ADDRESS PIC X(30). + 05 ZIP PIC X(10). + 05 PHONE-NUM PIC X(17). + 05 ROLE PIC X(18). + */ + + val currencies: Seq[String] = CommonLists.currencies + + val companies: Seq[Company] = CommonLists.companies + + val departments: Seq[String] = CommonLists.departments + + val roles: Seq[String] = CommonLists.roles + + val firstNames: Seq[String] = CommonLists.firstNames + + val lastNames: Seq[String] = CommonLists.lastNames + + + def main(args: Array[String]): Unit = { + + val numOfCurrencies = currencies.size + val numOfCompanies = companies.size + + val byteArray: Array[Byte] = new Array[Byte](190) + val bos = new BufferedOutputStream(new FileOutputStream("EBCDIC.ACCOUNTS.FEATURES.DATA.MAR24.dat")) + var i = 0 + while (i< numberOfRecordsToGenerate) { + // Account Id 100012300012345 + val accountId0 = rand.nextInt(4) + val accountId1 = rand.nextInt(900) + 100 + val accountId2 = rand.nextInt(90000) + 10000 + val accountId = accountId0.toString + "000" + accountId1.toString + "000" + accountId2.toString + + val currency = currencies(rand.nextInt(numOfCurrencies)) + val company = companies(rand.nextInt(numOfCompanies)) + + val dept = departments(rand.nextInt(departments.size)) + + val fname = firstNames(rand.nextInt(firstNames.size)) + val lname = lastNames(rand.nextInt(lastNames.size)) + val role = roles(rand.nextInt(roles.size)) + val address = companies(rand.nextInt(companies.size)).address + val phoneNum = s"+(${rand.nextInt(920) + 1}) ${rand.nextInt(899) + 100} ${rand.nextInt(89) + 10} ${rand.nextInt(89) + 10}" + val zip = (rand.nextInt(899999999) + 100000000).toString + val age = (rand.nextInt(60) + 19).toString + val mstatuses = "SSSSSSMMMDW" + val mstatus = mstatuses(rand.nextInt(mstatuses.length)).toString + + // Department details + + val tp = rand.nextInt(100) + val amountIntPart = if (tp < 20) { + rand.nextInt(1000).toLong + } else if (tp < 65) { + rand.nextInt(100000).toLong + } else { + rand.nextInt(10000000).toLong + } + + val intPartLen = amountIntPart.toString.length + val lpad = " " * (12 - intPartLen) + val amountIntPartStr = lpad + amountIntPart.toString + + val amountFracPart = if (amountIntPart < 10000) rand.nextInt(100) else 0 + + var offset = 0 + offset = putStringToArray("ACCOUNT-ID", byteArray, accountId, offset, 15) + offset = putStringToArray("CURRENCY", byteArray, currency, offset, 3) + offset = putStringToArray("BALANCE_i", byteArray, amountIntPartStr, offset, 12) + offset = putStringToArray("BALANCE_f", 
byteArray, amountFracPart.toString, offset, 2) + offset = putStringToArray("COMPANY-NAME", byteArray, company.companyName, offset, 15) + offset = putStringToArray("COMPANY-ID", byteArray, company.companyId, offset, 10) + offset = putStringToArray("DEPT-NAME", byteArray, dept, offset, 22) + offset = putStringToArray("FIRST-NAME", byteArray, fname, offset, 16) + offset = putStringToArray("LAST-NAME", byteArray, lname, offset, 16) + offset = putStringToArray("AGE", byteArray, age, offset, 3) + offset = putStringToArray("MSTATUS", byteArray, mstatus, offset, 1) + offset = putStringToArray("ADDRESS", byteArray, address, offset, 30) + offset = putStringToArray("ZIP", byteArray, zip, offset, 10) + offset = putStringToArray("PHONE-NUM", byteArray, phoneNum, offset, 17) + offset = putStringToArray("ROLE", byteArray, role, offset, 18) + + bos.write(byteArray) + i += 1 + } + bos.close() + } +} diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala index 1195b697..80c64397 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala @@ -85,6 +85,7 @@ class SparkCobolOptionsBuilder(copybookContent: String)(implicit spark: SparkSes readerParams.schemaPolicy, inputFileNameField = "", generateRecordId = false, + readerParams.generateRecordBytes, metadataPolicy = readerParams.metadataPolicy) val sparkSchema = cobolSchema.getSparkSchema @@ -98,7 +99,9 @@ class SparkCobolOptionsBuilder(copybookContent: String)(implicit spark: SparkSes val record = RecordExtractors.extractRecord[GenericRow](parsedCopybook.ast, array, 0, - schemaRetentionPolicy, handler = recordHandler) + schemaRetentionPolicy, + generateRecordBytes = readerParams.generateRecordBytes, + handler = recordHandler) Row.fromSeq(record) }) diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala index 237ed300..183f7e9d 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala @@ -55,6 +55,7 @@ object CobolParametersParser extends Logging { // Schema transformation parameters val PARAM_GENERATE_RECORD_ID = "generate_record_id" + val PARAM_GENERATE_RECORD_BYTES = "generate_record_bytes" val PARAM_SCHEMA_RETENTION_POLICY = "schema_retention_policy" val PARAM_GROUP_FILLERS = "drop_group_fillers" val PARAM_VALUE_FILLERS = "drop_value_fillers" @@ -251,6 +252,7 @@ object CobolParametersParser extends Logging { params.get(PARAM_RECORD_LENGTH).map(_.toInt), variableLengthParams, params.getOrElse(PARAM_VARIABLE_SIZE_OCCURS, "false").toBoolean, + params.getOrElse(PARAM_GENERATE_RECORD_BYTES, "false").toBoolean, schemaRetentionPolicy, stringTrimmingPolicy, params.getOrElse(PARAM_ALLOW_PARTIAL_RECORDS, "false").toBoolean, @@ -381,6 +383,7 @@ object CobolParametersParser extends Logging { fileStartOffset = varLenParams.fileStartOffset, fileEndOffset = varLenParams.fileEndOffset, generateRecordId = varLenParams.generateRecordId, + generateRecordBytes = parameters.generateRecordBytes, schemaPolicy = parameters.schemaRetentionPolicy, stringTrimmingPolicy = 
parameters.stringTrimmingPolicy, allowPartialRecords = parameters.allowPartialRecords, @@ -814,6 +817,10 @@ object CobolParametersParser extends Logging { throw new IllegalArgumentException(s"Options 'segment-children:*' cannot be used with 'segment_id_level*' or 'segment_id_root' " + "since ID fields generation is not supported for hierarchical records reader.") } + if (params.contains(PARAM_GENERATE_RECORD_BYTES)) { + throw new IllegalArgumentException(s"Option '$PARAM_GENERATE_RECORD_BYTES' cannot be used with 'segment-children:*' " + + "since hierarchical records are composites of more than one raw record.") + } } if (!isRecordSequence && params.contains(PARAM_INPUT_FILE_COLUMN)) { val recordSequenceCondition = s"one of this holds: '$PARAM_RECORD_FORMAT' = V or '$PARAM_RECORD_FORMAT' = VB or '$PARAM_RECORD_FORMAT' = D or '$PARAM_IS_RECORD_SEQUENCE' = true" + diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala index 79d1da91..1132e4d5 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala @@ -39,6 +39,7 @@ import scala.collection.mutable.ArrayBuffer * @param copybook A parsed copybook. * @param policy Specifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook. * @param generateRecordId If true, a record id field will be prepended to the beginning of the schema. + * @param generateRecordBytes If true, a record bytes field will be appended to the beginning of the schema. * @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema. 
* @param generateSegIdFieldsCnt A number of segment ID levels to generate * @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used) @@ -48,11 +49,12 @@ class CobolSchema(copybook: Copybook, policy: SchemaRetentionPolicy, inputFileNameField: String, generateRecordId: Boolean, + generateRecordBytes: Boolean = false, generateSegIdFieldsCnt: Int = 0, segmentIdProvidedPrefix: String = "", metadataPolicy: MetadataPolicy = MetadataPolicy.Basic) extends CobolReaderSchema( - copybook, policy, inputFileNameField, generateRecordId, + copybook, policy, inputFileNameField, generateRecordId, generateRecordBytes, generateSegIdFieldsCnt, segmentIdProvidedPrefix ) with Logging with Serializable { @@ -107,12 +109,18 @@ class CobolSchema(copybook: Copybook, recordsWithSegmentFields } + val recordsWithRecordBytes = if (generateRecordBytes) { + StructField(Constants.recordBytes, BinaryType, nullable = false) +: recordsWithFileName + } else { + recordsWithFileName + } + val recordsWithRecordId = if (generateRecordId) { StructField(Constants.fileIdField, IntegerType, nullable = false) +: StructField(Constants.recordIdField, LongType, nullable = false) +: - StructField(Constants.recordByteLength, IntegerType, nullable = false) +: recordsWithFileName + StructField(Constants.recordByteLength, IntegerType, nullable = false) +: recordsWithRecordBytes } else { - recordsWithFileName + recordsWithRecordBytes } StructType(recordsWithRecordId) @@ -318,6 +326,7 @@ object CobolSchema { schema.policy, schema.inputFileNameField, schema.generateRecordId, + schema.generateRecordBytes, schema.generateSegIdFieldsCnt, schema.segmentIdPrefix, schema.metadataPolicy diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaHierarchicalSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaHierarchicalSpec.scala index 72753571..be750523 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaHierarchicalSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaHierarchicalSpec.scala @@ -102,6 +102,6 @@ class CobolSchemaHierarchicalSpec extends AnyWordSpec { private def parseSchema(copybook: String, segmentRedefines: List[String], fieldParentMap: Map[String, String]): CobolSchema = { val parsedSchema = CopybookParser.parseTree(copybook, segmentRedefines = segmentRedefines, fieldParentMap = fieldParentMap) - new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "",false) + new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "",false, false) } } diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala index 04e64ecf..1f58726e 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala @@ -54,7 +54,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBookContents) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", false) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", false, false) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -75,7 
+75,48 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBookContents) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", true) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", true, false) + val actualSchema = cobolSchema.getSparkSchema.treeString + + assertEqualsMultiline(actualSchema, expectedSchema) + } + + "Generate record bytes field" in { + val expectedSchema: String = + """root + | |-- Record_Bytes: binary (nullable = false) + | |-- BIN_INT: integer (nullable = true) + | |-- STRUCT_FLD: struct (nullable = true) + | | |-- STR_FLD: string (nullable = true) + | |-- DATA_STRUCT: struct (nullable = true) + | | |-- EXAMPLE_INT_FLD: integer (nullable = true) + | | |-- EXAMPLE_STR_FLD: string (nullable = true) + |""".stripMargin.replaceAll("[\\r\\n]", "\n") + + val parsedSchema = CopybookParser.parseTree(copyBookContents) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", false, true) + val actualSchema = cobolSchema.getSparkSchema.treeString + + assertEqualsMultiline(actualSchema, expectedSchema) + } + + "Generate record id and byte fields" in { + val expectedSchema: String = + """root + | |-- File_Id: integer (nullable = false) + | |-- Record_Id: long (nullable = false) + | |-- Record_Byte_Length: integer (nullable = false) + | |-- Record_Bytes: binary (nullable = false) + | |-- BIN_INT: integer (nullable = true) + | |-- STRUCT_FLD: struct (nullable = true) + | | |-- STR_FLD: string (nullable = true) + | |-- DATA_STRUCT: struct (nullable = true) + | | |-- EXAMPLE_INT_FLD: integer (nullable = true) + | | |-- EXAMPLE_STR_FLD: string (nullable = true) + |""".stripMargin.replaceAll("[\\r\\n]", "\n") + + val parsedSchema = CopybookParser.parseTree(copyBookContents) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", true, true) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -103,7 +144,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, "", true) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, "", true, false) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -119,7 +160,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, "", false) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, "", false, false) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -136,7 +177,40 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", true) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", true, false) + val actualSchema = 
cobolSchema.getSparkSchema.treeString + + assertEqualsMultiline(actualSchema, expectedSchema) + } + + "collapse root + record bytes generation" in { + val expectedSchema = + """root + | |-- Record_Bytes: binary (nullable = false) + | |-- IntValue: integer (nullable = true) + | |-- STR_FLD: string (nullable = true) + |""".stripMargin.replaceAll("[\\r\\n]", "\n") + + val parsedSchema = CopybookParser.parseTree(copyBook) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", false, true) + val actualSchema = cobolSchema.getSparkSchema.treeString + + assertEqualsMultiline(actualSchema, expectedSchema) + } + + "collapse root + record id and bytes generation" in { + val expectedSchema = + """root + | |-- File_Id: integer (nullable = false) + | |-- Record_Id: long (nullable = false) + | |-- Record_Byte_Length: integer (nullable = false) + | |-- Record_Bytes: binary (nullable = false) + | |-- IntValue: integer (nullable = true) + | |-- STR_FLD: string (nullable = true) + |""".stripMargin.replaceAll("[\\r\\n]", "\n") + + val parsedSchema = CopybookParser.parseTree(copyBook) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", true, true) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -150,7 +224,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", false) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", false, false) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -179,7 +253,46 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, "", true, 2) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, "", true, false, 2) + val actualSchema = cobolSchema.getSparkSchema.treeString + + assertEqualsMultiline(actualSchema, expectedSchema) + } + + "multi-segment keep-original with record bytes generation" in { + val expectedSchema = + """root + | |-- Record_Bytes: binary (nullable = false) + | |-- Seg_Id0: string (nullable = true) + | |-- Seg_Id1: string (nullable = true) + | |-- STRUCT1: struct (nullable = true) + | | |-- IntValue: integer (nullable = true) + | |-- STRUCT2: struct (nullable = true) + | | |-- STR_FLD: string (nullable = true) + |""".stripMargin.replaceAll("[\\r\\n]", "\n") + val parsedSchema = CopybookParser.parseTree(copyBook) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, "", false, true, 2) + val actualSchema = cobolSchema.getSparkSchema.treeString + + assertEqualsMultiline(actualSchema, expectedSchema) + } + + "multi-segment keep-original with record id and bytes generation" in { + val expectedSchema = + """root + | |-- File_Id: integer (nullable = false) + | |-- Record_Id: long (nullable = false) + | |-- Record_Byte_Length: integer (nullable = false) + | |-- Record_Bytes: binary (nullable = false) + | |-- Seg_Id0: string (nullable = true) + | |-- Seg_Id1: string (nullable = true) + | |-- STRUCT1: struct (nullable = true) + | | |-- 
IntValue: integer (nullable = true) + | |-- STRUCT2: struct (nullable = true) + | | |-- STR_FLD: string (nullable = true) + |""".stripMargin.replaceAll("[\\r\\n]", "\n") + val parsedSchema = CopybookParser.parseTree(copyBook) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, "", true, true, 2) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -196,7 +309,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, "", false, 2) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, "", false, false, 2) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -214,7 +327,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", true, 2) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", true, false, 2) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -229,7 +342,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", false, 2) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", false, false, 2) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -248,7 +361,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema1 = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, "", false) + val cobolSchema1 = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, "", false, false) val actualSparkSchema = cobolSchema1.getSparkSchema val rootField = actualSparkSchema.fields.head.dataType.asInstanceOf[StructType] @@ -277,7 +390,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema1 = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", false) + val cobolSchema1 = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "", false, false) val actualSparkSchema = cobolSchema1.getSparkSchema val metadataStr1 = actualSparkSchema.fields.head.metadata diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/SparkSchemaSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/SparkSchemaSpec.scala index baa03fb2..269c2ab8 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/SparkSchemaSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/SparkSchemaSpec.scala @@ -37,7 +37,7 @@ class SparkSchemaSpec extends AnyFunSuite { val parsedSchema = 
CopybookParser.parseTree(copyBookContents) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "",false) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, "",false, false) val sparkSchema = cobolSchema.getSparkSchema diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/SparkTestBase.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/SparkTestBase.scala index af5c7dd8..dd7d2e4d 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/SparkTestBase.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/SparkTestBase.scala @@ -25,5 +25,6 @@ trait SparkTestBase { .config("spark.ui.enabled", "false") .config("spark.driver.bindAddress","127.0.0.1") .config("spark.driver.host", "127.0.0.1") + .config("spark.sql.shuffle.partitions", "1") .getOrCreate() } diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyCobolSchema.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyCobolSchema.scala index 7b526431..33a8fc3b 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyCobolSchema.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyCobolSchema.scala @@ -24,7 +24,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy import scala.collection.Seq -class DummyCobolSchema(val sparkSchema: StructType) extends CobolSchema(new Copybook(Group.root), SchemaRetentionPolicy.KeepOriginal, "", false) with Serializable { +class DummyCobolSchema(val sparkSchema: StructType) extends CobolSchema(new Copybook(Group.root), SchemaRetentionPolicy.KeepOriginal, "", false, false) with Serializable { override def getSparkSchema: StructType = sparkSchema override lazy val getRecordSize: Int = 40 diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test17HierarchicalSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test17HierarchicalSpec.scala index 59c57bdd..8f29a7d6 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test17HierarchicalSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test17HierarchicalSpec.scala @@ -358,6 +358,25 @@ class Test17HierarchicalSpec extends AnyWordSpec with SparkTestBase with CobolTe assert(ex.getMessage.contains("ID fields generation is not supported for hierarchical records reader")) } + + "throw an exception if record bytes generation is requested" in { + val ex = intercept[IllegalArgumentException] { + spark + .read + .format("cobol") + .option("copybook", inputCopybookPath) + .option("encoding", "ascii") + .option("record_format", "V") + .option("segment_field", "SEGMENT_ID") + .option("redefine_segment_id_map:1", "STATIC-DETAILS => C") + .option("redefine-segment-id-map:2", "CONTACTS => P") + .option("segment-children:1", "STATIC-DETAILS => CONTACTS") + .option("generate_record_bytes", "true") + .load(inpudDataPath) + } + + assert(ex.getMessage.contains("Option 'generate_record_bytes' cannot be used with 'segment-children:*'")) + } } } diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test35GeneratedRecordBytes.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test35GeneratedRecordBytes.scala new file mode 100644 index 
00000000..d17f4365 --- /dev/null +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test35GeneratedRecordBytes.scala @@ -0,0 +1,256 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.source.integration + +import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.cobrix.spark.cobol.Cobrix +import za.co.absa.cobrix.spark.cobol.source.base.{SimpleComparisonBase, SparkTestBase} +import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture + +class Test35GeneratedRecordBytes extends AnyWordSpec with SparkTestBase with BinaryFileFixture with SimpleComparisonBase { + private val copybook = + """ 01 RECORD. + 02 FIELD-1 PIC X(2). + """ + + "record bytes generation" should { + "work for EBCDIC fixed length RDDs" in { + val expected = """[{"Record_Bytes":"8fI=","FIELD_1":"12"},{"Record_Bytes":"8/Q=","FIELD_1":"34"}]""" + val data = Array(Array(0xF1, 0xF2).map(_.toByte), Array(0xF3, 0xF4).map(_.toByte)) + + val rdd = spark.sparkContext.parallelize(data) + + val df = Cobrix.fromRdd + .copybookContents(copybook) + .option("encoding", "ebcdic") + .option("generate_record_bytes", "true") + .load(rdd) + + val actual = df.toJSON.collect().mkString("[", ",", "]") + + assert(actual == expected) + } + + "work for fixed length EBCDIC files" in { + val expected = """[{"Record_Bytes":"8fI=","FIELD_1":"12"},{"Record_Bytes":"8/Q=","FIELD_1":"34"}]""" + val data = Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte) + + withTempBinFile("recbytesgen", ".dat", data) { fileName => + val df = spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("encoding", "ebcdic") + .option("generate_record_bytes", "true") + .load(fileName) + + val actual = df.toJSON.collect().mkString("[", ",", "]") + assert(actual == expected) + } + } + + "work for variable length EBCDIC files" in { + val expected = """[{"Record_Bytes":"8fI=","FIELD_1":"12"},{"Record_Bytes":"8/Q=","FIELD_1":"34"}]""" + val data = Array(0x00, 0x02, 0x00, 0x00, 0xF1, 0xF2, 0x00, 0x02, 0x00, 0x00, 0xF3, 0xF4).map(_.toByte) + + withTempBinFile("recbytesgen", ".dat", data) { fileName => + val df = spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("record_format", "V") + .option("is_rdw_big_endian", "true") + .option("encoding", "ebcdic") + .option("generate_record_bytes", "true") + .load(fileName) + + val actual = df.toJSON.collect().mkString("[", ",", "]") + assert(actual == expected) + } + } + + "work for ASCII files" in { + val expected = """[{"Record_Bytes":"MTI=","FIELD_1":"12"},{"Record_Bytes":"MzQ=","FIELD_1":"34"}]""" + val data = Array(Array(0x31, 0x32, 0x33, 0x34).map(_.toByte)) + + withTempBinFile("recbytesgen", ".dat", data.head) { fileName => + val df = spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("encoding", "ascii") + .option("generate_record_bytes", "true") + .load(fileName) + + val actual = df.toJSON.collect().mkString("[", ",", 
"]") + assert(actual == expected) + } + } + + "fail for hierarchical files" in { + val data = Array(Array(0x31, 0x32, 0x33, 0x34).map(_.toByte)) + + withTempBinFile("recbytesgen", ".dat", data.head) { fileName => + val ex = intercept[IllegalArgumentException] { + spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("encoding", "ascii") + .option("segment_field", "FIELD_1") + .option("redefine_segment_id_map:1", "FIELD_2 => C") + .option("redefine-segment-id-map:2", "FIELD_2 => P") + .option("segment-children:1", "FIELD_2 => FIELD_1") + .option("generate_record_bytes", "true") + .load(fileName) + } + + assert(ex.getMessage.contains("Option 'generate_record_bytes' cannot be used with 'segment-children:*'")) + } + } + } + + "record bytes generation with keep_original schema policy" should { + "work for EBCDIC fixed length RDDs" in { + val expected = """[{"Record_Bytes":"8fI=","RECORD":{"FIELD_1":"12"}},{"Record_Bytes":"8/Q=","RECORD":{"FIELD_1":"34"}}]""" + val data = Array(Array(0xF1, 0xF2).map(_.toByte), Array(0xF3, 0xF4).map(_.toByte)) + + val rdd = spark.sparkContext.parallelize(data) + + val df = Cobrix.fromRdd + .copybookContents(copybook) + .option("encoding", "ebcdic") + .option("generate_record_bytes", "true") + .option("schema_retention_policy", "keep_original") + .load(rdd) + + val actual = df.toJSON.collect().mkString("[", ",", "]") + + assert(actual == expected) + } + + "work for fixed length EBCDIC files" in { + val expected = """[{"Record_Bytes":"8fI=","RECORD":{"FIELD_1":"12"}},{"Record_Bytes":"8/Q=","RECORD":{"FIELD_1":"34"}}]""" + val data = Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte) + + withTempBinFile("recbytesgen", ".dat", data) { fileName => + val df = spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("encoding", "ebcdic") + .option("generate_record_bytes", "true") + .option("schema_retention_policy", "keep_original") + .load(fileName) + + val actual = df.toJSON.collect().mkString("[", ",", "]") + assert(actual == expected) + } + } + + "work for variable length EBCDIC files" in { + val expected = """[{"Record_Bytes":"8fI=","RECORD":{"FIELD_1":"12"}},{"Record_Bytes":"8/Q=","RECORD":{"FIELD_1":"34"}}]""" + val data = Array(0x00, 0x02, 0x00, 0x00, 0xF1, 0xF2, 0x00, 0x02, 0x00, 0x00, 0xF3, 0xF4).map(_.toByte) + + withTempBinFile("recbytesgen", ".dat", data) { fileName => + val df = spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("record_format", "V") + .option("is_rdw_big_endian", "true") + .option("encoding", "ebcdic") + .option("generate_record_bytes", "true") + .option("schema_retention_policy", "keep_original") + .load(fileName) + + val actual = df.toJSON.collect().mkString("[", ",", "]") + assert(actual == expected) + } + } + + "work for ASCII files" in { + val expected = """[{"Record_Bytes":"MTI=","RECORD":{"FIELD_1":"12"}},{"Record_Bytes":"MzQ=","RECORD":{"FIELD_1":"34"}}]""" + val data = Array(Array(0x31, 0x32, 0x33, 0x34).map(_.toByte)) + + withTempBinFile("recbytesgen", ".dat", data.head) { fileName => + val df = spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("encoding", "ascii") + .option("generate_record_bytes", "true") + .option("schema_retention_policy", "keep_original") + .load(fileName) + + val actual = df.toJSON.collect().mkString("[", ",", "]") + assert(actual == expected) + } + } + } + + "record id and bytes generation" should { + "work for fixed length EBCDIC files" in { + val expected = 
"""[{"File_Id":0,"Record_Id":0,"Record_Byte_Length":2,"Record_Bytes":"8fI=","FIELD_1":"12"},{"File_Id":0,"Record_Id":1,"Record_Byte_Length":2,"Record_Bytes":"8/Q=","FIELD_1":"34"}]""" + val data = Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte) + + withTempBinFile("recbytesgen", ".dat", data) { fileName => + val df = spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("encoding", "ebcdic") + .option("generate_record_id", "true") + .option("generate_record_bytes", "true") + .load(fileName) + + val actual = df.toJSON.collect().mkString("[", ",", "]") + assert(actual == expected) + } + } + + "work for variable length EBCDIC files" in { + val expected = """[{"File_Id":0,"Record_Id":0,"Record_Byte_Length":2,"Record_Bytes":"8fI=","FIELD_1":"12"},{"File_Id":0,"Record_Id":1,"Record_Byte_Length":2,"Record_Bytes":"8/Q=","FIELD_1":"34"}]""" + val data = Array(0x00, 0x02, 0x00, 0x00, 0xF1, 0xF2, 0x00, 0x02, 0x00, 0x00, 0xF3, 0xF4).map(_.toByte) + + withTempBinFile("recbytesgen", ".dat", data) { fileName => + val df = spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("record_format", "V") + .option("is_rdw_big_endian", "true") + .option("encoding", "ebcdic") + .option("generate_record_id", "true") + .option("generate_record_bytes", "true") + .load(fileName) + + val actual = df.toJSON.collect().mkString("[", ",", "]") + assert(actual == expected) + } + } + + "work for ASCII files" in { + val expected = """[{"File_Id":0,"Record_Id":0,"Record_Byte_Length":2,"Record_Bytes":"MTI=","FIELD_1":"12"},{"File_Id":0,"Record_Id":1,"Record_Byte_Length":2,"Record_Bytes":"MzQ=","FIELD_1":"34"}]""" + val data = Array(Array(0x31, 0x32, 0x33, 0x34).map(_.toByte)) + + withTempBinFile("recbytesgen", ".dat", data.head) { fileName => + val df = spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("encoding", "ascii") + .option("generate_record_id", "true") + .option("generate_record_bytes", "true") + .load(fileName) + + val actual = df.toJSON.collect().mkString("[", ",", "]") + assert(actual == expected) + } + } + } +}