---
title: Quickstart - Deploying a Classifier
hide_title: true
status: stable
---

# Model Deployment with Spark Serving

In this example, we train a classifier to predict income on the Adult Census dataset, and then use Spark Serving to deploy it as a real-time web service. The cells below import the packages they need and assume an active `SparkSession` named `spark` with SynapseML available.

Now let's read the data and split it into training and test sets:

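```python
# Load the Adult Census Income data and keep three features plus the label
data = spark.read.parquet(
    "wasbs://[email protected]/AdultCensusIncome.parquet"
)
data = data.select(["education", "marital-status", "hours-per-week", "income"])

# Randomly split 75% / 25% into training and test sets (seeded for reproducibility)
train, test = data.randomSplit([0.75, 0.25], seed=123)
train.limit(10).toPandas()
```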
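`randomSplit` normalizes its weights and assigns rows at random, so the 75/25 proportions hold only approximately. The sketch below illustrates that idea in plain Python with a hypothetical `random_split` helper. It is not how Spark implements `randomSplit` internally; it only shows why the split sizes vary around the target and why a fixed seed makes them repeatable.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split by comparing a uniform draw
    against the cumulative normalized weights (illustrative only)."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total  # normalize so the weights need not sum to 1
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()  # uniform in [0, 1)
        for i, b in enumerate(bounds):
            if x < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point round-off
    return splits


train_rows, test_rows = random_split(list(range(1000)), [0.75, 0.25], seed=123)
print(len(train_rows), len(test_rows))  # close to 750 / 250, but rarely exact
```

Passing `[3, 1]` instead of `[0.75, 0.25]` gives the same result here, because both normalize to the same cumulative bounds.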

`TrainClassifier` can be used to initialize and fit a model; it wraps Spark ML classifiers. After importing it, you can run `help(TrainClassifier)` to view its parameters.

Note that it implicitly converts the data into the format the algorithm expects. More specifically, it tokenizes and hashes strings, one-hot encodes categorical variables, and assembles the features into a vector, among other steps. The `numFeatures` parameter controls the number of hashed features.

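```python
from synapse.ml.train import TrainClassifier
from pyspark.ml.classification import LogisticRegression

# Wrap a Spark ML logistic regression; TrainClassifier featurizes the input
# columns (hashing into 256 features) before fitting on the "income" label
model = TrainClassifier(
    model=LogisticRegression(), labelCol="income", numFeatures=256
).fit(train)
```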
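To make the role of `numFeatures` concrete, here is a minimal sketch of the hashing trick, modeled on the scheme documented for Spark ML's `FeatureHasher`: a string column contributes an indicator of 1.0 at the bucket of the token `"column=value"`, and a numeric column adds its value at the bucket of the column name. Whether `TrainClassifier` hashes exactly this way is an assumption. `zlib.crc32` stands in for the MurmurHash3 function Spark uses, and `hash_features` is a hypothetical helper.

```python
import zlib


def hash_features(row, num_features=256):
    """Hashing-trick sketch: map a dict of column -> value to a fixed-length vector."""
    vec = [0.0] * num_features
    for col, val in row.items():
        if isinstance(val, str):
            # String column: hash the "column=value" token and add an indicator of 1.0
            idx = zlib.crc32(f"{col}={val}".encode("utf-8")) % num_features
            vec[idx] += 1.0
        else:
            # Numeric column: hash the column name and add the numeric value itself
            idx = zlib.crc32(col.encode("utf-8")) % num_features
            vec[idx] += float(val)
    return vec


row = {"education": " Masters", "marital-status": "Married-civ-spouse", "hours-per-week": 40.0}
vec = hash_features(row, num_features=256)
print([(i, v) for i, v in enumerate(vec) if v != 0.0])  # at most three non-zero buckets
```

Because distinct tokens can land in the same bucket, a smaller `numFeatures` means more collisions; in the extreme case of `num_features=1`, every input collapses into a single value.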

After the model is trained, we score the test dataset with it and view the metrics.

```python
from synapse.ml.train import ComputeModelStatistics

# Score the held-out test set and inspect the columns the model adds
prediction = model.transform(test)
prediction.printSchema()

# Compute evaluation metrics from the scored data
metrics = ComputeModelStatistics().transform(prediction)
metrics.limit(10).toPandas()
```
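For a binary classifier such as this one, `ComputeModelStatistics` reports metrics such as the confusion matrix, accuracy, precision, and recall. As a reminder of what those numbers mean, the sketch below computes them from plain Python lists with a hypothetical `binary_metrics` helper. Treating `1` as the positive class and laying the matrix out as `[[TN, FP], [FN, TP]]` are choices made for this sketch, not necessarily the library's conventions.

```python
def binary_metrics(labels, predictions, positive=1):
    """Confusion matrix, accuracy, precision and recall for a binary classifier."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == positive and p == positive)
    fp = sum(1 for y, p in pairs if y != positive and p == positive)
    fn = sum(1 for y, p in pairs if y == positive and p != positive)
    tn = len(pairs) - tp - fp - fn
    return {
        "confusion_matrix": [[tn, fp], [fn, tp]],  # rows: actual, columns: predicted
        "accuracy": (tp + tn) / len(pairs),
        "precision": tp / (tp + fp) if tp + fp else 0.0,  # of predicted positives, how many are right
        "recall": tp / (tp + fn) if tp + fn else 0.0,  # of actual positives, how many are found
    }


m = binary_metrics(labels=[0, 0, 1, 1, 1, 0], predictions=[0, 1, 1, 0, 1, 0])
print(m["confusion_matrix"])  # [[2, 1], [1, 2]]
```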

Next, we define the web service's input and output. For more information, see the Spark Serving documentation.

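```python
from pyspark.sql.types import *
from synapse.ml.io import *  # needed for server(), parseRequest() and makeReply()
import uuid

# Input: an HTTP endpoint at http://localhost:8898/my_api whose JSON request
# bodies are parsed into rows using the test set's schema
serving_inputs = (
    spark.readStream.server()
    .address("localhost", 8898, "my_api")
    .option("name", "my_api")
    .load()
    .parseRequest("my_api", test.schema)
)

# Output: score each request and reply with the "prediction" column
serving_outputs = model.transform(serving_inputs).makeReply("prediction")

# Start the streaming query that answers requests on "my_api";
# each run gets its own checkpoint directory
server = (
    serving_outputs.writeStream.server()
    .replyTo("my_api")
    .queryName("my_query")
    .option("checkpointLocation", "file:///tmp/checkpoints-{}".format(uuid.uuid1()))
    .start()
)
```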
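Conceptually, each request flows through three steps: `parseRequest` turns the JSON body into a row with the given schema, `model.transform` adds a prediction, and `makeReply` sends one column back. The sketch below loosely mimics that flow in plain Python so it runs without Spark. The helpers `parse_request`, `toy_model`, and `make_reply` are hypothetical (the toy rule is not the trained classifier), we assume fields absent from the body, such as `income`, come through as nulls, and the exact reply encoding Spark Serving uses may differ.

```python
import json

# Fields of test.schema, written out by hand for this sketch
SCHEMA = ["education", "marital-status", "hours-per-week", "income"]


def parse_request(body, schema=SCHEMA):
    """Loosely mimics parseRequest: JSON body -> row; absent fields become None."""
    payload = json.loads(body)
    return {field: payload.get(field) for field in schema}


def toy_model(row):
    """Hypothetical stand-in for model.transform(): a toy rule, NOT the trained model."""
    scored = dict(row)
    scored["prediction"] = 1.0 if (row["education"] or "").strip() == "Masters" else 0.0
    return scored


def make_reply(row, reply_col):
    """Loosely mimics makeReply: serialize only the chosen column as the response body."""
    return json.dumps(row[reply_col])


body = '{"education":" Masters","marital-status":"Married-civ-spouse","hours-per-week":40.0}'
row = parse_request(body)
reply = make_reply(toy_model(row), "prediction")
print(row["income"], reply)  # None 1.0
```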

Now test the web service by sending it two requests:

```python
import requests

# Each request body is a JSON object with the model's input columns
data = '{"education":" 10th","marital-status":"Divorced","hours-per-week":40.0}'
r = requests.post(data=data, url="http://localhost:8898/my_api")
print("Response {}".format(r.text))

data = '{"education":" Masters","marital-status":"Married-civ-spouse","hours-per-week":40.0}'
r = requests.post(data=data, url="http://localhost:8898/my_api")
print("Response {}".format(r.text))
```
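Hand-writing JSON strings is easy to get wrong (a missing quote or a misspelled key). A minimal alternative, assuming you only need to send the three input columns, is to build each body with `json.dumps` through a hypothetical `build_request` helper that also checks for missing fields:

```python
import json

# Input columns used for training (from the select() call earlier); we assume the
# endpoint only needs these, since "income" is the value being predicted
FEATURE_COLUMNS = ["education", "marital-status", "hours-per-week"]


def build_request(record):
    """Serialize one record as a JSON request body (hypothetical helper)."""
    missing = [c for c in FEATURE_COLUMNS if c not in record]
    if missing:
        raise ValueError(f"missing input fields: {missing}")
    # Keep only the input columns, in a fixed order
    return json.dumps({c: record[c] for c in FEATURE_COLUMNS})


body = build_request({"education": " 10th", "marital-status": "Divorced", "hours-per-week": 40.0})
# The body can then be sent as before:
# requests.post(data=body, url="http://localhost:8898/my_api")
```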
Finally, stop the server:

```python
import time

time.sleep(20)  # wait for server to finish setting up (just to be safe)
server.stop()
```