diff --git a/pyzoo/zoo/serving/client.py b/pyzoo/zoo/serving/client.py index d9eb51a297d..599ec0cfa6f 100644 --- a/pyzoo/zoo/serving/client.py +++ b/pyzoo/zoo/serving/client.py @@ -101,8 +101,16 @@ def enqueue(self, uri, **data): field_list = [] data_list = [] for key, value in data.items(): + if "string" in key: + # a list of strings will be converted to a Tensor of String + # elements are joined with '|' as the delimiter + str_concat = '|'.join(value) + field = pa.field(key, pa.string()) + data = pa.array([str_concat]) + field_list.append(field) + data_list.append(data) - if isinstance(value, str): + elif isinstance(value, str): # str value will be considered as image path field = pa.field(key, pa.string()) data = self.encode_image(value) diff --git a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/inference/InferenceModel.scala b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/inference/InferenceModel.scala index d29dc7515e1..b3b4eb273c6 100644 --- a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/inference/InferenceModel.scala +++ b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/inference/InferenceModel.scala @@ -24,9 +24,11 @@ import java.util.{List => JList} import com.intel.analytics.bigdl.nn.abstractnn.Activity import com.intel.analytics.bigdl.tensor.Tensor +import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric import com.sun.xml.internal.bind.v2.TODO import scala.collection.JavaConverters._ +import scala.reflect.ClassTag class InferenceModel(private var autoScalingEnabled: Boolean = true, private var concurrentNum: Int = 20, @@ -37,7 +39,6 @@ class InferenceModel(private var autoScalingEnabled: Boolean = true, require(concurrentNum > 0, "concurrentNum should > 0") - private var batchCnt: Int = 0 @transient var inferenceSummary: InferenceSummary = null /** * default constructor, will create a InferenceModel with auto-scaling enabled. 
@@ -743,17 +744,17 @@ class InferenceModel(private var autoScalingEnabled: Boolean = true, val model: AbstractModel = retrieveModel() try { val begin = System.nanoTime() - val batchSize = if (inputActivity.isTensor) { - inputActivity.toTensor[Float].size(1) - } else { - val sampleKey = inputActivity.toTable.keySet.head - inputActivity.toTable(sampleKey).asInstanceOf[Tensor[Float]].size(1) - } +// val batchSize = if (inputActivity.isTensor) { +// inputActivity.toTensor[T].size(1) +// } else { +// val sampleKey = inputActivity.toTable.keySet.head +// inputActivity.toTable(sampleKey).asInstanceOf[Tensor[T]].size(1) +// } val result = model.predict(inputActivity) val end = System.nanoTime() val latency = end - begin - val name = s"model predict for batch ${batchSize}" + val name = s"model predict for batch" InferenceSupportive.logger.info(s"$name time elapsed [${latency/1e9} s, ${latency/1e6} ms].") result diff --git a/zoo/src/main/scala/com/intel/analytics/zoo/serving/engine/FlinkInference.scala b/zoo/src/main/scala/com/intel/analytics/zoo/serving/engine/FlinkInference.scala index 06838c94dee..2448281a876 100644 --- a/zoo/src/main/scala/com/intel/analytics/zoo/serving/engine/FlinkInference.scala +++ b/zoo/src/main/scala/com/intel/analytics/zoo/serving/engine/FlinkInference.scala @@ -36,8 +36,6 @@ class FlinkInference(params: SerParams) override def open(parameters: Configuration): Unit = { inferenceCnt = 0 - model = params.model -// println("in open method, ", model) logger = Logger.getLogger(getClass) pre = new PreProcessing(params) } diff --git a/zoo/src/main/scala/com/intel/analytics/zoo/serving/preprocessing/PreProcessing.scala b/zoo/src/main/scala/com/intel/analytics/zoo/serving/preprocessing/PreProcessing.scala index 697a83575fe..65d652235a4 100644 --- a/zoo/src/main/scala/com/intel/analytics/zoo/serving/preprocessing/PreProcessing.scala +++ b/zoo/src/main/scala/com/intel/analytics/zoo/serving/preprocessing/PreProcessing.scala @@ -62,20 +62,31 @@ class 
PreProcessing(param: SerParams) { val instance = Instances.fromArrow(byteBuffer) val kvMap = instance.instances.flatMap(insMap => { - val oneInsMap = insMap.map(kv => { - if (kv._2.isInstanceOf[String]) { + val oneInsMap = insMap.map(kv => + if (kv._1.contains("string")) { + (kv._1, decodeString(kv._2.asInstanceOf[String])) + } + else if (kv._2.isInstanceOf[String]) { (kv._1, decodeImage(kv._2.asInstanceOf[String])) } else { (kv._1, decodeTensor(kv._2.asInstanceOf[( ArrayBuffer[Int], ArrayBuffer[Float], ArrayBuffer[Int], ArrayBuffer[Int])])) - } - }).toList + }).toList // Seq(T(oneInsMap.head, oneInsMap.tail: _*)) val arr = oneInsMap.map(x => x._2) Seq(T.array(arr.toArray)) }) kvMap.head } + def decodeString(s: String): Tensor[String] = { + + val eleList = s.split("\\|") + val tensor = Tensor[String](eleList.length) + (1 to eleList.length).foreach(i => { + tensor.setValue(i, eleList(i - 1)) + }) + tensor + } def decodeImage(s: String, idx: Int = 0): Tensor[Float] = { byteBuffer = java.util.Base64.getDecoder.decode(s) val mat = OpenCVMethod.fromImageBytes(byteBuffer, Imgcodecs.CV_LOAD_IMAGE_UNCHANGED) diff --git a/zoo/src/test/scala/com/intel/analytics/zoo/serving/InferenceSpec.scala b/zoo/src/test/scala/com/intel/analytics/zoo/serving/InferenceSpec.scala new file mode 100644 index 00000000000..6d254d55b25 --- /dev/null +++ b/zoo/src/test/scala/com/intel/analytics/zoo/serving/InferenceSpec.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2018 Analytics Zoo Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.analytics.zoo.serving + +import com.intel.analytics.bigdl.tensor.Tensor +import com.intel.analytics.zoo.serving.utils.{ClusterServingHelper, SerParams} +import org.scalatest.{FlatSpec, Matchers} + +class InferenceSpec extends FlatSpec with Matchers { + "TF String input" should "work" in { +// val configPath = "/home/litchy/pro/analytics-zoo/config.yaml" + val str = "abc|dff|aoa" + val eleList = str.split("\\|") +// val helper = new ClusterServingHelper(configPath) +// helper.initArgs() +// val param = new SerParams(helper) +// val model = helper.loadInferenceModel() +// val res = model.doPredict(t) +// res + } + +} diff --git a/zoo/src/test/scala/com/intel/analytics/zoo/serving/PreProcessingSpec.scala b/zoo/src/test/scala/com/intel/analytics/zoo/serving/PreProcessingSpec.scala index c11bcda4b29..956ae21f26a 100644 --- a/zoo/src/test/scala/com/intel/analytics/zoo/serving/PreProcessingSpec.scala +++ b/zoo/src/test/scala/com/intel/analytics/zoo/serving/PreProcessingSpec.scala @@ -58,4 +58,12 @@ class PreProcessingSpec extends FlatSpec with Matchers { val a = pre.decodeTensor(info) a } + "decode string tensor" should "work" in { + val pre = new PreProcessing(null) + val str = "abc|dff|aoa" + val tensor = pre.decodeString(str) + assert(tensor.valueAt(1) == "abc") + assert(tensor.valueAt(2) == "dff") + assert(tensor.valueAt(3) == "aoa") + } }