diff --git a/README.md b/README.md
index 51601195..94e12a81 100644
--- a/README.md
+++ b/README.md
@@ -1540,7 +1540,7 @@ A: Update hadoop dll to version 3.2.2 or newer.
   - [#545](https://github.com/AbsaOSS/cobrix/issues/545) Added support for `string` debug columns for ASCII (D/D2/T) files (`.option("debug", "string")`).
   - [#543](https://github.com/AbsaOSS/cobrix/issues/543) Improved performance of processing ASCII text (D/D2/T) files with variable OCCURS.
   - [#553](https://github.com/AbsaOSS/cobrix/issues/553) Fixed variable occurs now working properly with basic ASCII record format (D2).
-
+  - [#556](https://github.com/AbsaOSS/cobrix/issues/556) Fixed `file_end_offset` option dropping records from the end of partitions instead of end of files.
 
 - #### 2.6.1 released 2 December 2022.
   - [#531](https://github.com/AbsaOSS/cobrix/issues/531) Added support for CP1047 EBCDIC code page.
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParser.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParser.scala
index 94cb3fab..d7555c98 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParser.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParser.scala
@@ -52,10 +52,11 @@ trait RecordHeaderParser {
    * @param header     A record header as an array of bytes
    * @param fileOffset An offset from the beginning of the underlying file
    * @param fileSize   A size of the underlying file
+   * @param maxOffset  A maximum offset allowed to read by the current index chunk
    * @param recordNum  A sequential record number
    * @return A parsed record metadata
    */
-  def getRecordMetadata(header: Array[Byte], fileOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata
+  def getRecordMetadata(header: Array[Byte], fileOffset: Long, maxOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata
 
   /**
    * Clients of 'spark-cobol' can pass additional information to custom record header parsers using
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParserFixedLen.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParserFixedLen.scala
index 901d5c7d..53d15f31 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParserFixedLen.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParserFixedLen.scala
@@ -37,12 +37,12 @@ class RecordHeaderParserFixedLen(recordSize: Int,
    * @param recordNum  A sequential record number
    * @return A parsed record metadata
    */
-  override def getRecordMetadata(header: Array[Byte], fileOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata = {
+  override def getRecordMetadata(header: Array[Byte], fileOffset: Long, maxOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata = {
     if (fileHeaderBytes > 0 && fileOffset == 0L) {
       RecordMetadata(fileHeaderBytes, isValid = false)
     } else if (fileSize > 0L && fileFooterBytes > 0 && fileSize - fileOffset <= fileFooterBytes) {
       RecordMetadata((fileSize - fileOffset).toInt, isValid = false)
-    } else if (fileSize - fileOffset >= recordSize) {
+    } else if (maxOffset - fileOffset >= recordSize) {
       RecordMetadata(recordSize, isValid = true)
     } else {
       RecordMetadata(-1, isValid = false)
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParserRDW.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParserRDW.scala
index 41de1223..7a477e48 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParserRDW.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParserRDW.scala
@@ -38,10 +38,11 @@ class RecordHeaderParserRDW(isBigEndian: Boolean,
    * @param header     A record header as an array of bytes
    * @param fileOffset An offset from the beginning of the underlying file
    * @param fileSize   A size of the underlying file
+   * @param maxOffset  A maximum offset allowed to read by the current index chunk
    * @param recordNum  A sequential record number
    * @return A parsed record metadata
    */
-  override def getRecordMetadata(header: Array[Byte], fileOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata = {
+  override def getRecordMetadata(header: Array[Byte], fileOffset: Long, maxOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata = {
     if (fileHeaderBytes > getHeaderLength && fileOffset == getHeaderLength) {
       RecordMetadata(fileHeaderBytes - getHeaderLength, isValid = false)
     } else if (fileSize > 0L && fileFooterBytes > 0 && fileSize - fileOffset <= fileFooterBytes) {
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala
index 94265dcd..00980f95 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala
@@ -75,7 +75,7 @@ object IndexGenerator extends Logging {
         case None =>
           val headerSize = recordHeaderParser.getHeaderLength
           val headerBytes = dataStream.next(headerSize)
-          val recordMetadata = recordHeaderParser.getRecordMetadata(headerBytes, dataStream.offset, dataStream.size, recordIndex)
+          val recordMetadata = recordHeaderParser.getRecordMetadata(headerBytes, dataStream.offset, dataStream.size, dataStream.totalSize, recordIndex)
           if (recordMetadata.recordLength > 0) {
             record = dataStream.next(recordMetadata.recordLength)
           }
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala
index ebc0c44a..785823a8 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala
@@ -157,7 +157,7 @@ class VRLRecordReader(cobolSchema: Copybook,
     while (!isValidRecord && !isEndOfFile) {
       headerBytes = dataStream.next(rdwHeaderBlock)
 
-      val recordMetadata = recordHeaderParser.getRecordMetadata(headerBytes, dataStream.offset, dataStream.size, recordIndex)
+      val recordMetadata = recordHeaderParser.getRecordMetadata(headerBytes, dataStream.offset, dataStream.size, dataStream.totalSize, recordIndex)
       val recordLength = recordMetadata.recordLength
 
       byteIndex += headerBytes.length
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala
index 5476feb5..8767ac62 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala
@@ -27,6 +27,8 @@ class FSStream (fileName: String) extends SimpleStream {
 
   override def size: Long = fileSize
 
+  override def totalSize: Long = fileSize
+
   override def offset: Long = byteIndex
 
   override def inputFileName: String = fileName
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala
index 0f830624..b4dbcd24 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala
@@ -21,6 +21,8 @@ package za.co.absa.cobrix.cobol.reader.stream
 trait SimpleStream {
   def size: Long
 
+  def totalSize: Long
+
   def offset: Long
 
   def inputFileName: String
diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala
index ef3ebca2..034e8cae 100644
--- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala
+++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala
@@ -27,6 +27,8 @@ class TestByteStream(bytes: Array[Byte]) extends SimpleStream{
 
   override def size: Long = sz
 
+  override def totalSize: Long = sz
+
   override def offset: Long = position
 
   override def next(numberOfBytes: Int): Array[Byte] = {
diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestStringStream.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestStringStream.scala
index 71d3ed02..2b093549 100644
--- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestStringStream.scala
+++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestStringStream.scala
@@ -27,6 +27,8 @@ class TestStringStream(str: String) extends SimpleStream{
 
   override def size: Long = sz
 
+  override def totalSize: Long = sz
+
   override def offset: Long = position
 
   override def next(numberOfBytes: Int): Array[Byte] = {
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala
index a4a3aa5b..c84effb3 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala
@@ -49,6 +49,8 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
 
   override def size: Long = if (maximumBytes > 0) Math.min(fileSize, maximumBytes + startOffset) else fileSize
 
+  override def totalSize: Long = fileSize
+
   override def offset: Long = byteIndex
 
   /**
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala
index 75e3b179..b8454389 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala
@@ -16,11 +16,12 @@
 
 package za.co.absa.cobrix.spark.cobol.source.fixtures
 
+import org.apache.commons.io.FileUtils
+
+import java.io.File.createTempFile
 import java.io.{DataOutputStream, File, FileOutputStream}
 import java.nio.charset.Charset
-import java.nio.file.{Files, Path}
-
-import org.apache.commons.io.{FileSystemUtils, FileUtils}
+import java.nio.file.Files
 
 /**
  * This fixture adds ability for a unit test to create temporary files for using them in the tests.
@@ -39,10 +40,7 @@ trait BinaryFileFixture {
    * @return The full path to the temporary file
    */
  def withTempTextFile(prefix: String, suffix: String, charset: Charset, content: String)(f: String => Unit): Unit = {
-    val tempFile = File.createTempFile(prefix, suffix)
-    val ostream = new DataOutputStream(new FileOutputStream(tempFile))
-    ostream.write(content.getBytes(charset))
-    ostream.close()
+    val tempFile = createTempTextFile(None, prefix, suffix, charset, content)
 
     f(tempFile.getAbsolutePath)
 
@@ -103,6 +101,18 @@ trait BinaryFileFixture {
     tempFile.delete
   }
 
+  def createTempTextFile(dirOpt: Option[File], prefix: String, suffix: String, charset: Charset, content: String): File = {
+    val tempFile = dirOpt match {
+      case Some(dir) => createTempFile(prefix, suffix, dir)
+      case None => createTempFile(prefix, suffix)
+    }
+
+    val ostream = new DataOutputStream(new FileOutputStream(tempFile))
+    ostream.write(content.getBytes(charset))
+    ostream.close()
+    tempFile
+  }
+
   private def hex2bytes(hex: String): Array[Byte] = {
     val compactStr = hex.replaceAll("\\s", "")
     compactStr.sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte)
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test28MultipartLoadSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test28MultipartLoadSpec.scala
index 1b5ada9b..c4cb1d61 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test28MultipartLoadSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test28MultipartLoadSpec.scala
@@ -21,6 +21,10 @@ import org.apache.spark.sql.functions.col
 import org.scalatest.wordspec.AnyWordSpec
 import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
 import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
+import za.co.absa.cobrix.spark.cobol.utils.SparkUtils
+
+import java.io.File
+import java.nio.charset.StandardCharsets
 
 //noinspection NameBooleanParameters
 class Test28MultipartLoadSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture {
@@ -87,6 +91,106 @@ class Test28MultipartLoadSpec extends AnyWordSpec with SparkTestBase with Binary
         }
       }
     }
+
+    "file offsets supported when multiple multi-partition files are read" in {
+      val expected = """[ {
+        |  "A" : "10"
+        |}, {
+        |  "A" : "11"
+        |}, {
+        |  "A" : "12"
+        |}, {
+        |  "A" : "13"
+        |}, {
+        |  "A" : "14"
+        |}, {
+        |  "A" : "15"
+        |}, {
+        |  "A" : "16"
+        |}, {
+        |  "A" : "17"
+        |}, {
+        |  "A" : "18"
+        |}, {
+        |  "A" : "19"
+        |}, {
+        |  "A" : "20"
+        |}, {
+        |  "A" : "21"
+        |}, {
+        |  "A" : "22"
+        |}, {
+        |  "A" : "23"
+        |}, {
+        |  "A" : "24"
+        |}, {
+        |  "A" : "25"
+        |}, {
+        |  "A" : "26"
+        |}, {
+        |  "A" : "27"
+        |}, {
+        |  "A" : "28"
+        |}, {
+        |  "A" : "29"
+        |}, {
+        |  "A" : "30"
+        |}, {
+        |  "A" : "31"
+        |}, {
+        |  "A" : "32"
+        |}, {
+        |  "A" : "33"
+        |}, {
+        |  "A" : "34"
+        |}, {
+        |  "A" : "35"
+        |}, {
+        |  "A" : "36"
+        |}, {
+        |  "A" : "37"
+        |}, {
+        |  "A" : "38"
+        |}, {
+        |  "A" : "39"
+        |}, {
+        |  "A" : "BB"
+        |}, {
+        |  "A" : "FF"
+        |} ]""".stripMargin.replaceAll("[\\r\\n]", "\n")
+
+      val data1 = "AA10111213141516171819" + "B" * 960 + "20212223242526272829CC"
+      val data2 = "EE30313233343536373839" + "F" * 960 + "30313233343536373839GG"
+
+      withTempDirectory("rec_len_multi") { tempDir =>
+        val parentDirOpt = Option(new File(tempDir))
+        createTempTextFile(parentDirOpt, "file1", ".txt", StandardCharsets.UTF_8, data1)
+        createTempTextFile(parentDirOpt, "file2", ".txt", StandardCharsets.UTF_8, data2)
+
+        val df = spark
+          .read
+          .format("cobol")
+          .option("copybook_contents", copybook)
+          .option("encoding", "ascii")
+          .option("input_split_records", 10)
+          .option("file_start_offset", "2")
+          .option("file_end_offset", "2")
+          .load(tempDir)
+
+
+        val count = df.count()
+
+        val actual = SparkUtils.prettyJSON(df
+          .distinct()
+          .orderBy(col("A"))
+          .toJSON
+          .collect()
+          .mkString("[", ",", "]"))
+
+        assert(count == 1000)
+        assert(actual == expected)
+      }
+    }
   }
 
   private def getDataFrame(inputPaths: Seq[String],
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/utils/Test10CustomRDWParser.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/utils/Test10CustomRDWParser.scala
index 508e0753..de6cdb42 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/utils/Test10CustomRDWParser.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/utils/Test10CustomRDWParser.scala
@@ -34,11 +34,12 @@ class Test10CustomRDWParser extends Serializable with RecordHeaderParser {
    *
    * @param header     A record header as an array of bytes
    * @param fileOffset An offset from the beginning of the underlying file
+   * @param maxOffset  A maximum offset allowed to read by the current index chunk
    * @param fileSize   A size of the underlying file
    * @param recordNum  A sequential record number
    * @return A parsed record metadata
    */
-  override def getRecordMetadata(header: Array[Byte], fileOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata = {
+  override def getRecordMetadata(header: Array[Byte], fileOffset: Long, maxOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata = {
     val rdwHeaderBlock = getHeaderLength
     if (header.length < rdwHeaderBlock) {
       RecordMetadata(-1, isValid = false)
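Note for users of custom record header parsers: this patch widens the public `RecordHeaderParser.getRecordMetadata` signature, so implementations outside this repository need the same update as `Test10CustomRDWParser` above. Below is a minimal sketch of an adapted parser for fixed-length records; the class name `MyFixedLengthParser` and the 20-byte record length are made up for illustration, and the sketch assumes the trait's other abstract members are `isHeaderDefinedInCopybook` and `getHeaderLength` (check the trait for the exact contract).

```scala
import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordMetadata}

// Hypothetical parser for 20-byte fixed-length records (name and length are examples only).
class MyFixedLengthParser extends Serializable with RecordHeaderParser {
  private val recordLength = 20

  // Records carry no header bytes, and the copybook does not describe one.
  override def isHeaderDefinedInCopybook: Boolean = false
  override def getHeaderLength: Int = 0

  // New 5-argument signature: maxOffset bounds the current index chunk,
  // while fileSize still refers to the whole underlying file.
  override def getRecordMetadata(header: Array[Byte],
                                 fileOffset: Long,
                                 maxOffset: Long,
                                 fileSize: Long,
                                 recordNum: Long): RecordMetadata = {
    if (maxOffset - fileOffset >= recordLength) {
      RecordMetadata(recordLength, isValid = true)  // a whole record still fits into this chunk
    } else {
      RecordMetadata(-1, isValid = false)           // chunk (or file) exhausted
    }
  }
}
```

The split between the two sizes is visible in `RecordHeaderParserFixedLen` above: the footer check keeps using `fileSize` (now the real file size, fed from `dataStream.totalSize`), while the "does another record fit" check uses `maxOffset` (the end of the current chunk, fed from `dataStream.size`), which is why `file_end_offset` no longer trims records from the end of every partition.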