Skip to content

Commit

Permalink
Merge pull request #26 from akuporos/mk/poc_async
Browse files Browse the repository at this point in the history
[IE PYTHON API POC] Add fixed InferQueue and modification in async part of benchmark
  • Loading branch information
akuporos committed May 13, 2021
2 parents c57cf54 + 2ada0fc commit 847b98b
Show file tree
Hide file tree
Showing 22 changed files with 908 additions and 189 deletions.
96 changes: 96 additions & 0 deletions ngraph/python/examples/async_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""
First example.
User wants to use OpenVINO API to infer one picture using
synchronous Infer Request.
"""

import numpy as np
import time

from openvino.inference_engine import IECore
from openvino.inference_engine import TensorDesc
from openvino.inference_engine import Blob
from openvino.inference_engine import StatusCode

from helpers import get_images


def get_reference(executable_network, image):
    """Get reference outputs using synchronous API.

    Runs a blocking ``infer`` call on ``executable_network`` with ``image``
    bound to the ``'data'`` input and returns whatever ``infer`` returns.
    """
    # Use the ``image`` argument: the previous code read the module-level
    # ``img`` and silently ignored the parameter.
    return executable_network.infer({'data': image})


# Collect the input images provided by the helpers module
images = get_images()

# Read the model from disk and load it onto the CPU device
model_xml = '/home/jiwaszki/testdata/models/test_model/test_model_fp32.xml'
model_bin = '/home/jiwaszki/testdata/models/test_model/test_model_fp32.bin'
ie = IECore()
ie_network = ie.read_network(model_xml, model_bin)
executable_network = ie.load_network(
    network=ie_network, device_name='CPU', config={})

# Reference result for the first image, obtained synchronously
img = images[0]
ref_result = get_reference(executable_network, img)

# Create InferRequest
request = executable_network.create_infer_request()

# Number of asynchronous runs, and the per-run latencies (in ms) that the
# completion callback records
num_of_runs = 1
times = []


# Create callback function
def say_hi(request, userdata):
    """User-defined callback function.

    Prints a completion message together with ``userdata``, prints a copy
    of the ``'fc_out'`` output buffer, and appends the time elapsed since
    ``userdata`` (used here as a ``time.time()`` timestamp), in
    milliseconds, to the module-level ``times`` list.
    """
    global times
    print("Hi this is your Infer Request, I'm done!", userdata)
    output = request.output_blobs['fc_out'].buffer.copy()
    print(output)
    elapsed_ms = (time.time() - userdata) * 1000
    times.append(elapsed_ms)


# Run the request asynchronously with a completion callback
print('Starting async infer...')

# One synchronous call before the timed asynchronous runs
request.infer()

for _ in range(num_of_runs):
    # The start timestamp is handed to the callback as its userdata
    start_time = time.time()
    request.set_completion_callback(say_hi, start_time)
    request.async_infer({'data': img})
    status = request.wait()

print(times)
latency_median = np.median(times)
print(latency_median)

# Wait for Infer Request to finish
status = request.wait()

if status != StatusCode.OK:
    raise RuntimeError('Infer Request failed to finish!')

# Compare every output of the request with the synchronous reference
print('Finished asynchronous infer!')
for key in executable_network.output_info:
    print(request.get_blob(key).buffer)
    print(ref_result[key])
    assert np.allclose(request.get_blob(key).buffer, ref_result[key])

# TODO: When callback is present everything works
# del request
44 changes: 44 additions & 0 deletions ngraph/python/examples/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Use case: user wants to run the inference on a set of pictures
# and store the results of the inference (e.g. in a database).
# The Inference Queue allows them to run inference in parallel jobs.

import numpy as np
import os
import time
from openvino.inference_engine import IECore
from openvino.inference_engine import TensorDesc
from openvino.inference_engine import Blob
from openvino.inference_engine import StatusCode
from openvino.inference_engine import InferQueue


def image_path(name):
    """Return the path of image ``name`` inside the validation data set.

    The data root is read from the ``DATA_PATH`` environment variable; a
    ``KeyError`` is raised when that variable is not set.
    """
    return os.path.join(
        os.environ["DATA_PATH"], 'validation_set', '224x224', name)


def read_image(name):
    """Load image ``name`` and convert it to a ``(1, 3, 32, 32)`` float32 array.

    The file is located with ``image_path``, resized to 32x32, divided by
    255, transposed so the channel axis comes first, and reshaped into a
    single-image batch.

    Raises:
        FileNotFoundError: if ``cv2.imread`` returns ``None`` for the path.
    """
    import cv2
    n, c, h, w = (1, 3, 32, 32)
    image = cv2.imread(image_path(name))
    if image is None:
        raise FileNotFoundError('Input image not found')

    # cv2.resize takes dsize as (width, height); the previous (h, w) order
    # only worked because the target size is square.
    image = cv2.resize(image, (w, h)) / 255
    # Move the channel axis to the front, then add the batch dimension.
    image = image.transpose((2, 0, 1)).astype(np.float32)
    image = image.reshape((n, c, h, w))
    return image


def get_images():
    """Read the six sample images and return them as a list.

    Each entry is the value returned by ``read_image`` for the
    corresponding file name, in the order listed below.
    """
    file_names = ('dog.bmp', 'cat1.bmp', 'cat2.bmp',
                  'dog1.bmp', 'dog2.bmp', 'dog3.bmp')
    return [read_image(file_name) for file_name in file_names]
131 changes: 131 additions & 0 deletions ngraph/python/examples/queue_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""
Third example.
User wants to run the inference on set of pictures and store the
results of the inference (e.g. in a database)
The Inference Queue allows him to run inference as parallel jobs.
"""

