diff --git a/pyzoo/zoo/examples/streaming/objectdetection/README.md b/pyzoo/zoo/examples/streaming/objectdetection/README.md index d7d18b4cc26..6a2a953d40d 100644 --- a/pyzoo/zoo/examples/streaming/objectdetection/README.md +++ b/pyzoo/zoo/examples/streaming/objectdetection/README.md @@ -15,6 +15,8 @@ So, there are two applications in this example: image_path_writer and streaming_ ## Run this example Make sure all nodes can access image files, model and text files. Local file system/HDFS/Amazon S3 are supported. +Pls ensure all paths exist and accessible, and `streaming_path` is empty. Note that `streaming_object_detection` and `image_path_writer` should use the same `streaming_path`. + 1. Start streaming_object_detection ``` MASTER=... diff --git a/pyzoo/zoo/examples/streaming/objectdetection/image_path_writer.py b/pyzoo/zoo/examples/streaming/objectdetection/image_path_writer.py index 2c8aa3de569..6e888f3dd9d 100644 --- a/pyzoo/zoo/examples/streaming/objectdetection/image_path_writer.py +++ b/pyzoo/zoo/examples/streaming/objectdetection/image_path_writer.py @@ -19,6 +19,7 @@ from time import sleep from os import listdir, rename, mkdir, remove from os.path import isfile, join +import shutil def package_path_to_text(streaming_path, file_path, batch=10, delay=3): @@ -49,8 +50,8 @@ def package_path_to_text(streaming_path, file_path, batch=10, delay=3): with open(join(tmpDir, str(index) + ".txt"), "w") as text_file: text_file.writelines(files[curr:last]) # Move to streaming location - rename(text_file.name, - batch_file_name) + shutil.move(text_file.name, + batch_file_name) print("Writing to " + batch_file_name) index += 1 curr = last diff --git a/zoo/src/main/scala/com/intel/analytics/zoo/common/Utils.scala b/zoo/src/main/scala/com/intel/analytics/zoo/common/Utils.scala index 0416befb204..9a977eec959 100644 --- a/zoo/src/main/scala/com/intel/analytics/zoo/common/Utils.scala +++ b/zoo/src/main/scala/com/intel/analytics/zoo/common/Utils.scala @@ -18,12 +18,12 @@ package com.intel.analytics.zoo.common import java.io._ import java.nio.file.attribute.PosixFilePermissions +import java.nio.file.{Path => JPath} -import com.intel.analytics.bigdl.utils.File import org.apache.commons.io.filefilter.WildcardFileFilter import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import java.nio.file.{Path => JPath} +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} +import org.apache.hadoop.io.IOUtils import org.apache.log4j.Logger import scala.collection.mutable.ArrayBuffer @@ -94,7 +94,18 @@ private[zoo] object Utils { * @return Array[Byte] */ def readBytes(path: String): Array[Byte] = { - File.readBytes(path) + var fs: FileSystem = null + var in: FSDataInputStream = null + try { + fs = getFileSystem(path) + in = fs.open(new Path(path)) + val byteArrayOut = new ByteArrayOutputStream() + IOUtils.copyBytes(in, byteArrayOut, 1024, true) + byteArrayOut.toByteArray + } finally { + if (null != in) in.close() + if (null != fs) fs.close() + } } /** @@ -132,7 +143,7 @@ private[zoo] object Utils { * get from cache (may shared with our connections) * @return hadoop.fs.FileSystem */ - def getFileSystem(fileName: String, newInstance: Boolean = false): FileSystem = { + def getFileSystem(fileName: String, newInstance: Boolean = true): FileSystem = { if (newInstance) { FileSystem.newInstance(new Path(fileName).toUri, new Configuration()) } else { @@ -183,7 +194,16 @@ private[zoo] object Utils { * @param isOverwrite Overwrite exiting file or not */ def saveBytes(bytes: Array[Byte], fileName: String, isOverwrite: Boolean = false): Unit = { - File.saveBytes(bytes, fileName, isOverwrite) + var fs: FileSystem = null + var out: FSDataOutputStream = null + try { + fs = getFileSystem(fileName) + out = fs.create(new Path(fileName), isOverwrite) + IOUtils.copyBytes(new ByteArrayInputStream(bytes), out, 1024, true) + } finally { + if (null != out) out.close() + if (null != fs) fs.close() + } } def logUsageErrorAndThrowException(errMessage: String, cause: Throwable = null): Unit = { diff --git a/zoo/src/main/scala/com/intel/analytics/zoo/examples/streaming/objectdetection/README.md b/zoo/src/main/scala/com/intel/analytics/zoo/examples/streaming/objectdetection/README.md index 5f8ee2fc42b..6d1c85b3141 100644 --- a/zoo/src/main/scala/com/intel/analytics/zoo/examples/streaming/objectdetection/README.md +++ b/zoo/src/main/scala/com/intel/analytics/zoo/examples/streaming/objectdetection/README.md @@ -14,6 +14,8 @@ So, there are two applications in this example: ImagePathWriter and StreamingObj ## Run this example Make sure all nodes can access image files, model and text files. Local file system/HDFS/Amazon S3 are supported. +Pls ensure all paths exist and accessible, and `streamingPath` is empty. Note that `StreamingObjectDetection` and `ImagePathWriter` should use the same `streamingPath`. + 1. Start StreamingObjectDetection ``` MASTER=...