Commit e0596e2

#556 Fix the 'file_end_offset' option truncating records at the end of each partition instead of at the end of each file

yruslan committed Jan 3, 2023
1 parent 76c4c86 commit e0596e2
Showing 14 changed files with 142 additions and 15 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -1540,7 +1540,7 @@ A: Update hadoop dll to version 3.2.2 or newer.
- [#545](https://github.com/AbsaOSS/cobrix/issues/545) Added support for `string` debug columns for ASCII (D/D2/T) files (`.option("debug", "string")`).
- [#543](https://github.com/AbsaOSS/cobrix/issues/543) Improved performance of processing ASCII text (D/D2/T) files with variable OCCURS.
- [#553](https://github.com/AbsaOSS/cobrix/issues/553) Fixed variable occurs now working properly with basic ASCII record format (D2).

- [#556](https://github.com/AbsaOSS/cobrix/issues/556) Fixed `file_end_offset` option dropping records from the end of partitions instead of end of files.

- #### 2.6.1 released 2 December 2022.
- [#531](https://github.com/AbsaOSS/cobrix/issues/531) Added support for CP1047 EBCDIC code page.
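For context, here is a minimal usage sketch of the affected options, mirroring the test added later in this commit (the copybook contents and input path are placeholders):

```scala
// Sketch only: `copybook` stands for any copybook describing the record layout,
// and the 2-byte offsets are illustrative.
val df = spark
  .read
  .format("cobol")
  .option("copybook_contents", copybook)
  .option("encoding", "ascii")
  .option("file_start_offset", "2") // skip the first 2 bytes of every file
  .option("file_end_offset", "2")   // with this fix: skip the last 2 bytes of every file,
                                    // not the last 2 bytes of every partition
  .load("/path/to/data")
```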
@@ -52,10 +52,11 @@ trait RecordHeaderParser {
* @param header A record header as an array of bytes
* @param fileOffset An offset from the beginning of the underlying file
* @param fileSize A size of the underlying file
* @param maxOffset A maximum offset allowed to read by the current index chunk
* @param recordNum A sequential record number
* @return A parsed record metadata
*/
def getRecordMetadata(header: Array[Byte], fileOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata
def getRecordMetadata(header: Array[Byte], fileOffset: Long, maxOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata

/**
* Clients of 'spark-cobol' can pass additional information to custom record header parsers using
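The relationship between the new `maxOffset` parameter and the stream API can be read off the call sites changed further down in this commit; schematically (variable names taken from those call sites):

```scala
val recordMetadata = recordHeaderParser.getRecordMetadata(
  headerBytes,
  dataStream.offset,    // fileOffset: position of this record within the file
  dataStream.size,      // maxOffset:  the furthest offset the current index chunk may read
  dataStream.totalSize, // fileSize:   length of the whole underlying file (drives 'file_end_offset')
  recordIndex
)
```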
@@ -37,12 +37,12 @@ class RecordHeaderParserFixedLen(recordSize: Int,
* @param recordNum A sequential record number
* @return A parsed record metadata
*/
override def getRecordMetadata(header: Array[Byte], fileOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata = {
override def getRecordMetadata(header: Array[Byte], fileOffset: Long, maxOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata = {
if (fileHeaderBytes > 0 && fileOffset == 0L) {
RecordMetadata(fileHeaderBytes, isValid = false)
} else if (fileSize > 0L && fileFooterBytes > 0 && fileSize - fileOffset <= fileFooterBytes) {
RecordMetadata((fileSize - fileOffset).toInt, isValid = false)
} else if (fileSize - fileOffset >= recordSize) {
} else if (maxOffset - fileOffset >= recordSize) {
RecordMetadata(recordSize, isValid = true)
} else {
RecordMetadata(-1, isValid = false)
@@ -38,10 +38,11 @@ class RecordHeaderParserRDW(isBigEndian: Boolean,
* @param header A record header as an array of bytes
* @param fileOffset An offset from the beginning of the underlying file
* @param fileSize A size of the underlying file
* @param maxOffset A maximum offset allowed to read by the current index chunk
* @param recordNum A sequential record number
* @return A parsed record metadata
*/
override def getRecordMetadata(header: Array[Byte], fileOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata = {
override def getRecordMetadata(header: Array[Byte], fileOffset: Long, maxOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata = {
if (fileHeaderBytes > getHeaderLength && fileOffset == getHeaderLength) {
RecordMetadata(fileHeaderBytes - getHeaderLength, isValid = false)
} else if (fileSize > 0L && fileFooterBytes > 0 && fileSize - fileOffset <= fileFooterBytes) {
@@ -75,7 +75,7 @@ object IndexGenerator extends Logging {
case None =>
val headerSize = recordHeaderParser.getHeaderLength
val headerBytes = dataStream.next(headerSize)
val recordMetadata = recordHeaderParser.getRecordMetadata(headerBytes, dataStream.offset, dataStream.size, recordIndex)
val recordMetadata = recordHeaderParser.getRecordMetadata(headerBytes, dataStream.offset, dataStream.size, dataStream.totalSize, recordIndex)
if (recordMetadata.recordLength > 0) {
record = dataStream.next(recordMetadata.recordLength)
}
@@ -157,7 +157,7 @@ class VRLRecordReader(cobolSchema: Copybook,
while (!isValidRecord && !isEndOfFile) {
headerBytes = dataStream.next(rdwHeaderBlock)

val recordMetadata = recordHeaderParser.getRecordMetadata(headerBytes, dataStream.offset, dataStream.size, recordIndex)
val recordMetadata = recordHeaderParser.getRecordMetadata(headerBytes, dataStream.offset, dataStream.size, dataStream.totalSize, recordIndex)
val recordLength = recordMetadata.recordLength

byteIndex += headerBytes.length
@@ -27,6 +27,8 @@ class FSStream (fileName: String) extends SimpleStream {

override def size: Long = fileSize

override def totalSize: Long = fileSize

override def offset: Long = byteIndex

override def inputFileName: String = fileName
@@ -21,6 +21,8 @@ package za.co.absa.cobrix.cobol.reader.stream
trait SimpleStream {
def size: Long

def totalSize: Long

def offset: Long

def inputFileName: String
@@ -27,6 +27,8 @@ class TestByteStream(bytes: Array[Byte]) extends SimpleStream{

override def size: Long = sz

override def totalSize: Long = sz

override def offset: Long = position

override def next(numberOfBytes: Int): Array[Byte] = {
@@ -27,6 +27,8 @@ class TestStringStream(str: String) extends SimpleStream{

override def size: Long = sz

override def totalSize: Long = sz

override def offset: Long = position

override def next(numberOfBytes: Int): Array[Byte] = {
@@ -49,6 +49,8 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =

override def size: Long = if (maximumBytes > 0) Math.min(fileSize, maximumBytes + startOffset) else fileSize

override def totalSize: Long = fileSize

override def offset: Long = byteIndex

/**
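To make the distinction between the two accessors concrete: `size` is capped by the bytes this particular stream instance may serve, while `totalSize` always reports the full file length. A small sketch with assumed illustrative numbers, using the formula from `size` above:

```scala
// Assumed numbers only: a 1 000-byte file read by a stream created with
// startOffset = 200 and maximumBytes = 300.
val fileSize     = 1000L
val startOffset  = 200L
val maximumBytes = 300L

val size      = if (maximumBytes > 0) Math.min(fileSize, maximumBytes + startOffset) else fileSize // 500
val totalSize = fileSize                                                                           // 1000
```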
@@ -16,11 +16,12 @@

package za.co.absa.cobrix.spark.cobol.source.fixtures

import org.apache.commons.io.FileUtils

import java.io.File.createTempFile
import java.io.{DataOutputStream, File, FileOutputStream}
import java.nio.charset.Charset
import java.nio.file.{Files, Path}

import org.apache.commons.io.{FileSystemUtils, FileUtils}
import java.nio.file.Files

/**
* This fixture adds ability for a unit test to create temporary files for using them in the tests.
@@ -39,10 +40,7 @@ trait BinaryFileFixture {
* @return The full path to the temporary file
*/
def withTempTextFile(prefix: String, suffix: String, charset: Charset, content: String)(f: String => Unit): Unit = {
val tempFile = File.createTempFile(prefix, suffix)
val ostream = new DataOutputStream(new FileOutputStream(tempFile))
ostream.write(content.getBytes(charset))
ostream.close()
val tempFile = createTempTextFile(None, prefix, suffix, charset, content)

f(tempFile.getAbsolutePath)

@@ -103,6 +101,18 @@ trait BinaryFileFixture {
tempFile.delete
}

def createTempTextFile(dirOpt: Option[File], prefix: String, suffix: String, charset: Charset, content: String): File = {
val tempFile = dirOpt match {
case Some(dir) => createTempFile(prefix, suffix, dir)
case None => createTempFile(prefix, suffix)
}

val ostream = new DataOutputStream(new FileOutputStream(tempFile))
ostream.write(content.getBytes(charset))
ostream.close()
tempFile
}

private def hex2bytes(hex: String): Array[Byte] = {
val compactStr = hex.replaceAll("\\s", "")
compactStr.sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte)
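Example use of the new `createTempTextFile` helper, mirroring the test added below (`withTempDirectory` is an existing fixture method that supplies a temporary directory path):

```scala
withTempDirectory("example") { tempDir =>
  val dirOpt = Option(new File(tempDir))
  val file1  = createTempTextFile(dirOpt, "file1", ".txt", StandardCharsets.UTF_8, "record data 1")
  val file2  = createTempTextFile(dirOpt, "file2", ".txt", StandardCharsets.UTF_8, "record data 2")
  // Both files now live inside tempDir, so a single .load(tempDir) picks them up.
}
```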
@@ -21,6 +21,10 @@ import org.apache.spark.sql.functions.col
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
import za.co.absa.cobrix.spark.cobol.utils.SparkUtils

import java.io.File
import java.nio.charset.StandardCharsets

//noinspection NameBooleanParameters
class Test28MultipartLoadSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture {
@@ -87,6 +91,106 @@
}
}
}

"file offsets supported when multiple multi-partition files are read" in {
val expected = """[ {
| "A" : "10"
|}, {
| "A" : "11"
|}, {
| "A" : "12"
|}, {
| "A" : "13"
|}, {
| "A" : "14"
|}, {
| "A" : "15"
|}, {
| "A" : "16"
|}, {
| "A" : "17"
|}, {
| "A" : "18"
|}, {
| "A" : "19"
|}, {
| "A" : "20"
|}, {
| "A" : "21"
|}, {
| "A" : "22"
|}, {
| "A" : "23"
|}, {
| "A" : "24"
|}, {
| "A" : "25"
|}, {
| "A" : "26"
|}, {
| "A" : "27"
|}, {
| "A" : "28"
|}, {
| "A" : "29"
|}, {
| "A" : "30"
|}, {
| "A" : "31"
|}, {
| "A" : "32"
|}, {
| "A" : "33"
|}, {
| "A" : "34"
|}, {
| "A" : "35"
|}, {
| "A" : "36"
|}, {
| "A" : "37"
|}, {
| "A" : "38"
|}, {
| "A" : "39"
|}, {
| "A" : "BB"
|}, {
| "A" : "FF"
|} ]""".stripMargin.replaceAll("[\\r\\n]", "\n")

val data1 = "AA10111213141516171819" + "B" * 960 + "20212223242526272829CC"
val data2 = "EE30313233343536373839" + "F" * 960 + "30313233343536373839GG"

withTempDirectory("rec_len_multi") { tempDir =>
val parentDirOpt = Option(new File(tempDir))
createTempTextFile(parentDirOpt, "file1", ".txt", StandardCharsets.UTF_8, data1)
createTempTextFile(parentDirOpt, "file2", ".txt", StandardCharsets.UTF_8, data2)

val df = spark
.read
.format("cobol")
.option("copybook_contents", copybook)
.option("encoding", "ascii")
.option("input_split_records", 10)
.option("file_start_offset", "2")
.option("file_end_offset", "2")
.load(tempDir)


val count = df.count()

val actual = SparkUtils.prettyJSON(df
.distinct()
.orderBy(col("A"))
.toJSON
.collect()
.mkString("[", ",", "]"))

assert(count == 1000)
assert(actual == expected)
}
}
}

private def getDataFrame(inputPaths: Seq[String],
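As a sanity check on the new test's expectations, the record count follows from the data layout (the 2-byte record size is inferred from the expected values; the copybook itself is defined elsewhere in the suite):

```scala
val bytesPerFile   = 2 + 20 + 960 + 20 + 2 // e.g. "AA" + ten 2-byte records + 960-byte filler + ten records + "CC"
val usablePerFile  = bytesPerFile - 2 - 2  // file_start_offset = 2 and file_end_offset = 2 trim both ends
val recordsPerFile = usablePerFile / 2     // 500 two-byte records per file
val totalRecords   = recordsPerFile * 2    // two input files => 1 000, matching assert(count == 1000)
```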
@@ -34,11 +34,12 @@ class Test10CustomRDWParser extends Serializable with RecordHeaderParser {
*
* @param header A record header as an array of bytes
* @param fileOffset An offset from the beginning of the underlying file
* @param maxOffset A maximum offset allowed to read by the current index chunk
* @param fileSize A size of the underlying file
* @param recordNum A sequential record number
* @return A parsed record metadata
*/
override def getRecordMetadata(header: Array[Byte], fileOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata = {
override def getRecordMetadata(header: Array[Byte], fileOffset: Long, maxOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata = {
val rdwHeaderBlock = getHeaderLength
if (header.length < rdwHeaderBlock) {
RecordMetadata(-1, isValid = false)