Skip to content

Commit

Permalink
initial commit for e2e pipeline demo (#1)
Browse files Browse the repository at this point in the history
Co-authored-by: alex.bao <[email protected]>
  • Loading branch information
alexbao and alex.bao authored Jan 14, 2022
1 parent 5fd585c commit f5a2abe
Show file tree
Hide file tree
Showing 17 changed files with 1,063 additions and 1 deletion.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
# mobius
# Mobius online learning.

## Code Editor
Recommend Atom (https://atom.io/) as the code editor.
Recommended plugins: atom-beautify, python-indent, auto-indent, vim-mode-plus (for VIM-ers)
29 changes: 29 additions & 0 deletions demo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
## Prerequisites
- Additional packages needed to run the demo code.
- Need to disable the main thread check manually. It will be located at line
1038 of file python2.7/site-packages/ray/worker.py (or similar path depending
on the OS and python version). There are ongoing efforts to support multiple
threading, hopefully it will be supported natively soon.

Generate proto file
> $ bash generate_proto.sh
Start local zookeeper service
> $ zkServer start
Start kafka service
> $ kafka-server-start /usr/local/etc/kafka/server.properties

Start RPC server
> $ python server_rpc.py --kafka_server=localhost:9092 --kafka_topic=kafkaesque
Start gateway server (easiest integration as go gateway generates most of the code)
Required if you'd like to send the request with PostMate.
> $ go run server_gateway.go
Stream training data
> $ python publish_msg.py --kafka_server=localhost:9092 --kafka_topic=kafkaesque --num_to_send=10000 --sleep_every_N=100 --target_a=1 --target_b=3
Client script to send inference request and compute the mean squared error.
> $ python client.py --num_batch=100 --target_a=1 --target_b=3
47 changes: 47 additions & 0 deletions demo/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import argparse
import grpc
import numpy
import time
import util

from proto import mobius_pb2, mobius_pb2_grpc

parser = argparse.ArgumentParser(description="Online learning service")

parser.add_argument("--rpc_endpoint", default="localhost:50051", type=str,
help="RPC endpoint")
parser.add_argument("--target_a", default=0.1, type=float,
help="Target 'a' to simulate for y = ax + b")
parser.add_argument("--target_b", default=0.3, type=float,
help="Target 'b' to simulate for y = ax + b")
parser.add_argument("--batch_size", default=100, type=int,
help="Batch size for inference")
parser.add_argument("--num_batch", default=100, type=int,
help="Number of batches to send")
parser.add_argument("--batch_interval_seconds", default=1, type=int,
help="Interval seconds between batches")


if __name__ == "__main__":
args = parser.parse_args()
# open a gRPC channel
channel = grpc.insecure_channel(args.rpc_endpoint)
# create a stub (client)
stub = mobius_pb2_grpc.MobiusStub(channel)

for i in range(args.num_batch):
x, y = util.generate_linear_x_y_data(
args.batch_size, args.target_a, args.target_b,
util.now_millis() / 1000)
# create a valid request message
request = mobius_pb2.InferRequest(x=x)
# make the call
response = stub.Infer(request)

A = numpy.array(y)
B = numpy.array(response.y)
mean_error = ((A - B) ** 2).mean()
# print response
print 'batch', i, 'mean squared error', mean_error

time.sleep(args.batch_interval_seconds)
7 changes: 7 additions & 0 deletions demo/generate_proto.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Generate pb file for Python RPC
python -m grpc_tools.protoc -Iproto -I/usr/local/include -I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis --python_out=proto --grpc_python_out=proto mobius.proto

# Generate pb file for Go gateway
protoc -I/usr/local/include -Iproto -I$GOPATH/src -I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis --grpc-gateway_out=logtostderr=true:go_gateway mobius.proto
# Generate pb file for Go RPC
protoc -I/usr/local/include -Iproto -I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis --go_out=plugins=grpc:go_gateway mobius.proto
262 changes: 262 additions & 0 deletions demo/go_gateway/mobius.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f5a2abe

Please sign in to comment.