Skip to content

Commit

Permalink
Pytorch VFL NN refactor, refine and example test (intel-analytics#5607)
Browse files Browse the repository at this point in the history
  • Loading branch information
Litchilitchy authored and ForJadeForest committed Sep 20, 2022
1 parent 6fe518a commit bfc4799
Show file tree
Hide file tree
Showing 19 changed files with 201 additions and 235 deletions.
33 changes: 33 additions & 0 deletions python/ppml/dev/example/run-pytorch-nn-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/bin/env bash

#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

set -ex
SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" &> /dev/null && pwd)
source $SCRIPT_DIR/../prepare_env.sh


cd "`dirname $0`"
echo "Running PPML tests"
cd ../../

rm -rf /tmp/pytorch_server_model*
rm -rf /tmp/pytorch_client_model*
rm -rf /tmp/vfl_server_model*
python src/bigdl/ppml/fl/nn/fl_server.py --client_num 2 &
python example/pytorch_nn_lr/pytorch_nn_lr_1.py --data_path example/pytorch_nn_lr/data/diabetes-vfl-1.csv &
python example/pytorch_nn_lr/pytorch_nn_lr_2.py --data_path example/pytorch_nn_lr/data/diabetes-vfl-2.csv &
35 changes: 19 additions & 16 deletions python/ppml/example/pytorch_nn_lr/pytorch-nn-lr-tutorial.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,23 @@ We use [Diabetes](https://www.kaggle.com/competitions/house-prices-advanced-regr

The code is available in projects, including [Client 1 code](fgboost_regression_party_1.py) and [Client 2 code](fgboost_regression_party_2.py). You could directly start two different terminals are run them respectively to start a federated learning, and the order of start does not matter. Following is the detailed step-by-step tutorial to introduce how the code works.

### 2.1 Private Set Intersection
We first need to get the intersection of datasets across parties by Private Set Intersection algorithm.
### 2.1 Initialize FL Context
We first need to initialize the FL Context by
```python
from bigdl.ppml.fl.nn.fl_context import init_fl_context
init_fl_context(client_id, target)
```
The target is the URL of FL Server and is `localhost:8980` by default.
### 2.2 Private Set Intersection
Then get the intersection of datasets across parties by Private Set Intersection algorithm.
```python
df_train['ID'] = df_train['ID'].astype(str)
psi = PSI()
intersection = psi.get_intersection(list(df_train['ID']))
df_train = df_train[df_train['ID'].isin(intersection)]
```

### 2.2 Data Preprocessing
### 2.3 Data Preprocessing
Since one party owns label data while another not, different operations should be done before training.

For example, in party 1:
Expand All @@ -42,7 +49,7 @@ y = np.expand_dims(df_y.to_numpy(dtype="float32"), axis=1)
```


### 2.3 Create Model
### 2.4 Create Model
We create the following model for both clients, but with different number of inputs
```python
class LocalModel(nn.Module):
Expand Down Expand Up @@ -72,16 +79,14 @@ class ServerModel(nn.Module):

server_model = ServerModel()
```
### 2.4 Create Estimator
### 2.5 Create Estimator
Then, create Estimator and pass the arguments

```python
ppl = Estimator.from_torch(client_model=model,
client_id=client_id,
loss_fn=loss_fn,
optimizer_cls=torch.optim.SGD,
optimizer_args={'lr':1e-3},
target='localhost:8980',
optimizer_args={'lr':1e-4},
server_model=server_model,
server_model_path=/path/to/model/on/server,
client_model_path=/path/to/model/on/client)
Expand All @@ -90,22 +95,22 @@ Note that
* If you want to upload server model from this estimator, provide `server_model` argument.
* If you want server to automatically trigger model autosave, provide `server_model_path` with the path for server to save the model.
* If you want client to automatically trigger model autosave, provide `client_model_path` with the path for client to save the model.
We will also show how to use the saved model to resume training or predict in [2.7](#27-saveload)
We will also show how to use the saved model to resume training or predict in [2.8](#28-saveload)

### 2.5 Training
### 2.6 Training
Then call `fit` method to train

```python
response = ppl.fit(x, y, epoch=5)
```

### 2.6 Predict
### 2.7 Predict
```python
result = ppl.predict(x)
```

### 2.7 Save/Load
In [2.4](#24-create-estimator) we provided the model paths while creating estimator. Thus, client model and server model would both be automatically saved.
### 2.8 Save/Load
In [2.5](#25-create-estimator) we provided the model paths while creating estimator. Thus, client model and server model would both be automatically saved.

You can also call save explicitly by
```python
Expand All @@ -120,11 +125,9 @@ client_model = torch.load(model_path) # load client model first
# the server_model_path should be consistant with the one in 2.4
# because server would load model from this path if model exists
ppl = Estimator.from_torch(client_model=model,
client_id=client_id,
loss_fn=loss_fn,
optimizer_cls=torch.optim.SGD,
optimizer_args={'lr':1e-3},
target='localhost:8980',
optimizer_args={'lr':1e-4},
server_model_path=/path/to/model/on/server,
client_model_path=/path/to/model/on/client)
ppl.load_server_model(server_model_path) # trigger model loading on server
Expand Down
20 changes: 11 additions & 9 deletions python/ppml/example/pytorch_nn_lr/pytorch_nn_lr_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
#

import logging
from typing import List
import numpy as np
import pandas as pd
Expand All @@ -22,8 +23,11 @@
import torch
from torch import Tensor, nn
from bigdl.ppml.fl.estimator import Estimator
from bigdl.ppml.fl.psi.psi import PSI
from bigdl.ppml.fl.nn.fl_context import init_fl_context
from bigdl.ppml.fl.psi.psi_client import PSI

fmt = '%(asctime)s %(levelname)s {%(module)s:%(lineno)d} - %(message)s'
logging.basicConfig(format=fmt, level=logging.INFO)

class LocalModel(nn.Module):
def __init__(self, num_feature) -> None:
Expand All @@ -50,8 +54,10 @@ def forward(self, x: List[Tensor]):

@click.command()
@click.option('--load_model', default=False)
def run_client(load_model):
df_train = pd.read_csv('./data/diabetes-vfl-1.csv')
@click.option('--data_path', default="./data/diabetes-vfl-1.csv")
def run_client(load_model, data_path):
init_fl_context('1')
df_train = pd.read_csv(data_path)

df_train['ID'] = df_train['ID'].astype(str)
psi = PSI()
Expand All @@ -69,11 +75,9 @@ def run_client(load_model):
if load_model:
model = torch.load('/tmp/pytorch_client_model_1.pt')
ppl = Estimator.from_torch(client_model=model,
client_id='1',
loss_fn=loss_fn,
optimizer_cls=torch.optim.SGD,
optimizer_args={'lr':1e-5},
target='localhost:8980',
optimizer_args={'lr':1e-4},
server_model_path='/tmp/pytorch_server_model',
client_model_path='/tmp/pytorch_client_model_1.pt')
ppl.load_server_model('/tmp/pytorch_server_model')
Expand All @@ -83,11 +87,9 @@ def run_client(load_model):

server_model = ServerModel()
ppl = Estimator.from_torch(client_model=model,
client_id='1',
loss_fn=loss_fn,
optimizer_cls=torch.optim.SGD,
optimizer_args={'lr':1e-5},
target='localhost:8980',
optimizer_args={'lr':1e-4},
server_model=server_model,
server_model_path='/tmp/pytorch_server_model',
client_model_path='/tmp/pytorch_client_model_1.pt')
Expand Down
21 changes: 11 additions & 10 deletions python/ppml/example/pytorch_nn_lr/pytorch_nn_lr_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
# limitations under the License.
#

import logging
import numpy as np
import pandas as pd
import click

import torch
from torch import nn
from bigdl.ppml.fl.estimator import Estimator
from bigdl.ppml.fl.psi.psi import PSI
from bigdl.ppml.fl.nn.fl_context import init_fl_context
from bigdl.ppml.fl.psi.psi_client import PSI

fmt = '%(asctime)s %(levelname)s {%(module)s:%(lineno)d} - %(message)s'
logging.basicConfig(format=fmt, level=logging.INFO)

class LocalModel(nn.Module):
def __init__(self, num_feature) -> None:
Expand All @@ -36,9 +40,10 @@ def forward(self, x):

@click.command()
@click.option('--load_model', default=False)
def run_client(load_model):
df_train = pd.read_csv('./data/diabetes-vfl-2.csv')

@click.option('--data_path', default="./data/diabetes-vfl-2.csv")
def run_client(load_model, data_path):
init_fl_context('2')
df_train = pd.read_csv(data_path)
df_train['ID'] = df_train['ID'].astype(str)
psi = PSI()
intersection = psi.get_intersection(list(df_train['ID']))
Expand All @@ -53,21 +58,17 @@ def run_client(load_model):
if load_model:
model = torch.load('/tmp/pytorch_client_model_2.pt')
ppl = Estimator.from_torch(client_model=model,
client_id='2',
loss_fn=loss_fn,
optimizer_cls=torch.optim.SGD,
optimizer_args={'lr':1e-5},
target='localhost:8980',
optimizer_args={'lr':1e-4},
client_model_path='/tmp/pytorch_client_model_2.pt')
response = ppl.fit(x, y, 5)
else:
model = LocalModel(len(df_x.columns))
ppl = Estimator.from_torch(client_model=model,
client_id='2',
loss_fn=loss_fn,
optimizer_cls=torch.optim.SGD,
optimizer_args={'lr':1e-5},
target='localhost:8980',
optimizer_args={'lr':1e-4},
client_model_path='/tmp/pytorch_client_model_2.pt')
response = ppl.fit(x, y, 5)
result = ppl.predict(x)
Expand Down
10 changes: 1 addition & 9 deletions python/ppml/src/bigdl/ppml/fl/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,38 +30,30 @@
class Estimator:
@staticmethod
def from_torch(client_model: nn.Module,
client_id,
loss_fn,
optimizer_cls,
optimizer_args={},
target="localhost:8980",
optimizer_args={},
server_model=None,
client_model_path=None,
server_model_path=None):
estimator = PytorchEstimator(model=client_model,
loss_fn=loss_fn,
optimizer_cls=optimizer_cls,
optimizer_args=optimizer_args,
client_id=client_id,
target=target,
server_model=server_model,
client_model_path=client_model_path,
server_model_path=server_model_path)
return estimator

@staticmethod
def from_keras(client_model: Model,
client_id,
loss_fn,
optimizer_cls,
optimizer_args={},
target="localhost:8980",
server_model=None):
estimator = TensorflowEstimator(model=client_model,
loss_fn=loss_fn,
optimizer_cls=optimizer_cls,
optimizer_args=optimizer_args,
client_id=client_id,
target=target,
server_model=server_model)
return estimator
59 changes: 5 additions & 54 deletions python/ppml/src/bigdl/ppml/fl/nn/fl_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@


import logging
import pickle
import grpc
from bigdl.ppml.fl.nn.generated.nn_service_pb2 import TrainRequest, PredictRequest, UploadMetaRequest
from bigdl.ppml.fl.nn.generated.nn_service_pb2_grpc import *
from bigdl.ppml.fl.nn.utils import ndarray_map_to_tensor_map
import yaml
import threading
from bigdl.dllib.utils.log4Error import invalidInputError

from bigdl.ppml.fl.nn.utils import ClassAndArgsWrapper

class FLClient(object):
channel = None
Expand All @@ -52,64 +49,18 @@ def ensure_initialized():
else:
FLClient.channel = grpc.insecure_channel(FLClient.target)


def __init__(self, client_id, aggregator, target="localhost:8980") -> None:
self.secure = False
self.load_config()
with FLClient._lock:
if FLClient.channel == None:
if self.secure:
FLClient.channel = grpc.secure_channel(target, self.creds)
else:
FLClient.channel = grpc.insecure_channel(target)
self.nn_stub = NNServiceStub(FLClient.channel)
self.client_uuid = client_id
self.aggregator = aggregator

def train(self, x):
tensor_map = ndarray_map_to_tensor_map(x)
train_request = TrainRequest(clientuuid=self.client_uuid,
data=tensor_map,
algorithm=self.aggregator)

response = self.nn_stub.train(train_request)
if response.code == 1:
invalidInputError(False,
response.response)
return response

def predict(self, x):
tensor_map = ndarray_map_to_tensor_map(x)
predict_request = PredictRequest(clientuuid=self.client_uuid,
data=tensor_map,
algorithm=self.aggregator)

response = self.nn_stub.predict(predict_request)
if response.code == 1:
invalidInputError(False,
response.response)
return response

def upload_meta(self, loss_fn, optimizer_cls, optimizer_args):
# upload model to server
loss_fn = pickle.dumps(loss_fn)
optimizer = ClassAndArgsWrapper(optimizer_cls, optimizer_args).to_protobuf()
request = UploadMetaRequest(client_uuid=self.client_uuid,
loss_fn=loss_fn,
optimizer=optimizer,
aggregator=self.aggregator)
return self.nn_stub.upload_meta(request)

def load_config(self):
@staticmethod
def load_config():
try:
with open('ppml-conf.yaml', 'r') as stream:
conf = yaml.safe_load(stream)
if 'privateKeyFilePath' in conf:
self.secure = True
FLClient.secure = True
with open(conf['privateKeyFilePath'], 'rb') as f:
self.creds = grpc.ssl_channel_credentials(f.read())
FLClient.creds = grpc.ssl_channel_credentials(f.read())
except yaml.YAMLError as e:
logging.warn('Loading config failed, using default config ')
except Exception as e:
logging.warn('Failed to find config file "ppml-conf.yaml", using default config')


2 changes: 2 additions & 0 deletions python/ppml/src/bigdl/ppml/fl/nn/fl_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from ..nn.fl_client import FLClient

def init_fl_context(client_id, target="localhost:8980"):
FLClient.load_config()
FLClient.set_client_id(client_id)
# target can be set in config file, and also could be overwritten here
FLClient.set_target(target)
FLClient.ensure_initialized()
Loading

0 comments on commit bfc4799

Please sign in to comment.