import numpy as np
import pandas as pd
import time
import sqlalchemy as db

from openvino.inference_engine import IECore
# from openvino.inference_engine import TensorDesc
# from openvino.inference_engine import Blob
from openvino.inference_engine import StatusCode
from openvino.inference_engine import InferQueue

from helpers import get_images


def get_reference(executable_network, images):
    """Get reference outputs using synchronous API.

    Returns a list with one ``infer`` result per entry of ``images``, in
    the same order; each image is passed as the ``'data'`` input.
    """
    reference = []
    for img in images:
        reference.append(executable_network.infer({'data': img}))
    return reference


def show_table(tab):
    """Print up to the first six rows of table ``tab`` as a DataFrame.

    Rows are fetched through the module-level ``connection``; nothing is
    printed when the table has no rows.
    """
    rows = connection.execute(db.select([tab])).fetchall()
    if not rows:
        return
    frame = pd.DataFrame(rows)
    frame.columns = rows[0].keys()
    print(frame.head(6))


# Database that stores the predicted class for every picture
engine = db.create_engine('sqlite:///queue.sqlite')
connection = engine.connect()
metadata = db.MetaData()

emp = db.Table(
    'emp', metadata,
    db.Column('Id', db.Integer()),
    db.Column('pred_class', db.Integer()))

metadata.create_all(engine)  # Creates the table

# Read images from a folder
images = get_images()

# Read the model from disk and load it onto the CPU device
model_xml = '/home/jiwaszki/testdata/models/test_model/test_model_fp32.xml'
model_bin = '/home/jiwaszki/testdata/models/test_model/test_model_fp32.bin'
ie = IECore()
ie_network = ie.read_network(model_xml, model_bin)
executable_network = ie.load_network(
    network=ie_network, device_name='CPU', config={})


# Create InferQueue with specific number of jobs/InferRequests
infer_queue = InferQueue(network=executable_network, jobs=3)

# One synchronous call before the timed reference runs
executable_network.infer()

# Reference: classify every image synchronously and time the whole loop
ref_results = []

ref_start_time = time.time()
for i, image in enumerate(images):
    res = executable_network.infer({'data': image})
    pred_class = np.argmax(res['fc_out'])
    ref_results.append((i, pred_class))
ref_end_time = time.time()
ref_time = (ref_end_time - ref_start_time) * 1000  # milliseconds


