Skip to content

Commit

Permalink
Updated batch processor and storage to upload files
Browse files Browse the repository at this point in the history
  • Loading branch information
axsaucedo committed May 18, 2020
1 parent 6967a24 commit c43d250
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 6 deletions.
49 changes: 47 additions & 2 deletions python/seldon_core/batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,22 @@
import requests
from queue import Queue
from threading import Thread
from seldon_core.storage import Storage
import os
import uuid

# Valid values accepted by the CLI options declared below.
CHOICES_GATEWAY_TYPE = ["ambassador", "istio", "seldon"]
CHOICES_TRANSPORT = ["rest", "grpc"]
CHOICES_PAYLOAD_TYPE = ["ndarray", "tensor", "tftensor", "json", "bytes", "str", "raw"]
CHOICES_METHOD = ["predictions", "explain"]
CHOICES_LOG_LEVEL = ["debug", "info", "warning", "error"]

# Scratch directory plus per-run unique filenames used to stage remote
# input/output data locally before/after talking to the object store.
# BUG FIX: the original joined onto __file__ itself (a *file*, not a
# directory), producing a path like ".../batch_processor.py/TEMP_DATA_FILES"
# on which os.mkdir() can never succeed; join onto the containing directory.
DATA_TEMP_DIRPATH = os.path.join(os.path.dirname(__file__), "TEMP_DATA_FILES")
DATA_TEMP_UUID = str(uuid.uuid4())
DATA_TEMP_INPUT_FILENAME = f"{DATA_TEMP_UUID}-input.txt"
DATA_TEMP_OUTPUT_FILENAME = f"{DATA_TEMP_UUID}-output.txt"


@click.command()
@click.option(
Expand Down Expand Up @@ -87,6 +96,39 @@ def run_cli(
method,
log_level,
):
is_remote_input_path = Storage.is_remote_path(input_data_path)
is_remote_output_path = Storage.is_remote_path(output_data_path)

local_output_data_path = None
if is_remote_output_path:
try:
os.mkdir(DATA_TEMP_DIRPATH)
except FileExistsError:
pass
local_output_data_path = os.path.join(
DATA_TEMP_DIRPATH, DATA_TEMP_OUTPUT_FILENAME
)
else:
# Check if file path is correct and is not directory by creating temp file
with open(output_data_path, "x") as tempfile:
pass
local_output_data_path = output_data_path

local_input_data_path = None
if is_remote_input_path:
local_input_data_path = os.path.join(
DATA_TEMP_DIRPATH, DATA_TEMP_INPUT_FILENAME
)
Storage.download(input_data_path, DATA_TEMP_INPUT_FILENAME)
if os.path.isdir(DATA_TEMP_INPUT_FILENAME):
raise RuntimeError(
"Only single files are supported - "
f"directory {input_data_path} is not valid. "
"Please provide a file, not a directory."
)
else:
local_input_data_path = input_data_path

# TODO: Add checks that url is valid
url = f"http://{host}/seldon/{namespace}/{deployment_name}/api/v1.0/{method}"
q_in = Queue(workers * 2)
Expand All @@ -105,7 +147,7 @@ def _start_request_worker():
q_in.task_done()

def _start_file_worker():
output_data_file = open(output_data_path, "w")
output_data_file = open(local_output_data_path, "w")
while True:
line = q_out.get()
output_data_file.write(f"{line}")
Expand All @@ -120,9 +162,12 @@ def _start_file_worker():
t.daemon = True
t.start()

input_data_file = open(input_data_path, "r")
input_data_file = open(local_input_data_path, "r")
for line in input_data_file:
q_in.put(line)

q_in.join()
q_out.join()

if is_remote_output_path:
Storage.upload(local_output_data_path, output_data_path)
22 changes: 18 additions & 4 deletions python/seldon_core/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,19 @@


class Storage(object):  # pylint: disable=too-few-public-methods
    @staticmethod
    def is_remote_path(path: str) -> bool:
        """Return True if ``path`` refers to a supported remote object store.

        A path counts as remote when it starts with the GCS or S3 prefix,
        or matches the Azure blob-storage URL pattern.
        """
        # BUG FIX: re.search returns Match | None, so the original could
        # return a non-bool despite the declared ``-> bool``; normalise
        # with bool() (truthiness for callers is unchanged).
        return bool(
            path.startswith((_GCS_PREFIX, _S3_PREFIX))
            or re.search(_BLOB_RE, path)
        )

@staticmethod
def upload(in_path: str, uri: str):
logging.info("Copying contents of local to %s", uri)

if uri.startswith(_GCS_PREFIX):
if uri.startswith(_S3_PREFIX):
Storage._upload_s3(in_path, uri)
else:
raise Exception(
Expand Down Expand Up @@ -119,26 +127,32 @@ def _upload_s3(in_path: str, uri: str):
f"with result {upload_result}"
)
except ResponseError as err:
logging.error("Failed uploading file {in_path} to {uri}", exc_info=True)
logging.error(
f"Failed uploading file {in_path} to {uri}", exc_info=True
)
else:
in_path_length = len(in_path)
files = glob.glob(os.path.join(in_path, "**"), recursive=True)
logging.info(f"Uploading directory {in_path} to {uri}")
for file in files:
if not os.path.isfile(file):
continue
bucket_suffix_path = file[in_path_length:]
bucket_joint_path = os.path.join(bucket_path, bucket_suffix_path)
if bucket_joint_path.startswith("/"):
bucket_joint_path = bucket_joint_path[1:]
logging.info(f"Uploading directory file {file} to {bucket_joint_path}")
try:
upload_result = client.fput_object(
bucket_name, bucket_joint_path, in_path
bucket_name, bucket_joint_path, file
)
logging.info(
f"Successfully uploaded file {file} to {bucket_joint_path} "
f"with result {upload_result}"
)
except ResponseError as err:
logging.error(
"Failed uploading file {file} to {bucket_joint_path}",
f"Failed uploading file {file} to {bucket_joint_path}",
exc_info=True,
)

Expand Down

0 comments on commit c43d250

Please sign in to comment.