Skip to content

Commit

Permalink
Fix invalid cross-device and filesystem closed issues in Streaming ex…
Browse files Browse the repository at this point in the history
…amples (intel-analytics#1656)

* Fix Invalid cross-device link in image_path_writer.py.
* Fix HDFS filesystem closed issue in ImagePathWriter and StreamingObjectDetection.
* Modify document.
  • Loading branch information
qiyuangong authored Oct 10, 2019
1 parent 60445ac commit 7665350
Showing 1 changed file with 26 additions and 6 deletions.
32 changes: 26 additions & 6 deletions Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ package com.intel.analytics.zoo.common

import java.io._
import java.nio.file.attribute.PosixFilePermissions
import java.nio.file.{Path => JPath}

import com.intel.analytics.bigdl.utils.File
import org.apache.commons.io.filefilter.WildcardFileFilter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import java.nio.file.{Path => JPath}
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
import org.apache.hadoop.io.IOUtils
import org.apache.log4j.Logger

import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -94,7 +94,18 @@ private[zoo] object Utils {
* @return Array[Byte]
*/
def readBytes(path: String): Array[Byte] = {
File.readBytes(path)
var fs: FileSystem = null
var in: FSDataInputStream = null
try {
fs = getFileSystem(path)
in = fs.open(new Path(path))
val byteArrayOut = new ByteArrayOutputStream()
IOUtils.copyBytes(in, byteArrayOut, 1024, true)
byteArrayOut.toByteArray
} finally {
if (null != in) in.close()
if (null != fs) fs.close()
}
}

/**
Expand Down Expand Up @@ -132,7 +143,7 @@ private[zoo] object Utils {
* get from cache (may shared with our connections)
* @return hadoop.fs.FileSystem
*/
def getFileSystem(fileName: String, newInstance: Boolean = false): FileSystem = {
def getFileSystem(fileName: String, newInstance: Boolean = true): FileSystem = {
if (newInstance) {
FileSystem.newInstance(new Path(fileName).toUri, new Configuration())
} else {
Expand Down Expand Up @@ -183,7 +194,16 @@ private[zoo] object Utils {
* @param isOverwrite Overwrite exiting file or not
*/
def saveBytes(bytes: Array[Byte], fileName: String, isOverwrite: Boolean = false): Unit = {
File.saveBytes(bytes, fileName, isOverwrite)
var fs: FileSystem = null
var out: FSDataOutputStream = null
try {
fs = getFileSystem(fileName)
out = fs.create(new Path(fileName), isOverwrite)
IOUtils.copyBytes(new ByteArrayInputStream(bytes), out, 1024, true)
} finally {
if (null != out) out.close()
if (null != fs) fs.close()
}
}

def logUsageErrorAndThrowException(errMessage: String, cause: Throwable = null): Unit = {
Expand Down

0 comments on commit 7665350

Please sign in to comment.