Pytorch VFL NN refactor, refine and example test #5607

Merged: 3 commits, Sep 1, 2022
33 changes: 33 additions & 0 deletions python/ppml/dev/example/run-pytorch-nn-test
@@ -0,0 +1,33 @@
#!/usr/bin/env bash

#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

set -ex
SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" &> /dev/null && pwd)
source $SCRIPT_DIR/../prepare_env.sh


cd "`dirname $0`"
echo "Running PPML tests"
cd ../../

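# Clean up model checkpoints left over from previous runs, then start the
# FL server and the two VFL clients concurrently in the background.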
rm -rf /tmp/pytorch_server_model*
rm -rf /tmp/pytorch_client_model*
rm -rf /tmp/vfl_server_model*
python src/bigdl/ppml/fl/nn/fl_server.py --client_num 2 &
python example/pytorch_nn_lr/pytorch_nn_lr_1.py --data_path example/pytorch_nn_lr/data/diabetes-vfl-1.csv &
python example/pytorch_nn_lr/pytorch_nn_lr_2.py --data_path example/pytorch_nn_lr/data/diabetes-vfl-2.csv &
35 changes: 19 additions & 16 deletions python/ppml/example/pytorch_nn_lr/pytorch-nn-lr-tutorial.md
@@ -20,16 +20,23 @@ We use [Diabetes](https://www.kaggle.com/competitions/house-prices-advanced-regr

The code is available in this project, including [Client 1 code](pytorch_nn_lr_1.py) and [Client 2 code](pytorch_nn_lr_2.py). You can run them directly in two separate terminals to start federated learning; the order in which they start does not matter. The following step-by-step tutorial explains how the code works.

### 2.1 Private Set Intersection
We first need to get the intersection of datasets across parties by Private Set Intersection algorithm.
### 2.1 Initialize FL Context
We first need to initialize the FL Context by
```python
from bigdl.ppml.fl.nn.fl_context import init_fl_context
init_fl_context(client_id, target)
```
The target is the URL of the FL Server, which is `localhost:8980` by default.
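For example, client 1 in this tutorial could initialize the context as below (the ID `'1'` matches the example scripts in this PR; the target argument can be omitted to use the default):
```python
from bigdl.ppml.fl.nn.fl_context import init_fl_context

init_fl_context('1', 'localhost:8980')
```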
### 2.2 Private Set Intersection
Then get the intersection of the datasets across parties using the Private Set Intersection algorithm, so that both parties train on the same set of sample IDs.
```python
df_train['ID'] = df_train['ID'].astype(str)
psi = PSI()
intersection = psi.get_intersection(list(df_train['ID']))
df_train = df_train[df_train['ID'].isin(intersection)]
```

### 2.2 Data Preprocessing
### 2.3 Data Preprocessing
Since one party holds the label data while the other does not, different preprocessing is needed before training.

For example, in party 1:
@@ -42,7 +49,7 @@ y = np.expand_dims(df_y.to_numpy(dtype="float32"), axis=1)
```
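The preprocessing code itself is collapsed in the diff above. A minimal sketch for party 1, assuming party 1 holds the label column (named `Outcome` here for illustration only), could look like:
```python
import numpy as np

# split the intersected DataFrame into features and label;
# the 'ID' column is only used for PSI and is dropped here
df_x = df_train.drop(['ID', 'Outcome'], axis=1)
df_y = df_train['Outcome']
x = df_x.to_numpy(dtype="float32")
y = np.expand_dims(df_y.to_numpy(dtype="float32"), axis=1)
```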


### 2.3 Create Model
### 2.4 Create Model
We create the following model for both clients, but with a different number of inputs.
```python
class LocalModel(nn.Module):
@@ -72,16 +79,14 @@ class ServerModel(nn.Module):

server_model = ServerModel()
```
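Most of the model code is collapsed in the diff above. As a rough sketch (the layer sizes and the fusion step on the server are assumptions based on the example scripts in this PR), the two parts could look like:
```python
from typing import List
import torch
from torch import Tensor, nn

class LocalModel(nn.Module):
    """Client-side bottom model: one linear layer over the local features."""
    def __init__(self, num_feature) -> None:
        super().__init__()
        self.dense = nn.Linear(num_feature, 1)

    def forward(self, x):
        return self.dense(x)

class ServerModel(nn.Module):
    """Server-side top model: sums the intermediate outputs of all clients
    and applies a sigmoid, yielding a logistic-regression style prediction."""
    def __init__(self) -> None:
        super().__init__()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x: List[Tensor]):
        x = torch.stack(x).sum(dim=0)  # fuse one intermediate tensor per client
        return self.sigmoid(x)

server_model = ServerModel()
```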
### 2.4 Create Estimator
### 2.5 Create Estimator
Then, create the Estimator and pass the arguments:

```python
ppl = Estimator.from_torch(client_model=model,
client_id=client_id,
loss_fn=loss_fn,
optimizer_cls=torch.optim.SGD,
optimizer_args={'lr':1e-3},
target='localhost:8980',
optimizer_args={'lr':1e-4},
server_model=server_model,
server_model_path='/path/to/model/on/server',
client_model_path='/path/to/model/on/client')
@@ -90,22 +95,22 @@ Note that
* If you want to upload the server model from this estimator, provide the `server_model` argument.
* If you want the server to automatically trigger model autosave, provide `server_model_path` with the path for the server to save the model.
* If you want the client to automatically trigger model autosave, provide `client_model_path` with the path for the client to save the model.
We will also show how to use the saved model to resume training or predict in [2.7](#27-saveload)
We will also show how to use the saved model to resume training or predict in [2.8](#28-saveload)

### 2.5 Training
### 2.6 Training
Then call the `fit` method to train:

```python
response = ppl.fit(x, y, epoch=5)
```

### 2.6 Predict
### 2.7 Predict
```python
result = ppl.predict(x)
```

### 2.7 Save/Load
In [2.4](#24-create-estimator) we provided the model paths when creating the estimator, so the client model and server model are both saved automatically.
### 2.8 Save/Load
In [2.5](#25-create-estimator) we provided the model paths when creating the estimator, so the client model and server model are both saved automatically.

You can also call save explicitly by
```python
@@ -120,11 +125,9 @@ client_model = torch.load(model_path) # load client model first
# the server_model_path should be consistent with the one in 2.5
# because the server will load the model from this path if the model exists
ppl = Estimator.from_torch(client_model=model,
client_id=client_id,
loss_fn=loss_fn,
optimizer_cls=torch.optim.SGD,
optimizer_args={'lr':1e-3},
target='localhost:8980',
optimizer_args={'lr':1e-4},
server_model_path='/path/to/model/on/server',
client_model_path='/path/to/model/on/client')
ppl.load_server_model(server_model_path) # trigger model loading on server
20 changes: 11 additions & 9 deletions python/ppml/example/pytorch_nn_lr/pytorch_nn_lr_1.py
@@ -14,6 +14,7 @@
# limitations under the License.
#

import logging
from typing import List
import numpy as np
import pandas as pd
@@ -22,8 +23,11 @@
import torch
from torch import Tensor, nn
from bigdl.ppml.fl.estimator import Estimator
from bigdl.ppml.fl.psi.psi import PSI
from bigdl.ppml.fl.nn.fl_context import init_fl_context
from bigdl.ppml.fl.psi.psi_client import PSI

fmt = '%(asctime)s %(levelname)s {%(module)s:%(lineno)d} - %(message)s'
logging.basicConfig(format=fmt, level=logging.INFO)

class LocalModel(nn.Module):
def __init__(self, num_feature) -> None:
@@ -50,8 +54,10 @@ def forward(self, x: List[Tensor]):

@click.command()
@click.option('--load_model', default=False)
def run_client(load_model):
df_train = pd.read_csv('./data/diabetes-vfl-1.csv')
@click.option('--data_path', default="./data/diabetes-vfl-1.csv")
def run_client(load_model, data_path):
init_fl_context('1')
Contributor
Should the id be an integer instead?

Contributor Author
We can do this. The user would then have to pass an integer when calling init_fl_context, and the client ID could only be set in this method.

All the protobuf ID types are String now; I will open another PR to modify all of them.

df_train = pd.read_csv(data_path)

df_train['ID'] = df_train['ID'].astype(str)
psi = PSI()
@@ -69,11 +75,9 @@
if load_model:
model = torch.load('/tmp/pytorch_client_model_1.pt')
ppl = Estimator.from_torch(client_model=model,
client_id='1',
loss_fn=loss_fn,
optimizer_cls=torch.optim.SGD,
optimizer_args={'lr':1e-5},
target='localhost:8980',
optimizer_args={'lr':1e-4},
server_model_path='/tmp/pytorch_server_model',
client_model_path='/tmp/pytorch_client_model_1.pt')
ppl.load_server_model('/tmp/pytorch_server_model')
@@ -83,11 +87,9 @@

server_model = ServerModel()
ppl = Estimator.from_torch(client_model=model,
client_id='1',
loss_fn=loss_fn,
optimizer_cls=torch.optim.SGD,
optimizer_args={'lr':1e-5},
target='localhost:8980',
optimizer_args={'lr':1e-4},
server_model=server_model,
server_model_path='/tmp/pytorch_server_model',
client_model_path='/tmp/pytorch_client_model_1.pt')
Contributor
save after fit?

Contributor Author (@Litchilitchy, Sep 6, 2022)

The Estimator automatically saves the server model and the client model to their respective paths after each epoch of fit, like the Orca Estimator.

21 changes: 11 additions & 10 deletions python/ppml/example/pytorch_nn_lr/pytorch_nn_lr_2.py
@@ -14,15 +14,19 @@
# limitations under the License.
#

import logging
import numpy as np
import pandas as pd
import click

import torch
from torch import nn
from bigdl.ppml.fl.estimator import Estimator
from bigdl.ppml.fl.psi.psi import PSI
from bigdl.ppml.fl.nn.fl_context import init_fl_context
from bigdl.ppml.fl.psi.psi_client import PSI

fmt = '%(asctime)s %(levelname)s {%(module)s:%(lineno)d} - %(message)s'
logging.basicConfig(format=fmt, level=logging.INFO)

class LocalModel(nn.Module):
def __init__(self, num_feature) -> None:
@@ -36,9 +40,10 @@ def forward(self, x):

@click.command()
@click.option('--load_model', default=False)
def run_client(load_model):
df_train = pd.read_csv('./data/diabetes-vfl-2.csv')

@click.option('--data_path', default="./data/diabetes-vfl-2.csv")
def run_client(load_model, data_path):
init_fl_context('2')
df_train = pd.read_csv(data_path)
df_train['ID'] = df_train['ID'].astype(str)
psi = PSI()
intersection = psi.get_intersection(list(df_train['ID']))
@@ -53,21 +58,17 @@ def run_client(load_model):
if load_model:
model = torch.load('/tmp/pytorch_client_model_2.pt')
ppl = Estimator.from_torch(client_model=model,
client_id='2',
loss_fn=loss_fn,
optimizer_cls=torch.optim.SGD,
optimizer_args={'lr':1e-5},
target='localhost:8980',
optimizer_args={'lr':1e-4},
client_model_path='/tmp/pytorch_client_model_2.pt')
response = ppl.fit(x, y, 5)
else:
model = LocalModel(len(df_x.columns))
ppl = Estimator.from_torch(client_model=model,
client_id='2',
loss_fn=loss_fn,
optimizer_cls=torch.optim.SGD,
optimizer_args={'lr':1e-5},
target='localhost:8980',
optimizer_args={'lr':1e-4},
client_model_path='/tmp/pytorch_client_model_2.pt')
response = ppl.fit(x, y, 5)
result = ppl.predict(x)
10 changes: 1 addition & 9 deletions python/ppml/src/bigdl/ppml/fl/estimator.py
@@ -30,38 +30,30 @@
class Estimator:
@staticmethod
def from_torch(client_model: nn.Module,
client_id,
loss_fn,
optimizer_cls,
optimizer_args={},
target="localhost:8980",
optimizer_args={},
server_model=None,
client_model_path=None,
server_model_path=None):
estimator = PytorchEstimator(model=client_model,
loss_fn=loss_fn,
optimizer_cls=optimizer_cls,
optimizer_args=optimizer_args,
client_id=client_id,
target=target,
server_model=server_model,
client_model_path=client_model_path,
server_model_path=server_model_path)
return estimator

@staticmethod
def from_keras(client_model: Model,
client_id,
loss_fn,
optimizer_cls,
optimizer_args={},
target="localhost:8980",
server_model=None):
estimator = TensorflowEstimator(model=client_model,
loss_fn=loss_fn,
optimizer_cls=optimizer_cls,
optimizer_args=optimizer_args,
client_id=client_id,
target=target,
server_model=server_model)
return estimator
59 changes: 5 additions & 54 deletions python/ppml/src/bigdl/ppml/fl/nn/fl_client.py
@@ -16,16 +16,13 @@


import logging
import pickle
import grpc
from bigdl.ppml.fl.nn.generated.nn_service_pb2 import TrainRequest, PredictRequest, UploadMetaRequest
from bigdl.ppml.fl.nn.generated.nn_service_pb2_grpc import *
from bigdl.ppml.fl.nn.utils import ndarray_map_to_tensor_map
import yaml
import threading
from bigdl.dllib.utils.log4Error import invalidInputError

from bigdl.ppml.fl.nn.utils import ClassAndArgsWrapper

class FLClient(object):
channel = None
@@ -52,64 +49,18 @@ def ensure_initialized():
else:
FLClient.channel = grpc.insecure_channel(FLClient.target)


def __init__(self, client_id, aggregator, target="localhost:8980") -> None:
self.secure = False
self.load_config()
with FLClient._lock:
if FLClient.channel == None:
if self.secure:
FLClient.channel = grpc.secure_channel(target, self.creds)
else:
FLClient.channel = grpc.insecure_channel(target)
self.nn_stub = NNServiceStub(FLClient.channel)
self.client_uuid = client_id
self.aggregator = aggregator

def train(self, x):
tensor_map = ndarray_map_to_tensor_map(x)
train_request = TrainRequest(clientuuid=self.client_uuid,
data=tensor_map,
algorithm=self.aggregator)

response = self.nn_stub.train(train_request)
if response.code == 1:
invalidInputError(False,
response.response)
return response

def predict(self, x):
tensor_map = ndarray_map_to_tensor_map(x)
predict_request = PredictRequest(clientuuid=self.client_uuid,
data=tensor_map,
algorithm=self.aggregator)

response = self.nn_stub.predict(predict_request)
if response.code == 1:
invalidInputError(False,
response.response)
return response

def upload_meta(self, loss_fn, optimizer_cls, optimizer_args):
# upload model to server
loss_fn = pickle.dumps(loss_fn)
optimizer = ClassAndArgsWrapper(optimizer_cls, optimizer_args).to_protobuf()
request = UploadMetaRequest(client_uuid=self.client_uuid,
loss_fn=loss_fn,
optimizer=optimizer,
aggregator=self.aggregator)
return self.nn_stub.upload_meta(request)

def load_config(self):
@staticmethod
def load_config():
try:
with open('ppml-conf.yaml', 'r') as stream:
conf = yaml.safe_load(stream)
if 'privateKeyFilePath' in conf:
self.secure = True
FLClient.secure = True
with open(conf['privateKeyFilePath'], 'rb') as f:
self.creds = grpc.ssl_channel_credentials(f.read())
FLClient.creds = grpc.ssl_channel_credentials(f.read())
except yaml.YAMLError as e:
logging.warn('Loading config failed, using default config ')
except Exception as e:
logging.warn('Failed to find config file "ppml-conf.yaml", using default config')


2 changes: 2 additions & 0 deletions python/ppml/src/bigdl/ppml/fl/nn/fl_context.py
@@ -17,6 +17,8 @@
from ..nn.fl_client import FLClient

def init_fl_context(client_id, target="localhost:8980"):
Contributor
add a Python docstring

FLClient.load_config()
FLClient.set_client_id(client_id)
# target can be set in config file, and also could be overwritten here
FLClient.set_target(target)
FLClient.ensure_initialized()
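A docstring along the following lines would address the review comment above; this is only a sketch (the wording is not part of this PR), with the body mirroring the function shown in the diff:
```python
from bigdl.ppml.fl.nn.fl_client import FLClient

def init_fl_context(client_id, target="localhost:8980"):
    """Initialize the Federated Learning context of this party.

    :param client_id: the ID of this client, e.g. '1' or '2' in the VFL examples.
    :param target: the URL of the FL Server, 'localhost:8980' by default;
        this overwrites any target set in the config file.
    """
    FLClient.load_config()
    FLClient.set_client_id(client_id)
    # target can be set in the config file, and can also be overwritten here
    FLClient.set_target(target)
    FLClient.ensure_initialized()
```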