Merge pull request #714 from vinodkc/br_fix_race_condition
Avoid a race condition on Fixed record-length file processing.
yruslan authored Oct 7, 2024
2 parents b2578a4 + 1103944 commit 360ecff
Showing 1 changed file with 4 additions and 3 deletions.
```diff
@@ -90,16 +90,17 @@ private[source] object CobolScanners extends Logging {
     // binaryRecords() for fixed size records
     // binaryFiles() for varying size records
     // https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/SparkContext.html#binaryFiles(java.lang.String,%20int)
 
+    // Take a deep copy of Configuration for conf isolation
+    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
     val recordSize = reader.getRecordSize
 
     sourceDirs.foreach(sourceDir => {
-      if (!debugIgnoreFileSize && areThereNonDivisibleFiles(sourceDir, sqlContext.sparkContext.hadoopConfiguration, recordSize)) {
+      if (!debugIgnoreFileSize && areThereNonDivisibleFiles(sourceDir, conf, recordSize)) {
         throw new IllegalArgumentException(s"There are some files in $sourceDir that are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record). Check the logs for the names of the files.")
       }
     })
 
-    val records = sourceDirs.map(sourceDir => sqlContext.sparkContext.binaryRecords(sourceDir, recordSize, sqlContext.sparkContext.hadoopConfiguration))
+    val records = sourceDirs.map(sourceDir => sqlContext.sparkContext.binaryRecords(sourceDir, recordSize, conf))
       .reduce((a ,b) => a.union(b))
     recordParser(reader, records)
   }
```
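
For context, here is a minimal sketch of the conf-isolation pattern this commit adopts; it is not Cobrix code. Spark's `SparkContext.binaryRecords` writes the fixed record length into the `Configuration` it is given, so two concurrent fixed-length reads that both pass the shared `sparkContext.hadoopConfiguration` can overwrite each other's record size. Copying the configuration per read removes that shared mutable state. The paths, record sizes, and session setup below are hypothetical.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

object ConfIsolationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("conf-isolation-sketch")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Problematic pattern: both reads would share sc.hadoopConfiguration, and
    // binaryRecords() mutates the Configuration it receives (it stores the record
    // length in it), so concurrent reads with different record sizes can race:
    //   val rddA = sc.binaryRecords("/data/fixed_100", 100, sc.hadoopConfiguration)
    //   val rddB = sc.binaryRecords("/data/fixed_250", 250, sc.hadoopConfiguration)

    // Isolated pattern: each read gets its own copy of the Hadoop configuration,
    // as the commit does with new Configuration(sqlContext.sparkContext.hadoopConfiguration).
    val confA = new Configuration(sc.hadoopConfiguration) // copies all key/value entries
    val confB = new Configuration(sc.hadoopConfiguration)

    val rddA = sc.binaryRecords("/data/fixed_100", 100, confA) // hypothetical path and record size
    val rddB = sc.binaryRecords("/data/fixed_250", 250, confB)

    println(s"records read: ${rddA.count()} + ${rddB.count()}")
    spark.stop()
  }
}
```

Copying the configuration is cheap compared to a Spark job and avoids the need for any locking around the shared `hadoopConfiguration`.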