From 76653506e5c1262082709cfba3dbacbe4ac1dd78 Mon Sep 17 00:00:00 2001 From: Qiyuan Gong Date: Thu, 10 Oct 2019 18:23:34 +0800 Subject: [PATCH] Fix invalid cross-device and filesystem closed issues in Streaming examples (#1656) * Fix Invalid cross-device link in image_path_writer.py. * Fix HDFS filesystem closed issue in ImagePathWriter and StreamingObjectDetection. * Modify document. --- Utils.scala | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/Utils.scala b/Utils.scala index 0416befb204..9a977eec959 100644 --- a/Utils.scala +++ b/Utils.scala @@ -18,12 +18,12 @@ package com.intel.analytics.zoo.common import java.io._ import java.nio.file.attribute.PosixFilePermissions +import java.nio.file.{Path => JPath} -import com.intel.analytics.bigdl.utils.File import org.apache.commons.io.filefilter.WildcardFileFilter import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import java.nio.file.{Path => JPath} +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} +import org.apache.hadoop.io.IOUtils import org.apache.log4j.Logger import scala.collection.mutable.ArrayBuffer @@ -94,7 +94,18 @@ private[zoo] object Utils { * @return Array[Byte] */ def readBytes(path: String): Array[Byte] = { - File.readBytes(path) + var fs: FileSystem = null + var in: FSDataInputStream = null + try { + fs = getFileSystem(path) + in = fs.open(new Path(path)) + val byteArrayOut = new ByteArrayOutputStream() + IOUtils.copyBytes(in, byteArrayOut, 1024, true) + byteArrayOut.toByteArray + } finally { + if (null != in) in.close() + if (null != fs) fs.close() + } } /** @@ -132,7 +143,7 @@ private[zoo] object Utils { * get from cache (may shared with our connections) * @return hadoop.fs.FileSystem */ - def getFileSystem(fileName: String, newInstance: Boolean = false): FileSystem = { + def getFileSystem(fileName: String, newInstance: Boolean = true): FileSystem = { if (newInstance) { FileSystem.newInstance(new Path(fileName).toUri, new Configuration()) } else { @@ -183,7 +194,16 @@ private[zoo] object Utils { * @param isOverwrite Overwrite exiting file or not */ def saveBytes(bytes: Array[Byte], fileName: String, isOverwrite: Boolean = false): Unit = { - File.saveBytes(bytes, fileName, isOverwrite) + var fs: FileSystem = null + var out: FSDataOutputStream = null + try { + fs = getFileSystem(fileName) + out = fs.create(new Path(fileName), isOverwrite) + IOUtils.copyBytes(new ByteArrayInputStream(bytes), out, 1024, true) + } finally { + if (null != out) out.close() + if (null != fs) fs.close() + } } def logUsageErrorAndThrowException(errMessage: String, cause: Throwable = null): Unit = {