def get_results(request, userdata):
    """User-defined callback function.

    Stores the predicted class of a finished request in the ``emp`` table
    and records the request latency.

    Args:
        request: finished request; its ``'fc_out'`` result is read with
            ``get_result``.
        userdata: dict with the keys ``'index'`` (position of the picture)
            and ``'start_time'`` (``time.time()`` value taken when the
            request was submitted).
    """
    end_time = time.time()
    print('Finished picture', userdata)
    pred_class = np.argmax(request.get_result('fc_out'))

    # The callback uses its own engine and connection instead of the
    # module-level ones. They are closed explicitly here; the previous code
    # only ``del``-ed the names and left the cleanup to garbage collection.
    callback_engine = db.create_engine('sqlite:///queue.sqlite')
    try:
        with callback_engine.connect() as callback_connection:
            query = db.insert(emp).values(Id=userdata['index'],
                                          pred_class=int(pred_class))
            callback_connection.execute(query)
    finally:
        callback_engine.dispose()

    # Latency of this request in milliseconds
    times[userdata['index']] = (end_time - userdata['start_time']) * 1000


# Per-image latency in milliseconds, filled in by the callback
times = np.zeros((len(images)))
# Set callbacks on each job/InferRequest
infer_queue.set_infer_callback(get_results)

print('Starting InferQueue...')
start_queue_time = time.time()
for i, image in enumerate(images):
    start_request_time = time.time()
    infer_queue.async_infer(inputs={'data': image},
                            userdata={'index': i,
                                      'start_time': start_request_time})
    print('Started picture ', i)

# Wait for all jobs/InferRequests to finish!
statuses = infer_queue.wait_all()
end_queue_time = time.time()
queue_time = (end_queue_time - start_queue_time) * 1000

if np.all(np.array(statuses) == StatusCode.OK):
    print('Reference execution time:', ref_time)
    print('Finished InferQueue! Execution time:', queue_time)
    print('Times for each image: ', times)
    show_table(emp)
    # The callbacks insert rows as the jobs complete, which may not be the
    # order in which the pictures were submitted. Order by Id so that the
    # positional comparison against ref_results below is valid.
    results = connection.execute(
        db.select([emp]).order_by(emp.c.Id)).fetchall()
    metadata.drop_all(engine)  # drops all the tables in the database
    # Additional synchronous pass over all images; its outputs are not
    # compared below.
    reference = get_reference(executable_network, images)
    assert len(results) == len(images)
    for expected, stored in zip(ref_results, results):
        assert expected == stored
else:
    metadata.drop_all(engine)  # drops all the tables in the database
    raise RuntimeError('InferQueue failed to finish!')
