Skip to content

Commit

Permalink
#249 Fix the hierarchical row extractor to support variable length OCCURS.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Feb 24, 2020
1 parent 34df9fc commit baf8fe4
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ object RowExtractors {
var i = from
var j = 0
while (i < actualSize) {
val value = getGroupValues(offset, grp, data, currentIndex, parentSegmentIds)
offset += grp.binaryProperties.dataSize
val (size, value) = getGroupValues(offset, grp, data, currentIndex, parentSegmentIds)
offset += size
groupValues(j) = value
i += 1
j += 1
Expand All @@ -259,7 +259,7 @@ object RowExtractors {
}
}

def extractValue(field: Statement, useOffset: Int, data: Array[Byte], currentIndex: Int, parentSegmentIds: List[String]): Any = {
def extractValue(field: Statement, useOffset: Int, data: Array[Byte], currentIndex: Int, parentSegmentIds: List[String]): (Int, Any) = {
field match {
case grp: Group =>
getGroupValues(useOffset, grp, data, currentIndex, parentSegmentIds)
Expand All @@ -273,7 +273,7 @@ object RowExtractors {
}
dependFields += st.name -> intVal
}
value
(st.binaryProperties.actualSize, value)
}
}

Expand All @@ -289,7 +289,8 @@ object RowExtractors {
segmentsData(i) match {
case (segmentId, segmentData) =>
if (segmentIdRedefineMap.get(segmentId).map(_.name).getOrElse("") == field.name) {
children += getGroupValues(field.binaryProperties.offset, field, segmentData, i, segmentId :: parentSegmentIds)
val (_, child) = getGroupValues(field.binaryProperties.offset, field, segmentData, i, segmentId :: parentSegmentIds)
children += child
} else {
if (parentSegmentIds.contains(segmentId)) {
break = true
Expand All @@ -302,7 +303,7 @@ object RowExtractors {
children.toArray
}

def getGroupValues(offset: Int, group: Group, data: Array[Byte], currentIndex: Int, parentSegmentIds: List[String]): Row = {
def getGroupValues(offset: Int, group: Group, data: Array[Byte], currentIndex: Int, parentSegmentIds: List[String]): (Int, Row) = {
var bitOffset = offset

val childrenNum = if (group.isSegmentRedefine) {
Expand All @@ -324,9 +325,13 @@ object RowExtractors {
}
value
} else {
val value = extractValue(field, bitOffset, data, currentIndex, parentSegmentIds)
val (size, value) = extractValue(field, bitOffset, data, currentIndex, parentSegmentIds)
if (!field.isRedefined) {
bitOffset += field.binaryProperties.actualSize
if (field.redefines.isDefined) {
bitOffset += field.binaryProperties.actualSize
} else {
bitOffset += size
}
}
value
}
Expand All @@ -347,14 +352,14 @@ object RowExtractors {
})
}

new GenericRow(fields)
(bitOffset - offset, new GenericRow(fields))
}

var nextOffset = offsetBytes

val records = ast.children.collect { case grp: Group if grp.parentSegment.isEmpty =>
val values = getGroupValues(nextOffset, grp, segmentsData(0)._2, 0, segmentsData(0)._1 :: Nil)
nextOffset += grp.binaryProperties.actualSize
val (size, values) = getGroupValues(nextOffset, grp, segmentsData(0)._2, 0, segmentsData(0)._1 :: Nil)
nextOffset += size
values
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ class Test22HierarchicalOccursSpec extends FunSuite with SparkTestBase with Bina
.option("copybook_contents", copybook)
.option("pedantic", "true")
.option("is_record_sequence", "true")
.option("generate_record_id", "true")
.option("schema_retention_policy", "collapse_root")
.option("generate_record_id", "true")
.option("variable_size_occurs", "true")
.option("segment_field", "SEG")
.option("redefine_segment_id_map:1", "SEG1 => 1")
.option("redefine-segment-id-map:2", "SEG2 => 2")
Expand Down Expand Up @@ -172,7 +173,7 @@ class Test22HierarchicalOccursSpec extends FunSuite with SparkTestBase with Bina
| "FIELD1" : "E"
| }, {
| "FIELD1" : "F"
| }, { } ]
| } ]
| } ],
| "SEG2" : [ {
| "COUNT2" : 2,
Expand All @@ -189,16 +190,15 @@ class Test22HierarchicalOccursSpec extends FunSuite with SparkTestBase with Bina
| "FIELD2" : "I"
| }, {
| "FIELD2" : "J"
| }, { } ]
| } ]
| } ]
| } ]
| }
|} ]""".stripMargin.replace("\r\n", "\n")

val actual = SparkUtils.prettyJSON(df.toJSON.collect().mkString("[", ",", "]"))
//println(actual)

//assertEqualsMultiline(actual, expected)
assertEqualsMultiline(actual, expected)
}

}
Expand Down

0 comments on commit baf8fe4

Please sign in to comment.