Commit d58750a

#614 Add more unit tests to cover sparse index generation behavior.

yruslan committed May 3, 2023
1 parent 981849d commit d58750a
Showing 10 changed files with 221 additions and 31 deletions.
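The change every call site below reflects is that IndexGenerator.sparseIndexGenerator no longer takes an isRdwBigEndian flag; endianness is presumably owned entirely by the record header parser or record extractor passed in. A minimal sketch of the call shape after this commit, assembled from the updated call sites (stream, copybook, and the other values stand in for whatever the caller has in scope):

    val indexes = IndexGenerator.sparseIndexGenerator(
      0,       // file number
      stream,  // input data stream
      0L,      // file start offset in bytes
      recordHeaderParser   = recordHeaderParser,   // may be null when an extractor is used
      recordExtractor      = Some(recordExtractor),
      recordsPerIndexEntry = Some(2),
      sizePerIndexEntryMB  = None,
      copybook             = Some(copybook),
      segmentField         = Some(segmentIdField),
      isHierarchical       = true,
      rootSegmentId        = segmentIdRootValue)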
@@ -53,16 +53,16 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],

protected val cobolSchema: CobolSchema = loadCopyBook(copybookContents)

-protected val recordHeaderParser: RecordHeaderParser = {
+val recordHeaderParser: RecordHeaderParser = {
getRecordHeaderParser
}

checkInputArgumentsValidity()

-protected def recordExtractor(startingRecordNumber: Long,
-                              binaryData: SimpleStream,
-                              copybook: Copybook
-                             ): Option[RawRecordExtractor] = {
+def recordExtractor(startingRecordNumber: Long,
+                    binaryData: SimpleStream,
+                    copybook: Copybook
+                   ): Option[RawRecordExtractor] = {
val rdwParams = RecordHeaderParameters(readerProperties.isRdwBigEndian, readerProperties.rdwAdjustment)

val rdwDecoder = new RecordHeaderDecoderRdw(rdwParams)
@@ -143,7 +143,6 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
override def generateIndex(binaryData: SimpleStream,
fileNumber: Int,
isRdwBigEndian: Boolean): ArrayBuffer[SparseIndexEntry] = {
-var recordSize = cobolSchema.getRecordSize
val inputSplitSizeRecords: Option[Int] = readerProperties.inputSplitRecords
val inputSplitSizeMB: Option[Int] = getSplitSizeMB

@@ -176,7 +175,6 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
case Some(field) => IndexGenerator.sparseIndexGenerator(fileNumber,
binaryData,
readerProperties.fileStartOffset,
-isRdwBigEndian,
recordHeaderParser,
recordExtractor(0L, binaryData, copybook),
inputSplitSizeRecords,
@@ -188,7 +186,6 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
case None => IndexGenerator.sparseIndexGenerator(fileNumber,
binaryData,
readerProperties.fileStartOffset,
-isRdwBigEndian,
recordHeaderParser,
recordExtractor(0L, binaryData, copybook),
inputSplitSizeRecords,
@@ -33,6 +33,34 @@ trait RawRecordExtractor extends Iterator[Array[Byte]] {
*
* IMPORTANT. The offset points to the next record to be fetched by .next(). If this invariant does not hold,
* the reader might get inconsistent record ids or can fail in certain circumstances.
*
* If you want to prefetch a record, use this pattern:
* {{{
* class ExampleRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
*
* private var currentOffset = ctx.inputStream.offset
*
* var record: Option[Array[Byte]] = fetchRecord()
*
* override def offset: Long = currentOffset
*
* override def hasNext: Boolean = record.isDefined
*
* override def next(): Array[Byte] = {
* if (record.isEmpty)
* throw new NoSuchElementException("next on empty iterator")
*
* currentOffset = ctx.inputStream.offset
* val result = record.get
* record = fetchRecord()
* result
* }
*
* def fetchRecord(): Option[Array[Byte]] = {
* // fetch the record from the stream if any
* }
* }
* }}}
*/
def offset: Long
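As an illustration of the fetchRecord stub in the pattern above, here is a hedged sketch for a stream of records prefixed by a 2-byte little-endian length header — the same layout the mocks below use; the record format itself is an assumption made for the example:

    def fetchRecord(): Option[Array[Byte]] = {
      // Assumed layout: 2-byte little-endian length header, then the payload.
      val header = ctx.inputStream.next(2)
      if (header.length < 2) {
        None // end of stream: no complete header left
      } else {
        // Mask to treat the signed bytes as unsigned before combining.
        val length = (header.head & 0xFF) + ((header(1) & 0xFF) << 8)
        Some(ctx.inputStream.next(length))
      }
    }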

@@ -0,0 +1,36 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.mock

import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor}

class RecordExtractorReadAhaedMock(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
  // Reads ahead in the constructor on purpose. This breaks the invariant that
  // offset must point at the next record to be fetched, which the index
  // generator is expected to detect and reject.
  ctx.inputStream.next(2)

  override def offset: Long = ctx.inputStream.offset

  override def hasNext: Boolean = !ctx.inputStream.isEndOfStream

  override def next(): Array[Byte] = {
    // Records are prefixed by a 2-byte little-endian length header.
    val header = ctx.inputStream.next(2)
    if (header.length == 2) {
      ctx.inputStream.next(header.head + header(1) * 256)
    } else {
      Array.empty[Byte]
    }
  }
}
@@ -18,12 +18,14 @@ package za.co.absa.cobrix.cobol.mock

import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordMetadata}

-class RecordHeadersParserMock(val isHeaderDefinedInCopybook: Boolean = false) extends RecordHeaderParser {
+class RecordHeadersParserMock extends RecordHeaderParser {
+  var isHeaderDefinedInCopybook: Boolean = false

override def getHeaderLength: Int = 2

override def getRecordMetadata(header: Array[Byte], fileOffset: Long, maxOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata = {
if (header.length == 2) {
-      RecordMetadata(header.head + header(1) * 256, isValid = true)
+      RecordMetadata((header.head + 256) % 256 + ((header(1) + 256) % 256) * 256, isValid = true)
} else {
RecordMetadata(0, isValid = false)
}
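The (b + 256) % 256 conversions treat Scala's signed Byte values as unsigned before combining them into a little-endian length. For example, with the header bytes 0x00 0xF0 used in the tests below, header(1) is -16 as a signed Byte, and (-16 + 256) % 256 = 240, so the length is 0 + 240 * 256 = 61440; the previous expression header.head + header(1) * 256 would have produced -4096 for the same header.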
@@ -17,10 +17,8 @@
package za.co.absa.cobrix.cobol.reader

import org.scalatest.funsuite.AnyFunSuite
-import za.co.absa.cobrix.cobol.parser.ast.Group
import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser}
-import za.co.absa.cobrix.cobol.reader.extractors.record.{RecordExtractors, RecordHandler}
-import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
+import za.co.absa.cobrix.cobol.reader.extractors.record.RecordExtractors


class RowExtractorSpec extends AnyFunSuite {
@@ -114,18 +112,8 @@ class RowExtractorSpec extends AnyFunSuite {
val copybook: Copybook = CopybookParser.parseTree(copyBookContents)
val startOffset: Int = 0


-class Handler extends RecordHandler[scala.Array[Any]] {
-  override def create(values: Array[Any], group: Group): Array[Any] = values
-
-  override def toSeq(record: Array[Any]): Seq[Any] = Seq[Any]()
-
-  override def foreach(record: Array[Any])(f: Any => Unit): Unit = record.foreach(f)
-}

test("Test row extractor") {
-  val row = RecordExtractors.extractRecord(copybook.ast, bytes, startOffset, handler = new Handler())
+  val row = RecordExtractors.extractRecord(copybook.ast, bytes, startOffset, handler = new SimpleRecordHandler())
// [[6,[EXAMPLE4,0,],[,,3,[Vector([000000000000002000400012,0,], [000000000000003000400102,1,], [000000005006001200301000,2,])]]]]

val innerRow = row.head.asInstanceOf[Array[Any]]
@@ -0,0 +1,28 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.reader

import za.co.absa.cobrix.cobol.parser.ast.Group
import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler

class SimpleRecordHandler extends RecordHandler[scala.Array[Any]] {
  override def create(values: Array[Any], group: Group): Array[Any] = values

  override def toSeq(record: Array[Any]): Seq[Any] = Seq[Any]()

  override def foreach(record: Array[Any])(f: Any => Unit): Unit = record.foreach(f)
}
@@ -17,12 +17,13 @@
package za.co.absa.cobrix.cobol.reader

import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.cobrix.cobol.mock.{RecordExtractorMock, RecordExtractorReadAhaedMock, RecordHeadersParserMock}
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.cobol.parser.ast.Primitive
import za.co.absa.cobrix.cobol.parser.common.Constants
import za.co.absa.cobrix.cobol.parser.encoding.ASCII
import za.co.absa.cobrix.cobol.parser.headerparsers.RecordHeaderParserFactory
-import za.co.absa.cobrix.cobol.reader.memorystream.TestStringStream
+import za.co.absa.cobrix.cobol.reader.memorystream.{TestByteStream, TestStringStream}
import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, TextFullRecordExtractor, TextRecordExtractor}
import za.co.absa.cobrix.cobol.reader.index.IndexGenerator

@@ -63,7 +64,7 @@ class SparseIndexSpecSpec extends AnyWordSpec {

val recordExtractor = new TextRecordExtractor(RawRecordContext(0L, stream, copybook, null, null, ""))

-val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, isRdwBigEndian = false,
+val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L,
recordHeaderParser = recordHeaderParser, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(2), sizePerIndexEntryMB = None,
copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValue)
assert(indexes.length == 4)
@@ -84,7 +85,7 @@

val recordExtractor = new TextFullRecordExtractor(RawRecordContext(0L, stream, copybook, null, null, ""))

-val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, isRdwBigEndian = false,
+val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L,
recordHeaderParser = recordHeaderParser, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(2), sizePerIndexEntryMB = None,
copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValue)
assert(indexes.length == 4)
@@ -97,6 +98,114 @@
assert(indexes(3).offsetFrom == 90)
assert(indexes(3).offsetTo == -1)
}

"Generate a sparse index for a empty file and record extractor" in {
val stream = new TestByteStream(Array.empty[Byte])

val recordExtractor = new RecordExtractorMock(RawRecordContext(0L, stream, copybook, null, null, ""))

val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L,
recordHeaderParser = null, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(1), sizePerIndexEntryMB = None,
copybook = Some(copybook), segmentField = None, isHierarchical = false)
assert(indexes.length == 1)
assert(indexes.head.offsetFrom == 0)
assert(indexes.head.offsetTo == -1)
assert(indexes.head.recordIndex == 0)
}

"Generate a sparse index for a data with Custom header parser" in {
val records = Range(0, 20).flatMap(_ => {
val record = new Array[Byte](61442)
record(0) = 0x00
record(1) = 0xF0.toByte
record
}).toArray
val stream = new TestByteStream(records)

val recordHeaderParser = new RecordHeadersParserMock

val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L,
recordHeaderParser = recordHeaderParser, recordExtractor = None, recordsPerIndexEntry = None, sizePerIndexEntryMB = Some(1),
copybook = Some(copybook), segmentField = None, isHierarchical = false)
assert(indexes.length == 2)
assert(indexes.head.offsetFrom == 0)
assert(indexes.head.offsetTo == 1105956)
assert(indexes.head.recordIndex == 0)
assert(indexes(1).offsetFrom == 1105956)
assert(indexes(1).offsetTo == -1)
assert(indexes(1).recordIndex == 18)
}
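A quick check of the expected offsets in this test: each mock record is 61442 bytes on the stream (a 2-byte header whose second byte 0xF0 encodes a 61440-byte payload). With sizePerIndexEntryMB = Some(1), a new index entry presumably starts once the current one reaches 1 MiB: 17 * 61442 = 1044514 bytes is still below 1048576, while 18 * 61442 = 1105956 crosses it, so the second entry begins at offset 1105956 with record index 18 — exactly the asserted values.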

"Generate a sparse index for a data with Custom record parser" in {
val stream = new TestByteStream(Array(0x02, 0x00, 0xF1, 0xF2, // record 0
0x01, 0x00, 0xF3, // record 1
0x02, 0x00, 0xF4, 0xF5 // record 2
).map(_.toByte) )

val recordExtractor = new RecordExtractorMock(RawRecordContext(0L, stream, copybook, null, null, ""))
recordExtractor.onReceiveAdditionalInfo("dummy")

val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L,
recordHeaderParser = null, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(1), sizePerIndexEntryMB = None,
copybook = Some(copybook), segmentField = None, isHierarchical = false)
assert(indexes.length == 2)
assert(indexes.head.offsetFrom == 0)
assert(indexes.head.offsetTo == 4)
assert(indexes.head.recordIndex == 0)
assert(indexes(1).offsetFrom == 4)
assert(indexes(1).offsetTo == -1)
assert(indexes(1).recordIndex == 1)
}
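The split at offset 4 follows from the layout above: record 0 occupies bytes 0–3 — the little-endian length header 0x02 0x00 (length 2) plus two payload bytes — so the second index entry starts where record 1 begins.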

"Generate a sparse index for a data with Custom record parser with a file start offset" in {
val stream = new TestByteStream(Array(0xF0, 0xF0, // header to skip
0x02, 0x00, 0xF1, 0xF2, // record 0
0x01, 0x00, 0xF3, // record 1
0x02, 0x00, 0xF4, 0xF5, // record 2
0x02, 0x00, 0xF6, 0xF7, // record 3
0x01 // Invalid header
).map(_.toByte))

// Skip the first 2 bytes to position the stream at the file start offset
stream.next(2)

val recordExtractor = new RecordExtractorMock(RawRecordContext(0L, stream, copybook, null, null, ""))

val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 2L,
recordHeaderParser = null, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(1), sizePerIndexEntryMB = None,
copybook = Some(copybook), segmentField = None, isHierarchical = false)
assert(indexes.length == 4)
assert(indexes.head.offsetFrom == 2)
assert(indexes.head.offsetTo == 6)
assert(indexes.head.recordIndex == 0)
assert(indexes(1).offsetFrom == 6)
assert(indexes(1).offsetTo == 9)
assert(indexes(1).recordIndex == 1)
assert(indexes(2).offsetFrom == 9)
assert(indexes(2).offsetTo == 13)
assert(indexes(2).recordIndex == 2)
assert(indexes(3).offsetFrom == 13)
assert(indexes(3).offsetTo == -1)
assert(indexes(3).recordIndex == 3)
}

"Throws an exception if the record extractor reads data in constructor" in {
val stream = new TestByteStream(Array(0xF0, 0xF0, // header to skip
0x02, 0x00, 0xF1, 0xF2 // record 0
).map(_.toByte))

val recordExtractor = new RecordExtractorReadAhaedMock(RawRecordContext(0L, stream, copybook, null, null, ""))

val ex = intercept[IllegalStateException] {
IndexGenerator.sparseIndexGenerator(0, stream, 0L,
recordHeaderParser = null, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(1), sizePerIndexEntryMB = None,
copybook = Some(copybook), segmentField = None, isHierarchical = false)
}
assert(ex.getMessage.contains("The record extractor has returned the offset that is not the beginning of the file."))
}
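This guards the offset invariant documented in RawRecordExtractor: RecordExtractorReadAhaedMock consumes two bytes in its constructor, so its initial offset is 2 rather than the beginning of the file, and the index generator rejects it instead of producing inconsistent record ids.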

}


@@ -63,9 +63,11 @@ class VRLRecordReaderSpec extends AnyWordSpec {
}

"work for custom header parser" in {
+val recordHeaderParser = new RecordHeadersParserMock
+recordHeaderParser.isHeaderDefinedInCopybook = true
val reader = getUseCase(
records = customHeaderRecords,
-recordHeaderParserOpt = Some(new RecordHeadersParserMock(true)))
+recordHeaderParserOpt = Some(recordHeaderParser))

assert(reader.hasNext)
val (segment1, record1) = reader.next()
@@ -213,7 +213,7 @@ class Test5MultisegmentSpec extends AnyFunSuite with SparkTestBase {
val stream = new FileStreamer("../data/test5_data/COMP.DETAILS.SEP30.DATA.dat", FileSystem.get(new Configuration()))

val recordHeaderParser = RecordHeaderParserFactory.createRecordHeaderParser(Constants.RhRdwLittleEndian, 0, 0, 0, 0)
-val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, isRdwBigEndian = false,
+val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L,
recordHeaderParser = recordHeaderParser, recordExtractor = None, recordsPerIndexEntry = Some(10), sizePerIndexEntryMB = None,
copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValue)
assert(indexes.length == 88)
@@ -65,7 +65,7 @@ class Test12MultiRootSparseIndex extends AnyWordSpec with SparkTestBase with Bin
val stream = new FileStreamer(tmpFileName, FileSystem.get(new Configuration()))

val recordHeaderParser = RecordHeaderParserFactory.createRecordHeaderParser(Constants.RhRdwFixedLength, 3, 0, 0, 0)
-val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, isRdwBigEndian = false,
+val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L,
recordHeaderParser = recordHeaderParser, recordExtractor = None, recordsPerIndexEntry = Some(4), sizePerIndexEntryMB = None,
copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValues)
assert(indexes.length == 3)
@@ -81,7 +81,7 @@
val stream = new FileStreamer(tmpFileName, FileSystem.get(new Configuration()))

val recordHeaderParser = RecordHeaderParserFactory.createRecordHeaderParser(Constants.RhRdwFixedLength, 3, 0, 0, 0)
-val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, isRdwBigEndian = false,
+val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L,
recordHeaderParser = recordHeaderParser, recordExtractor = None, recordsPerIndexEntry = Some(4), sizePerIndexEntryMB = None,
copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValues)
assert(indexes.length == 3)