67 changes: 67 additions & 0 deletions ngraph/python/examples/sync_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
First example.
User wants to use OpenVINO API to infer one picture using
synchronous Infer Request.
"""

import numpy as np

from openvino.inference_engine import IECore
from openvino.inference_engine import TensorDesc
from openvino.inference_engine import Blob

from helpers import get_images

# Read images from a folder and keep the first one
images = get_images()
img = images[0]

# Read the model from disk and load it onto the CPU device
model_xml = '/home/jiwaszki/testdata/models/test_model/test_model_fp32.xml'
model_bin = '/home/jiwaszki/testdata/models/test_model/test_model_fp32.bin'
ie = IECore()
ie_network = ie.read_network(model_xml, model_bin)
executable_network = ie.load_network(
    network=ie_network, device_name='CPU', config={})

# Infer directly
result_executable_network = executable_network.infer({'data': img})

# Prepare request
request = executable_network.create_infer_request()

# Create blob
blob_dims = [1, 3, img.shape[2], img.shape[3]]
tensor_desc = TensorDesc('FP32', blob_dims, 'NCHW')
img_blob = Blob(tensor_desc, img)

# Set it by using dedicated function
request.set_blob('data', img_blob)
assert np.allclose(img, request.get_blob('data').buffer)
# Or more versatile function, which takes a Blob...
request.set_input({'data': img_blob})
assert np.allclose(img, request.get_blob('data').buffer)
# ...or a numpy array
request.set_input({'data': np.ascontiguousarray(img)})
assert np.allclose(img, request.get_blob('data').buffer)
# And infer
result_infer_request = request.infer()

# Or just use it directly
result_infer_request = request.infer({'data': img})

# Both ways of running inference must produce the same outputs
for key in executable_network.output_info:
    print(result_infer_request[key])
    print(result_executable_network[key])
    assert np.allclose(result_infer_request[key],
                       result_executable_network[key])


# TODO: When callback is present everything works
# def pass_func(request, userdata):
#     pass


# request.set_completion_callback(pass_func, {})
del request
13 changes: 13 additions & 0 deletions ngraph/python/src/openvino/inference_engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from openvino.pyopenvino import get_version
from openvino.pyopenvino import StatusCode
from openvino.pyopenvino import InferQueue
from openvino.pyopenvino import InferRequest # TODO: move to ie_api?
from openvino.pyopenvino import Blob
from openvino.pyopenvino import PreProcessInfo
from openvino.pyopenvino import MeanVariant
Expand All @@ -21,5 +22,17 @@
from openvino.pyopenvino import PreProcessChannel

from openvino.inference_engine.ie_api import BlobWrapper
from openvino.inference_engine.ie_api import infer
from openvino.inference_engine.ie_api import async_infer
from openvino.inference_engine.ie_api import get_result

# Patching for Blob class: the name Blob exported by this package now refers
# to the Python-side BlobWrapper instead of the class imported from
# openvino.pyopenvino above.
Blob = BlobWrapper
# Patching ExecutableNetwork: attach the Python-level infer() from ie_api
# as a method.
ExecutableNetwork.infer = infer
# Patching InferRequest: attach the Python-level infer(), async_infer() and
# get_result() from ie_api as methods.
InferRequest.infer = infer
InferRequest.async_infer = async_infer
InferRequest.get_result = get_result
# Patching InferQueue: attach the Python-level async_infer() from ie_api
# as a method.
InferQueue.async_infer = async_infer
23 changes: 22 additions & 1 deletion ngraph/python/src/openvino/inference_engine/ie_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,30 @@
'U32': np.uint32,
'U64': np.uint64}


def normalize_inputs(py_dict: dict):
    """Normalize a dictionary of inputs to contiguous numpy arrays.

    ``np.ndarray`` values are passed through ``np.ascontiguousarray``;
    values of any other type are kept as they are. A new dictionary with
    the same keys is returned.
    """
    normalized = {}
    for key, value in py_dict.items():
        if isinstance(value, np.ndarray):
            value = np.ascontiguousarray(value)
        normalized[key] = value
    return normalized


def infer(request, inputs: dict = None):
    """Run a synchronous inference and return copies of the output buffers.

    Args:
        request: object exposing ``_infer(inputs=...)``.
        inputs: mapping of input names to data; ``np.ndarray`` values are
            made contiguous by ``normalize_inputs``. ``None`` (the default)
            is treated as an empty mapping.

    Returns:
        A dict mapping each name in the ``_infer`` result to a copy of the
        corresponding blob's ``buffer``.
    """
    # ``None`` replaces the former ``{}`` default, which was a single dict
    # object shared by every call.
    normalized = normalize_inputs({} if inputs is None else inputs)
    results = request._infer(inputs=normalized)
    return {name: blob.buffer.copy() for name, blob in results.items()}


def get_result(request, name: str):
    """Return a copy of the buffer of the blob called ``name``.

    The blob is looked up with ``request.get_blob(name)``.
    """
    blob = request.get_blob(name)
    return blob.buffer.copy()


def async_infer(request, inputs: dict = None, userdata=None):
    """Start an asynchronous inference on ``request``.

    Args:
        request: object exposing ``_async_infer(inputs=..., userdata=...)``.
        inputs: mapping of input names to data; ``np.ndarray`` values are
            made contiguous by ``normalize_inputs``. ``None`` (the default)
            is treated as an empty mapping.
        userdata: forwarded unchanged to ``_async_infer``.
    """
    # ``None`` replaces the former ``{}`` default, which was a single dict
    # object shared by every call.
    normalized = normalize_inputs({} if inputs is None else inputs)
    request._async_infer(inputs=normalized, userdata=userdata)


# Dispatch Blob types on Python side.
class BlobWrapper:
def __new__(cls, tensor_desc : TensorDesc, arr : np.ndarray = None):
def __new__(cls, tensor_desc: TensorDesc, arr: np.ndarray = None):
arr_size = 0
precision = ""
if tensor_desc is not None:
Expand Down
Loading

0 comments on commit 847b98b

Please sign in to comment.