Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Invalid cross-device link in image_path_writer.py #1656

Merged
merged 5 commits into from
Oct 10, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyzoo/zoo/examples/streaming/objectdetection/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ So, there are two applications in this example: image_path_writer and streaming_
## Run this example
Make sure all nodes can access image files, model and text files. Local file system/HDFS/Amazon S3 are supported.

Pls ensure all paths exist and accessible, and `streaming_path` is empty. Note that `streaming_object_detection` and `image_path_writer` should use the same `streaming_path`.

1. Start streaming_object_detection
```
MASTER=...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from time import sleep
from os import listdir, rename, mkdir, remove
from os.path import isfile, join
import shutil


def package_path_to_text(streaming_path, file_path, batch=10, delay=3):
Expand Down Expand Up @@ -49,8 +50,8 @@ def package_path_to_text(streaming_path, file_path, batch=10, delay=3):
with open(join(tmpDir, str(index) + ".txt"), "w") as text_file:
text_file.writelines(files[curr:last])
# Move to streaming location
rename(text_file.name,
batch_file_name)
shutil.move(text_file.name,
batch_file_name)
print("Writing to " + batch_file_name)
index += 1
curr = last
Expand Down
32 changes: 26 additions & 6 deletions zoo/src/main/scala/com/intel/analytics/zoo/common/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ package com.intel.analytics.zoo.common

import java.io._
import java.nio.file.attribute.PosixFilePermissions
import java.nio.file.{Path => JPath}

import com.intel.analytics.bigdl.utils.File
import org.apache.commons.io.filefilter.WildcardFileFilter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import java.nio.file.{Path => JPath}
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
import org.apache.hadoop.io.IOUtils
import org.apache.log4j.Logger

import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -94,7 +94,18 @@ private[zoo] object Utils {
* @return Array[Byte]
*/
def readBytes(path: String): Array[Byte] = {
File.readBytes(path)
var fs: FileSystem = null
var in: FSDataInputStream = null
try {
fs = getFileSystem(path)
in = fs.open(new Path(path))
val byteArrayOut = new ByteArrayOutputStream()
IOUtils.copyBytes(in, byteArrayOut, 1024, true)
byteArrayOut.toByteArray
} finally {
if (null != in) in.close()
if (null != fs) fs.close()
}
}

/**
Expand Down Expand Up @@ -132,7 +143,7 @@ private[zoo] object Utils {
* get from cache (may shared with our connections)
* @return hadoop.fs.FileSystem
*/
def getFileSystem(fileName: String, newInstance: Boolean = false): FileSystem = {
def getFileSystem(fileName: String, newInstance: Boolean = true): FileSystem = {
if (newInstance) {
FileSystem.newInstance(new Path(fileName).toUri, new Configuration())
} else {
Expand Down Expand Up @@ -183,7 +194,16 @@ private[zoo] object Utils {
* @param isOverwrite Overwrite exiting file or not
*/
def saveBytes(bytes: Array[Byte], fileName: String, isOverwrite: Boolean = false): Unit = {
File.saveBytes(bytes, fileName, isOverwrite)
var fs: FileSystem = null
var out: FSDataOutputStream = null
try {
fs = getFileSystem(fileName)
out = fs.create(new Path(fileName), isOverwrite)
IOUtils.copyBytes(new ByteArrayInputStream(bytes), out, 1024, true)
} finally {
if (null != out) out.close()
if (null != fs) fs.close()
}
}

def logUsageErrorAndThrowException(errMessage: String, cause: Throwable = null): Unit = {
Expand Down