Commit 8067ae4

#593 WIP

yruslan committed Mar 28, 2023
1 parent b7d8ba8 commit 8067ae4
Showing 20 changed files with 605 additions and 51 deletions.
@@ -63,6 +63,7 @@ object Constants {
val fileIdField = "File_Id"
val recordIdField = "Record_Id"
val recordByteLength = "Record_Byte_Length"
val recordBytes = "Record_Bytes"

// Non-terminals
val nonTerminalsPostfix = "_NT"
@@ -150,6 +150,6 @@ class FixedLenNestedReader[T: ClassTag](copyBookContents: Seq[String],
readerProperties.fieldCodePage)
)
)
new CobolSchema(schema, schemaRetentionPolicy, "", false, metadataPolicy = readerProperties.metadataPolicy)
new CobolSchema(schema, schemaRetentionPolicy, "", false, readerProperties.generateRecordBytes, metadataPolicy = readerProperties.metadataPolicy)
}
}
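The new flag is passed positionally between generateRecordId and metadataPolicy. For clarity, the same call site with named arguments — a sketch based on the CobolSchema constructor shown further down in this commit, not part of the diff itself:

new CobolSchema(schema,
                schemaRetentionPolicy,
                inputFileNameField = "",
                generateRecordId = false,
                generateRecordBytes = readerProperties.generateRecordBytes,
                metadataPolicy = readerProperties.metadataPolicy)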
@@ -248,7 +248,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
))
val segIdFieldCount = readerProperties.multisegment.map(p => p.segmentLevelIds.size).getOrElse(0)
val segmentIdPrefix = readerProperties.multisegment.map(p => p.segmentIdPrefix).getOrElse("")
new CobolSchema(schema, readerProperties.schemaPolicy, readerProperties.inputFileNameColumn, readerProperties.generateRecordId, segIdFieldCount, segmentIdPrefix)
new CobolSchema(schema, readerProperties.schemaPolicy, readerProperties.inputFileNameColumn, readerProperties.generateRecordId, readerProperties.generateRecordBytes, segIdFieldCount, segmentIdPrefix)
}

private def checkInputArgumentsValidity(): Unit = {
@@ -35,7 +35,8 @@ object RecordExtractors {
* @param offsetBytes The offset to the beginning of the record (in bytes).
* @param policy A schema retention policy to be applied to the extracted record.
* @param variableLengthOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements.
* @param generateRecordId If true a record id field will be added as the first field of the record.
* @param generateRecordId If true, a record id field will be added as the first field of the record.
* @param generateRecordBytes If true, a record bytes field will be added at the beginning of each record.
* @param segmentLevelIds Segment ids to be added to the extracted record if id generation is turned on.
* @param fileId A file id to be added to the extracted record if generateRecordId == true.
* @param recordId The record id to be saved to the record id field.
@@ -53,6 +54,7 @@
policy: SchemaRetentionPolicy = SchemaRetentionPolicy.KeepOriginal,
variableLengthOccurs: Boolean = false,
generateRecordId: Boolean = false,
generateRecordBytes: Boolean = false,
segmentLevelIds: List[String] = Nil,
fileId: Int = 0,
recordId: Long = 0,
@@ -197,7 +199,7 @@ object RecordExtractors {
policy
}

applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, segmentLevelIds, fileId, recordId, data.length, generateInputFileField, inputFileName, handler)
applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes, segmentLevelIds, fileId, recordId, data.length, data, generateInputFileField, inputFileName, handler)
}

/**
@@ -417,7 +419,7 @@ object RecordExtractors {
policy
}

applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, Nil, fileId, recordId, recordLength, generateInputFileField, inputFileName, handler)
applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes = false, Nil, fileId, recordId, recordLength, Array.empty[Byte], generateInputFileField = generateInputFileField, inputFileName, handler)
}

/**
@@ -436,6 +438,7 @@
* @param ast The parsed copybook
* @param records The list of [[T]] objects, one for each Group of the copybook
* @param generateRecordId If true a record id field will be added as the first field of the record.
* @param generateRecordBytes If true, a record bytes field will be added at the beginning of the record.
* @param fileId The file id to be saved to the file id field
* @param recordId The record id to be saved to the record id field
* @param recordByteLength The length of the record
@@ -448,43 +451,39 @@
records: List[T],
policy: SchemaRetentionPolicy,
generateRecordId: Boolean,
generateRecordBytes: Boolean,
segmentLevelIds: List[String],
fileId: Int,
recordId: Long,
recordByteLength: Int,
recordBytes: Array[Byte],
generateInputFileField: Boolean,
inputFileName: String,
handler: RecordHandler[T]
): Seq[Any] = {
-    (generateRecordId, generateInputFileField) match {
-      case (false, false) if policy == SchemaRetentionPolicy.CollapseRoot =>
+    val generatedFields = new ListBuffer[Any]
+
+    segmentLevelIds.foreach(generatedFields.append(_))
+
+    if (generateRecordId) {
+      generatedFields.append(fileId, recordId, recordByteLength)
+    }
+
+    if (generateRecordBytes) {
+      generatedFields.append(recordBytes)
+    }
+
+    if (generateInputFileField) {
+      generatedFields.append(inputFileName)
+    }
+
+    policy match {
+      case SchemaRetentionPolicy.CollapseRoot =>
         // If the policy for schema retention is root collapsing, expand root fields
-        segmentLevelIds ++ records.flatMap(record => handler.toSeq(record))
-      case (false, false) =>
+        generatedFields ++ records.flatMap(record => handler.toSeq(record))
+      case SchemaRetentionPolicy.KeepOriginal =>
         // Return rows as the original sequence of groups
-        segmentLevelIds ++ records
-      case (true, false) if policy == SchemaRetentionPolicy.CollapseRoot =>
-        // If the policy for schema retention is root collapsing, expand root fields
-        // and add fileId and recordId
-        val expandedRows = records.flatMap(record => handler.toSeq(record))
-        fileId +: recordId +: recordByteLength +: (segmentLevelIds ++ expandedRows)
-      case (true, false) =>
-        // Add recordId as the first field
-        fileId +: recordId +: recordByteLength +: (segmentLevelIds ++ records)
-      case (false, true) if policy == SchemaRetentionPolicy.CollapseRoot =>
-        // If the policy for schema retention is root collapsing, expand root fields + adding the file name field
-        (segmentLevelIds :+ inputFileName) ++ records.flatMap(record => handler.toSeq(record))
-      case (false, true) =>
-        // Return rows as the original sequence of groups + adding the file name field
-        (segmentLevelIds :+ inputFileName) ++ records
-      case (true, true) if policy == SchemaRetentionPolicy.CollapseRoot =>
-        // If the policy for schema retention is root collapsing, expand root fields
-        // and add fileId and recordId + adding the file name field
-        val expandedRows = records.flatMap(record => handler.toSeq(record))
-        fileId +: recordId +: recordByteLength +: inputFileName +: (segmentLevelIds ++ expandedRows)
-      case (true, true) =>
-        // Add recordId as the first field + adding the file name field
-        fileId +: recordId +: recordByteLength +: inputFileName +: (segmentLevelIds ++ records)
+        generatedFields ++ records
}
}
}
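With the refactoring above, the prepended fields always follow one order: segment ids, then File_Id/Record_Id/Record_Byte_Length (when generateRecordId is set), then the raw record bytes (when generateRecordBytes is set), then the input file name, followed by the copybook fields. A self-contained sketch that mirrors that logic, illustrative only and not part of the diff:

import scala.collection.mutable.ListBuffer

// Mirrors applyRecordPostProcessing's prefix-building logic.
def generatedFieldPrefix(segmentLevelIds: List[String],
                         generateRecordId: Boolean,
                         generateRecordBytes: Boolean,
                         generateInputFileField: Boolean,
                         fileId: Int,
                         recordId: Long,
                         recordByteLength: Int,
                         recordBytes: Array[Byte],
                         inputFileName: String): Seq[Any] = {
  val fields = new ListBuffer[Any]
  segmentLevelIds.foreach(fields += _)                                     // segment ids first
  if (generateRecordId) fields ++= Seq(fileId, recordId, recordByteLength) // record id fields
  if (generateRecordBytes) fields += recordBytes                           // raw record bytes
  if (generateInputFileField) fields += inputFileName                      // input file name
  fields.toSeq
}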
@@ -91,6 +91,7 @@ class FixedLenNestedRowIterator[T: ClassTag](
offset,
policy,
readerProperties.variableSizeOccurs,
generateRecordBytes = readerProperties.generateRecordBytes,
activeSegmentRedefine = activeSegmentRedefine,
handler = handler
)
@@ -98,6 +98,7 @@ final class VarLenNestedIterator[T: ClassTag](cobolSchema: Copybook,
readerProperties.schemaPolicy,
readerProperties.variableSizeOccurs,
readerProperties.generateRecordId,
readerProperties.generateRecordBytes,
segmentLevelIds,
fileId,
rawRecordIterator.getRecordIndex,
@@ -44,6 +44,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param recordLength Specifies the length of the record disregarding the copybook record size. Implies the file has a fixed record length.
* @param variableLengthParams VariableLengthParameters containing the specifications for the consumption of variable-length Cobol records.
* @param variableSizeOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements
* @param generateRecordBytes Generate 'record_bytes' field containing raw bytes of the original record
* @param schemaRetentionPolicy A copybook usually has a root group struct element that acts like a rowtag in XML. This can be retained in Spark schema or can be collapsed
* @param stringTrimmingPolicy Specify if and how strings should be trimmed when parsed
* @param allowPartialRecords If true, partial ASCII records can be parsed (in cases when LF character is missing for example)
@@ -76,6 +77,7 @@ case class CobolParameters(
recordLength: Option[Int],
variableLengthParams: Option[VariableLengthParameters],
variableSizeOccurs: Boolean,
generateRecordBytes: Boolean,
schemaRetentionPolicy: SchemaRetentionPolicy,
stringTrimmingPolicy: StringTrimmingPolicy,
allowPartialRecords: Boolean,
@@ -53,6 +53,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param fileStartOffset A number of bytes to skip at the beginning of each file
* @param fileEndOffset A number of bytes to skip at the end of each file
* @param generateRecordId If true, a record id field will be prepended to each record.
* @param generateRecordBytes Generate 'record_bytes' field containing raw bytes of the original record
* @param schemaPolicy Specifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook.
* @param stringTrimmingPolicy Specifies if and how strings should be trimmed when parsed.
* @param allowPartialRecords If true, partial ASCII records can be parsed (in cases when LF character is missing for example)
@@ -96,6 +97,7 @@ case class ReaderParameters(
fileStartOffset: Int = 0,
fileEndOffset: Int = 0,
generateRecordId: Boolean = false,
generateRecordBytes: Boolean = false,
schemaPolicy: SchemaRetentionPolicy = SchemaRetentionPolicy.KeepOriginal,
stringTrimmingPolicy: StringTrimmingPolicy = StringTrimmingPolicy.TrimBoth,
allowPartialRecords: Boolean = false,
@@ -30,6 +30,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param copybook A parsed copybook.
* @param policy Specifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook.
* @param generateRecordId If true, a record id field will be prepended to the beginning of the schema.
* @param generateRecordBytes If true, a record bytes field will be added to the beginning of the schema.
* @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema.
* @param generateSegIdFieldsCnt A number of segment ID levels to generate
* @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used)
@@ -39,6 +40,7 @@ class CobolSchema(val copybook: Copybook,
val policy: SchemaRetentionPolicy,
val inputFileNameField: String,
val generateRecordId: Boolean,
val generateRecordBytes: Boolean,
val generateSegIdFieldsCnt: Int = 0,
val segmentIdProvidedPrefix: String = "",
val metadataPolicy: MetadataPolicy = MetadataPolicy.Basic) extends Serializable {
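For orientation, these are the record-level metadata columns generated when both generateRecordId and generateRecordBytes are enabled. Only the column names come from Constants in this commit; the Spark types and the exact position relative to segment-id fields are assumptions:

import org.apache.spark.sql.types._

// Assumed schema prefix; the copybook fields follow these generated columns.
val generatedColumns = Seq(
  StructField("File_Id", IntegerType, nullable = false),
  StructField("Record_Id", LongType, nullable = false),
  StructField("Record_Byte_Length", IntegerType, nullable = false),
  StructField("Record_Bytes", BinaryType, nullable = false)
)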
@@ -0,0 +1,138 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.spark.cobol.examples.parser.generators

import com.example.spark.cobol.examples.parser.generators.TestDataGen17Hierarchical.{rand, roles}
import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company}
import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._

import java.io.{BufferedOutputStream, FileOutputStream}
import scala.collection.mutable
import scala.util.Random

/**
* This is a test data generator. The copybook for it is listed below.
*/
object TestDataGen19WideTransactions {

// seed=100 is used for the integration test
val rand: Random = new Random(102)

val numberOfRecordsToGenerate = rand.nextInt(900000) + 1000000
//val numberOfRecordsToGenerate = rand.nextInt(90000) + 10000

/*
01 RECORD.
05 ACCOUNT-ID PIC X(15).
05 CURRENCY PIC X(3).
05 BALANCE PIC 9(12)V99.
05 COMPANY-NAME PIC X(15).
05 COMPANY-ID PIC X(10).
05 DEPT-NAME PIC X(22).
05 FIRST-NAME PIC X(16).
05 LAST-NAME PIC X(16).
05 AGE PIC 9(3).
05 MARRIAGE-STATUS PIC X(1).
05 ADDRESS PIC X(30).
05 ZIP PIC X(10).
05 PHONE-NUM PIC X(17).
05 ROLE PIC X(18).
*/
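  // Illustrative note (not part of the original file): the PIC widths above are
  // 15 + 3 + (12 + 2) + 15 + 10 + 22 + 16 + 16 + 3 + 1 + 30 + 10 + 17 + 18 = 190 bytes,
  // which is why main() below allocates the record buffer as new Array[Byte](190).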

val currencies: Seq[String] = CommonLists.currencies

val companies: Seq[Company] = CommonLists.companies

val departments: Seq[String] = CommonLists.departments

val roles: Seq[String] = CommonLists.roles

val firstNames: Seq[String] = CommonLists.firstNames

val lastNames: Seq[String] = CommonLists.lastNames


def main(args: Array[String]): Unit = {

val numOfCurrencies = currencies.size
val numOfCompanies = companies.size

val byteArray: Array[Byte] = new Array[Byte](190)
val bos = new BufferedOutputStream(new FileOutputStream("EBCDIC.ACCOUNTS.FEATURES.DATA.MAR24.dat"))
var i = 0
while (i < numberOfRecordsToGenerate) {
// Account Id 100012300012345
val accountId0 = rand.nextInt(4)
val accountId1 = rand.nextInt(900) + 100
val accountId2 = rand.nextInt(90000) + 10000
val accountId = accountId0.toString + "000" + accountId1.toString + "000" + accountId2.toString

val currency = currencies(rand.nextInt(numOfCurrencies))
val company = companies(rand.nextInt(numOfCompanies))

val dept = departments(rand.nextInt(departments.size))

val fname = firstNames(rand.nextInt(firstNames.size))
val lname = lastNames(rand.nextInt(lastNames.size))
val role = roles(rand.nextInt(roles.size))
val address = companies(rand.nextInt(companies.size)).address
val phoneNum = s"+(${rand.nextInt(920) + 1}) ${rand.nextInt(899) + 100} ${rand.nextInt(89) + 10} ${rand.nextInt(89) + 10}"
val zip = (rand.nextInt(899999999) + 100000000).toString
val age = (rand.nextInt(60) + 19).toString
val mstatuses = "SSSSSSMMMDW"
val mstatus = mstatuses(rand.nextInt(mstatuses.length)).toString

// Balance amount (integer and fractional parts)

val tp = rand.nextInt(100)
val amountIntPart = if (tp < 20) {
rand.nextInt(1000).toLong
} else if (tp < 65) {
rand.nextInt(100000).toLong
} else {
rand.nextInt(10000000).toLong
}

val intPartLen = amountIntPart.toString.length
val lpad = " " * (12 - intPartLen)
val amountIntPartStr = lpad + amountIntPart.toString

val amountFracPart = if (amountIntPart < 10000) rand.nextInt(100) else 0

var offset = 0
offset = putStringToArray("ACCOUNT-ID", byteArray, accountId, offset, 15)
offset = putStringToArray("CURRENCY", byteArray, currency, offset, 3)
offset = putStringToArray("BALANCE_i", byteArray, amountIntPartStr, offset, 12)
offset = putStringToArray("BALANCE_f", byteArray, amountFracPart.toString, offset, 2)
offset = putStringToArray("COMPANY-NAME", byteArray, company.companyName, offset, 15)
offset = putStringToArray("COMPANY-ID", byteArray, company.companyId, offset, 10)
offset = putStringToArray("DEPT-NAME", byteArray, dept, offset, 22)
offset = putStringToArray("FIRST-NAME", byteArray, fname, offset, 16)
offset = putStringToArray("LAST-NAME", byteArray, lname, offset, 16)
offset = putStringToArray("AGE", byteArray, age, offset, 3)
offset = putStringToArray("MSTATUS", byteArray, mstatus, offset, 1)
offset = putStringToArray("ADDRESS", byteArray, address, offset, 30)
offset = putStringToArray("ZIP", byteArray, zip, offset, 10)
offset = putStringToArray("PHONE-NUM", byteArray, phoneNum, offset, 17)
offset = putStringToArray("ROLE", byteArray, role, offset, 18)

bos.write(byteArray)
i += 1
}
bos.close()
}
}
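A hedged sketch of reading the generated file back with the spark-cobol data source. It assumes an active SparkSession named spark, the copybook above saved as accounts.cpy, and that ReaderParameters.generateRecordBytes is exposed through a "generate_record_bytes" option; that option name is not shown in this commit, while "copybook" and "generate_record_id" are existing options:

val df = spark.read
  .format("cobol")
  .option("copybook", "accounts.cpy")
  .option("generate_record_id", "true")
  .option("generate_record_bytes", "true")   // assumed option name
  .load("EBCDIC.ACCOUNTS.FEATURES.DATA.MAR24.dat")

df.printSchema() // expected to include Record_Bytes alongside File_Id, Record_Id and Record_Byte_Length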
@@ -85,6 +85,7 @@ class SparkCobolOptionsBuilder(copybookContent: String)(implicit spark: SparkSes
readerParams.schemaPolicy,
inputFileNameField = "",
generateRecordId = false,
readerParams.generateRecordBytes,
metadataPolicy = readerParams.metadataPolicy)
val sparkSchema = cobolSchema.getSparkSchema

@@ -98,7 +99,9 @@ class SparkCobolOptionsBuilder(copybookContent: String)(implicit spark: SparkSes
val record = RecordExtractors.extractRecord[GenericRow](parsedCopybook.ast,
array,
0,
schemaRetentionPolicy, handler = recordHandler)
schemaRetentionPolicy,
generateRecordBytes = readerParams.generateRecordBytes,
handler = recordHandler)
Row.fromSeq(record)
})
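When generateRecordBytes is enabled, the extractor prepends the raw bytes of each record, so downstream code can recover the original record from the resulting rows. A minimal sketch; the column name comes from Constants.recordBytes, while exposing it as a Spark binary column is an assumption:

import org.apache.spark.sql.Row

// Assumes the generated column is named "Record_Bytes" and holds the raw record bytes.
def originalRecordBytes(row: Row): Array[Byte] =
  row.getAs[Array[Byte]]("Record_Bytes")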
