From bfc479984bdb2c3c52ca9a7e19131d7014386e27 Mon Sep 17 00:00:00 2001
From: Song Jiaming
Date: Thu, 1 Sep 2022 11:02:32 +0800
Subject: [PATCH] Pytorch VFL NN refactor, refine and example test (#5607)

---
 python/ppml/dev/example/run-pytorch-nn-test   | 33 +++++++++
 .../pytorch_nn_lr/pytorch-nn-lr-tutorial.md   | 35 +++++----
 .../example/pytorch_nn_lr/pytorch_nn_lr_1.py  | 20 ++---
 .../example/pytorch_nn_lr/pytorch_nn_lr_2.py  | 21 +++---
 python/ppml/src/bigdl/ppml/fl/estimator.py    | 10 +--
 python/ppml/src/bigdl/ppml/fl/nn/fl_client.py | 59 ++-------------
 .../ppml/src/bigdl/ppml/fl/nn/fl_context.py   |  2 +
 python/ppml/src/bigdl/ppml/fl/nn/fl_server.py | 19 +++--
 python/ppml/src/bigdl/ppml/fl/nn/nn_client.py | 74 +++++++++++++++++++
 .../src/bigdl/ppml/fl/nn/pytorch/estimator.py |  7 +-
 .../bigdl/ppml/fl/nn/tensorflow/estimator.py  |  5 +-
 .../ppml/fl/psi/{psi.py => psi_client.py}     |  8 +-
 .../src/bigdl/ppml/fl/psi/psi_intersection.py |  2 +
 .../fl/nn/pytorch/test_logistic_regression.py | 15 ++--
 .../bigdl/ppml/fl/nn/pytorch/test_mnist.py    |  5 +-
 .../ppml/fl/nn/pytorch/test_protobuf_utils.py | 71 ------------------
 .../ppml/fl/nn/pytorch/test_save_load.py      | 38 ++--------
 .../bigdl/ppml/fl/nn/tensorflow/test_mnist.py | 10 +--
 .../ppml/test/bigdl/ppml/fl/psi/test_psi.py   |  2 +-
 19 files changed, 201 insertions(+), 235 deletions(-)
 create mode 100644 python/ppml/dev/example/run-pytorch-nn-test
 create mode 100644 python/ppml/src/bigdl/ppml/fl/nn/nn_client.py
 rename python/ppml/src/bigdl/ppml/fl/psi/{psi.py => psi_client.py} (89%)
 delete mode 100644 python/ppml/test/bigdl/ppml/fl/nn/pytorch/test_protobuf_utils.py

diff --git a/python/ppml/dev/example/run-pytorch-nn-test b/python/ppml/dev/example/run-pytorch-nn-test
new file mode 100644
index 00000000000..a90e1a3d340
--- /dev/null
+++ b/python/ppml/dev/example/run-pytorch-nn-test
@@ -0,0 +1,33 @@
+#!/usr/bin/env bash
+
+#
+# Copyright 2016 The BigDL Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -ex
+SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" &> /dev/null && pwd)
+source $SCRIPT_DIR/../prepare_env.sh
+
+
+cd "`dirname $0`"
+echo "Running PPML tests"
+cd ../../
+
+rm -rf /tmp/pytorch_server_model*
+rm -rf /tmp/pytorch_client_model*
+rm -rf /tmp/vfl_server_model*
+python src/bigdl/ppml/fl/nn/fl_server.py --client_num 2 &
+python example/pytorch_nn_lr/pytorch_nn_lr_1.py --data_path example/pytorch_nn_lr/data/diabetes-vfl-1.csv &
+python example/pytorch_nn_lr/pytorch_nn_lr_2.py --data_path example/pytorch_nn_lr/data/diabetes-vfl-2.csv &
diff --git a/python/ppml/example/pytorch_nn_lr/pytorch-nn-lr-tutorial.md b/python/ppml/example/pytorch_nn_lr/pytorch-nn-lr-tutorial.md
index b51a4c16745..33da6a1f448 100644
--- a/python/ppml/example/pytorch_nn_lr/pytorch-nn-lr-tutorial.md
+++ b/python/ppml/example/pytorch_nn_lr/pytorch-nn-lr-tutorial.md
@@ -20,8 +20,15 @@ We use [Diabetes](https://www.kaggle.com/competitions/house-prices-advanced-regr
 The code is available in this project, including [Client 1 code](pytorch_nn_lr_1.py) and [Client 2 code](pytorch_nn_lr_2.py). You can start two different terminals and run them respectively to start federated learning; the order in which they start does not matter. The following is a detailed step-by-step tutorial that introduces how the code works.
-### 2.1 Private Set Intersection
-We first need to get the intersection of datasets across parties by Private Set Intersection algorithm.
+### 2.1 Initialize FL Context
+We first need to initialize the FL Context by calling
+```python
+from bigdl.ppml.fl.nn.fl_context import init_fl_context
+init_fl_context(client_id, target)
+```
+The target is the URL of the FL Server and is `localhost:8980` by default.
+### 2.2 Private Set Intersection
+Then get the intersection of the datasets across parties using the Private Set Intersection algorithm.
 ```python
 df_train['ID'] = df_train['ID'].astype(str)
 psi = PSI()
@@ -29,7 +36,7 @@ intersection = psi.get_intersection(list(df_train['ID']))
 df_train = df_train[df_train['ID'].isin(intersection)]
 ```
-### 2.2 Data Preprocessing
+### 2.3 Data Preprocessing
 Since one party owns the label data while the other does not, different preprocessing has to be done before training.
 For example, in party 1:
@@ -42,7 +49,7 @@ y = np.expand_dims(df_y.to_numpy(dtype="float32"), axis=1)
 ```
-### 2.3 Create Model
+### 2.4 Create Model
 We create the following model for both clients, but with a different number of inputs
 ```python
 class LocalModel(nn.Module):
@@ -72,16 +79,14 @@ class ServerModel(nn.Module):
 server_model = ServerModel()
 ```
-### 2.4 Create Estimator
+### 2.5 Create Estimator
 Then, create the Estimator and pass the arguments
 ```python
 ppl = Estimator.from_torch(client_model=model,
-                           client_id=client_id,
                            loss_fn=loss_fn,
                            optimizer_cls=torch.optim.SGD,
-                           optimizer_args={'lr':1e-3},
-                           target='localhost:8980',
+                           optimizer_args={'lr':1e-4},
                            server_model=server_model,
                            server_model_path=/path/to/model/on/server,
                            client_model_path=/path/to/model/on/client)
@@ -90,22 +95,22 @@ Note that
 * If you want to upload the server model from this estimator, provide the `server_model` argument.
 * If you want the server to automatically trigger model autosave, provide `server_model_path` with the path for the server to save the model.
 * If you want the client to automatically trigger model autosave, provide `client_model_path` with the path for the client to save the model.
-We will also show how to use the saved model to resume training or predict in [2.7](#27-saveload)
+We will also show how to use the saved model to resume training or predict in [2.8](#28-saveload)
-### 2.5 Training
+### 2.6 Training
 Then call the `fit` method to train
 ```python
 response = ppl.fit(x, y, epoch=5)
 ```
-### 2.6 Predict
+### 2.7 Predict
 ```python
 result = ppl.predict(x)
 ```
-### 2.7 Save/Load
-In [2.4](#24-create-estimator) we provided the model paths while creating estimator. Thus, client model and server model would both be automatically saved.
+### 2.8 Save/Load
+In [2.5](#25-create-estimator) we provided the model paths while creating the estimator. Thus, the client model and the server model will both be saved automatically.
 You can also call save explicitly by
 ```python
@@ -120,11 +125,9 @@ client_model = torch.load(model_path) # load client model first
 # the server_model_path should be consistent with the one in 2.5
 # because the server will load the model from this path if the model exists
 ppl = Estimator.from_torch(client_model=model,
-                           client_id=client_id,
                            loss_fn=loss_fn,
                            optimizer_cls=torch.optim.SGD,
-                           optimizer_args={'lr':1e-3},
-                           target='localhost:8980',
+                           optimizer_args={'lr':1e-4},
                            server_model_path=/path/to/model/on/server,
                            client_model_path=/path/to/model/on/client)
 ppl.load_server_model(server_model_path) # trigger model loading on server
diff --git a/python/ppml/example/pytorch_nn_lr/pytorch_nn_lr_1.py b/python/ppml/example/pytorch_nn_lr/pytorch_nn_lr_1.py
index d65d235cc57..156279f17f9 100644
--- a/python/ppml/example/pytorch_nn_lr/pytorch_nn_lr_1.py
+++ b/python/ppml/example/pytorch_nn_lr/pytorch_nn_lr_1.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 #
+import logging
 from typing import List
 import numpy as np
 import pandas as pd
@@ -22,8 +23,11 @@ import torch
 from torch import Tensor, nn
 from bigdl.ppml.fl.estimator import Estimator
-from bigdl.ppml.fl.psi.psi import PSI
+from bigdl.ppml.fl.nn.fl_context import init_fl_context
+from bigdl.ppml.fl.psi.psi_client import PSI
+fmt = '%(asctime)s %(levelname)s {%(module)s:%(lineno)d} - %(message)s'
+logging.basicConfig(format=fmt, level=logging.INFO)
 class LocalModel(nn.Module):
     def __init__(self, num_feature) -> None:
@@ -50,8 +54,10 @@ def forward(self, x: List[Tensor]):
 @click.command()
 @click.option('--load_model', default=False)
-def run_client(load_model):
-    df_train = pd.read_csv('./data/diabetes-vfl-1.csv')
+@click.option('--data_path', default="./data/diabetes-vfl-1.csv")
+def run_client(load_model, data_path):
+    init_fl_context('1')
+    df_train = pd.read_csv(data_path)
     df_train['ID'] = df_train['ID'].astype(str)
     psi = PSI()
@@ -69,11 +75,9 @@ def run_client(load_model):
     if load_model:
         model = torch.load('/tmp/pytorch_client_model_1.pt')
         ppl = Estimator.from_torch(client_model=model,
-                                   client_id='1',
                                    loss_fn=loss_fn,
                                    optimizer_cls=torch.optim.SGD,
-                                   optimizer_args={'lr':1e-5},
-                                   target='localhost:8980',
+                                   optimizer_args={'lr':1e-4},
                                    server_model_path='/tmp/pytorch_server_model',
                                    client_model_path='/tmp/pytorch_client_model_1.pt')
         ppl.load_server_model('/tmp/pytorch_server_model')
@@ -83,11 +87,9 @@ def run_client(load_model):
         server_model = ServerModel()
         ppl = Estimator.from_torch(client_model=model,
-                                   client_id='1',
                                    loss_fn=loss_fn,
                                    optimizer_cls=torch.optim.SGD,
-                                   optimizer_args={'lr':1e-5},
-                                   target='localhost:8980',
+                                   optimizer_args={'lr':1e-4},
                                    server_model=server_model,
                                    server_model_path='/tmp/pytorch_server_model',
                                    client_model_path='/tmp/pytorch_client_model_1.pt')
diff --git
a/python/ppml/example/pytorch_nn_lr/pytorch_nn_lr_2.py b/python/ppml/example/pytorch_nn_lr/pytorch_nn_lr_2.py index 232d076e2b8..fca617612af 100644 --- a/python/ppml/example/pytorch_nn_lr/pytorch_nn_lr_2.py +++ b/python/ppml/example/pytorch_nn_lr/pytorch_nn_lr_2.py @@ -14,6 +14,7 @@ # limitations under the License. # +import logging import numpy as np import pandas as pd import click @@ -21,8 +22,11 @@ import torch from torch import nn from bigdl.ppml.fl.estimator import Estimator -from bigdl.ppml.fl.psi.psi import PSI +from bigdl.ppml.fl.nn.fl_context import init_fl_context +from bigdl.ppml.fl.psi.psi_client import PSI +fmt = '%(asctime)s %(levelname)s {%(module)s:%(lineno)d} - %(message)s' +logging.basicConfig(format=fmt, level=logging.INFO) class LocalModel(nn.Module): def __init__(self, num_feature) -> None: @@ -36,9 +40,10 @@ def forward(self, x): @click.command() @click.option('--load_model', default=False) -def run_client(load_model): - df_train = pd.read_csv('./data/diabetes-vfl-2.csv') - +@click.option('--data_path', default="./data/diabetes-vfl-2.csv") +def run_client(load_model, data_path): + init_fl_context('2') + df_train = pd.read_csv(data_path) df_train['ID'] = df_train['ID'].astype(str) psi = PSI() intersection = psi.get_intersection(list(df_train['ID'])) @@ -53,21 +58,17 @@ def run_client(load_model): if load_model: model = torch.load('/tmp/pytorch_client_model_2.pt') ppl = Estimator.from_torch(client_model=model, - client_id='2', loss_fn=loss_fn, optimizer_cls=torch.optim.SGD, - optimizer_args={'lr':1e-5}, - target='localhost:8980', + optimizer_args={'lr':1e-4}, client_model_path='/tmp/pytorch_client_model_2.pt') response = ppl.fit(x, y, 5) else: model = LocalModel(len(df_x.columns)) ppl = Estimator.from_torch(client_model=model, - client_id='2', loss_fn=loss_fn, optimizer_cls=torch.optim.SGD, - optimizer_args={'lr':1e-5}, - target='localhost:8980', + optimizer_args={'lr':1e-4}, client_model_path='/tmp/pytorch_client_model_2.pt') response = ppl.fit(x, y, 5) result = ppl.predict(x) diff --git a/python/ppml/src/bigdl/ppml/fl/estimator.py b/python/ppml/src/bigdl/ppml/fl/estimator.py index 23d420f46aa..13f59296c82 100644 --- a/python/ppml/src/bigdl/ppml/fl/estimator.py +++ b/python/ppml/src/bigdl/ppml/fl/estimator.py @@ -30,11 +30,9 @@ class Estimator: @staticmethod def from_torch(client_model: nn.Module, - client_id, loss_fn, optimizer_cls, - optimizer_args={}, - target="localhost:8980", + optimizer_args={}, server_model=None, client_model_path=None, server_model_path=None): @@ -42,8 +40,6 @@ def from_torch(client_model: nn.Module, loss_fn=loss_fn, optimizer_cls=optimizer_cls, optimizer_args=optimizer_args, - client_id=client_id, - target=target, server_model=server_model, client_model_path=client_model_path, server_model_path=server_model_path) @@ -51,17 +47,13 @@ def from_torch(client_model: nn.Module, @staticmethod def from_keras(client_model: Model, - client_id, loss_fn, optimizer_cls, optimizer_args={}, - target="localhost:8980", server_model=None): estimator = TensorflowEstimator(model=client_model, loss_fn=loss_fn, optimizer_cls=optimizer_cls, optimizer_args=optimizer_args, - client_id=client_id, - target=target, server_model=server_model) return estimator diff --git a/python/ppml/src/bigdl/ppml/fl/nn/fl_client.py b/python/ppml/src/bigdl/ppml/fl/nn/fl_client.py index 24ffd387d2f..bc8bfb58434 100644 --- a/python/ppml/src/bigdl/ppml/fl/nn/fl_client.py +++ b/python/ppml/src/bigdl/ppml/fl/nn/fl_client.py @@ -16,16 +16,13 @@ import logging -import pickle import grpc from 
bigdl.ppml.fl.nn.generated.nn_service_pb2 import TrainRequest, PredictRequest, UploadMetaRequest from bigdl.ppml.fl.nn.generated.nn_service_pb2_grpc import * -from bigdl.ppml.fl.nn.utils import ndarray_map_to_tensor_map import yaml import threading from bigdl.dllib.utils.log4Error import invalidInputError -from bigdl.ppml.fl.nn.utils import ClassAndArgsWrapper class FLClient(object): channel = None @@ -52,64 +49,18 @@ def ensure_initialized(): else: FLClient.channel = grpc.insecure_channel(FLClient.target) - - def __init__(self, client_id, aggregator, target="localhost:8980") -> None: - self.secure = False - self.load_config() - with FLClient._lock: - if FLClient.channel == None: - if self.secure: - FLClient.channel = grpc.secure_channel(target, self.creds) - else: - FLClient.channel = grpc.insecure_channel(target) - self.nn_stub = NNServiceStub(FLClient.channel) - self.client_uuid = client_id - self.aggregator = aggregator - - def train(self, x): - tensor_map = ndarray_map_to_tensor_map(x) - train_request = TrainRequest(clientuuid=self.client_uuid, - data=tensor_map, - algorithm=self.aggregator) - - response = self.nn_stub.train(train_request) - if response.code == 1: - invalidInputError(False, - response.response) - return response - - def predict(self, x): - tensor_map = ndarray_map_to_tensor_map(x) - predict_request = PredictRequest(clientuuid=self.client_uuid, - data=tensor_map, - algorithm=self.aggregator) - - response = self.nn_stub.predict(predict_request) - if response.code == 1: - invalidInputError(False, - response.response) - return response - - def upload_meta(self, loss_fn, optimizer_cls, optimizer_args): - # upload model to server - loss_fn = pickle.dumps(loss_fn) - optimizer = ClassAndArgsWrapper(optimizer_cls, optimizer_args).to_protobuf() - request = UploadMetaRequest(client_uuid=self.client_uuid, - loss_fn=loss_fn, - optimizer=optimizer, - aggregator=self.aggregator) - return self.nn_stub.upload_meta(request) - - def load_config(self): + @staticmethod + def load_config(): try: with open('ppml-conf.yaml', 'r') as stream: conf = yaml.safe_load(stream) if 'privateKeyFilePath' in conf: - self.secure = True + FLClient.secure = True with open(conf['privateKeyFilePath'], 'rb') as f: - self.creds = grpc.ssl_channel_credentials(f.read()) + FLClient.creds = grpc.ssl_channel_credentials(f.read()) except yaml.YAMLError as e: logging.warn('Loading config failed, using default config ') except Exception as e: logging.warn('Failed to find config file "ppml-conf.yaml", using default config') + \ No newline at end of file diff --git a/python/ppml/src/bigdl/ppml/fl/nn/fl_context.py b/python/ppml/src/bigdl/ppml/fl/nn/fl_context.py index 815d297cb4b..4ba421de4fc 100644 --- a/python/ppml/src/bigdl/ppml/fl/nn/fl_context.py +++ b/python/ppml/src/bigdl/ppml/fl/nn/fl_context.py @@ -17,6 +17,8 @@ from ..nn.fl_client import FLClient def init_fl_context(client_id, target="localhost:8980"): + FLClient.load_config() FLClient.set_client_id(client_id) + # target can be set in config file, and also could be overwritten here FLClient.set_target(target) FLClient.ensure_initialized() diff --git a/python/ppml/src/bigdl/ppml/fl/nn/fl_server.py b/python/ppml/src/bigdl/ppml/fl/nn/fl_server.py index 209c81cf830..7a3c8827a97 100644 --- a/python/ppml/src/bigdl/ppml/fl/nn/fl_server.py +++ b/python/ppml/src/bigdl/ppml/fl/nn/fl_server.py @@ -20,14 +20,15 @@ from bigdl.ppml.fl.nn.nn_service import NNServiceImpl import yaml - +import click from bigdl.ppml.fl.psi.psi_service import PSIServiceImpl from 
bigdl.ppml.fl.nn.generated.nn_service_pb2_grpc import * from bigdl.ppml.fl.nn.generated.psi_service_pb2_grpc import * +# fmt = '%(asctime)s %(levelname)s {%(module)s:%(lineno)d} - %(message)s' +# logging.basicConfig(format=fmt, level=logging.DEBUG) - -class FLServer(object): +class FLServer(object): def __init__(self, client_num=None): self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=5)) self.port = 8980 @@ -86,9 +87,15 @@ def generate_conf(self, conf: dict): def wait_for_termination(self): self.server.wait_for_termination() - -if __name__ == '__main__': - fl_server = FLServer(2) + +@click.command() +@click.option('--client_num', default=1) +def run(client_num): + fl_server = FLServer(client_num) fl_server.build() fl_server.start() fl_server.wait_for_termination() + + +if __name__ == '__main__': + run() diff --git a/python/ppml/src/bigdl/ppml/fl/nn/nn_client.py b/python/ppml/src/bigdl/ppml/fl/nn/nn_client.py new file mode 100644 index 00000000000..36ce35f1da1 --- /dev/null +++ b/python/ppml/src/bigdl/ppml/fl/nn/nn_client.py @@ -0,0 +1,74 @@ +# +# Copyright 2016 The BigDL Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +import pickle +from bigdl.ppml.fl.nn.fl_client import FLClient +from bigdl.ppml.fl.nn.generated.nn_service_pb2 import TrainRequest, PredictRequest, UploadMetaRequest +from bigdl.ppml.fl.nn.generated.nn_service_pb2_grpc import * +from bigdl.ppml.fl.nn.utils import ndarray_map_to_tensor_map +import threading +from bigdl.dllib.utils.log4Error import invalidInputError, invalidOperationError + +from bigdl.ppml.fl.nn.utils import ClassAndArgsWrapper + +class NNClient(object): + _lock = threading.Lock() + + def __init__(self, aggregator) -> None: + if FLClient.channel is None: + invalidOperationError(False, "No channel found, please make sure you called \ + init_fl_context()") + if FLClient.client_id is None: + invalidOperationError(False, "You have to set client_id with integer like: \ + init_fl_context(client_id=1)") + self.nn_stub = NNServiceStub(FLClient.channel) + self.client_uuid = FLClient.client_id + self.aggregator = aggregator + + def train(self, x): + tensor_map = ndarray_map_to_tensor_map(x) + train_request = TrainRequest(clientuuid=self.client_uuid, + data=tensor_map, + algorithm=self.aggregator) + + response = self.nn_stub.train(train_request) + if response.code == 1: + invalidInputError(False, + response.response) + return response + + def predict(self, x): + tensor_map = ndarray_map_to_tensor_map(x) + predict_request = PredictRequest(clientuuid=self.client_uuid, + data=tensor_map, + algorithm=self.aggregator) + + response = self.nn_stub.predict(predict_request) + if response.code == 1: + invalidInputError(False, + response.response) + return response + + def upload_meta(self, loss_fn, optimizer_cls, optimizer_args): + # upload model to server + loss_fn = pickle.dumps(loss_fn) + optimizer = ClassAndArgsWrapper(optimizer_cls, optimizer_args).to_protobuf() + request = UploadMetaRequest(client_uuid=self.client_uuid, + loss_fn=loss_fn, + 
optimizer=optimizer, + aggregator=self.aggregator) + return self.nn_stub.upload_meta(request) diff --git a/python/ppml/src/bigdl/ppml/fl/nn/pytorch/estimator.py b/python/ppml/src/bigdl/ppml/fl/nn/pytorch/estimator.py index 6d5ec7aa7c3..47a1e735d40 100644 --- a/python/ppml/src/bigdl/ppml/fl/nn/pytorch/estimator.py +++ b/python/ppml/src/bigdl/ppml/fl/nn/pytorch/estimator.py @@ -21,6 +21,7 @@ from bigdl.ppml.fl.nn.fl_client import FLClient from torch.utils.data import DataLoader from bigdl.dllib.utils.log4Error import invalidInputError, invalidOperationError +from bigdl.ppml.fl.nn.nn_client import NNClient from bigdl.ppml.fl.nn.utils import file_chunk_generate, tensor_map_to_ndarray_map import os import tempfile @@ -35,9 +36,7 @@ def __init__(self, loss_fn, optimizer_cls, optimizer_args, - client_id, - bigdl_type="float", - target="localhost:8980", + bigdl_type="float", fl_client=None, server_model=None, client_model_path=None, @@ -50,7 +49,7 @@ def __init__(self, self.client_model_path = client_model_path self.server_model_path = server_model_path self.fl_client = fl_client if fl_client is not None \ - else FLClient(client_id=client_id, aggregator='pt', target=target) + else NNClient(aggregator='pt') self.loss_history = [] if server_model is not None: self.__add_server_model(server_model, loss_fn, optimizer_cls, optimizer_args) diff --git a/python/ppml/src/bigdl/ppml/fl/nn/tensorflow/estimator.py b/python/ppml/src/bigdl/ppml/fl/nn/tensorflow/estimator.py index 90883c1e89b..4f0f1ab1da1 100644 --- a/python/ppml/src/bigdl/ppml/fl/nn/tensorflow/estimator.py +++ b/python/ppml/src/bigdl/ppml/fl/nn/tensorflow/estimator.py @@ -22,6 +22,7 @@ from bigdl.dllib.utils.log4Error import invalidInputError from bigdl.ppml.fl.nn.fl_client import FLClient +from bigdl.ppml.fl.nn.nn_client import NNClient from bigdl.ppml.fl.nn.utils import file_chunk_generate, print_file_size_in_dir, tensor_map_to_ndarray_map import tensorflow as tf @@ -35,9 +36,7 @@ def __init__(self, loss_fn, optimizer_cls, optimizer_args, - client_id, bigdl_type="float", - target="localhost:8980", fl_client=None, server_model=None): self.bigdl_type = bigdl_type @@ -46,7 +45,7 @@ def __init__(self, self.optimizer = optimizer_cls(**optimizer_args) self.version = 0 self.fl_client = fl_client if fl_client is not None \ - else FLClient(client_id=client_id, aggregator='tf', target=target) + else NNClient(aggregator='tf') self.loss_history = [] if server_model is not None: self.__add_server_model(server_model, loss_fn, optimizer_cls, optimizer_args) diff --git a/python/ppml/src/bigdl/ppml/fl/psi/psi.py b/python/ppml/src/bigdl/ppml/fl/psi/psi_client.py similarity index 89% rename from python/ppml/src/bigdl/ppml/fl/psi/psi.py rename to python/ppml/src/bigdl/ppml/fl/psi/psi_client.py index 946adb86bf5..02195fee306 100644 --- a/python/ppml/src/bigdl/ppml/fl/psi/psi.py +++ b/python/ppml/src/bigdl/ppml/fl/psi/psi_client.py @@ -15,6 +15,7 @@ # import logging +import time from bigdl.dllib.utils.log4Error import invalidOperationError from bigdl.ppml.fl.psi.utils import to_hex_string @@ -42,14 +43,17 @@ def download_intersection(self, max_try=100, retry=3): for i in range(max_try): intersection = self.stub.downloadIntersection( DownloadIntersectionRequest()).intersection - if intersection is not None: + if intersection is not None and len(intersection) != 0: hashed_intersection = list(intersection) logging.info(f"Intersection completed, size {len(intersection)}") intersection = [self.hashed_ids_to_ids[i] for i in hashed_intersection] return intersection + 
else: + logging.info(f"Got empty intersection, will retry in {retry} s... {i}/{max_try}") + time.sleep(retry) invalidOperationError(False, "Max retry reached, could not get intersection, exiting.") def get_intersection(self, ids, secure_code="", max_try=100, retry=3): - salt = self.stub.getSalt(SaltRequest(secure_code=secure_code)).salt_reply + salt = self.get_salt(secure_code) self.upload_set(ids, salt) return self.download_intersection(max_try, retry) diff --git a/python/ppml/src/bigdl/ppml/fl/psi/psi_intersection.py b/python/ppml/src/bigdl/ppml/fl/psi/psi_intersection.py index 064559840cb..0d85ccc98ed 100644 --- a/python/ppml/src/bigdl/ppml/fl/psi/psi_intersection.py +++ b/python/ppml/src/bigdl/ppml/fl/psi/psi_intersection.py @@ -40,12 +40,14 @@ def add_collection(self, collection): invalidOperationError(len(self.collection) < self.max_collection, f"PSI collection is full, got: {len(self.collection)}/{self.max_collection}") self.collection.append(collection) + logging.debug(f"PSI got collection {len(self.collection)}/{self.max_collection}") if len(self.collection) == self.max_collection: current_intersection = self.collection[0] for i in range(1, len(self.collection)): current_intersection = \ self.find_intersection(current_intersection, self.collection[i]) self.intersection = current_intersection + self.collection.clear() def get_intersection(self): with self._lock: diff --git a/python/ppml/test/bigdl/ppml/fl/nn/pytorch/test_logistic_regression.py b/python/ppml/test/bigdl/ppml/fl/nn/pytorch/test_logistic_regression.py index bd5461fc2f1..4f0a70b933d 100644 --- a/python/ppml/test/bigdl/ppml/fl/nn/pytorch/test_logistic_regression.py +++ b/python/ppml/test/bigdl/ppml/fl/nn/pytorch/test_logistic_regression.py @@ -14,6 +14,7 @@ # limitations under the License. 
# +import multiprocessing import threading from multiprocessing import Process import unittest @@ -22,7 +23,7 @@ import os import torch -from bigdl.ppml.fl.nn.fl_client import FLClient +from bigdl.ppml.fl.nn.fl_context import init_fl_context from bigdl.ppml.fl.nn.pytorch.utils import set_one_like_parameter from bigdl.ppml.fl.nn.fl_server import FLServer from torch import Tensor, nn @@ -33,10 +34,10 @@ from typing import List - resource_path = os.path.join(os.path.dirname(__file__), "../../resources") def mock_process(data_train, target, client_id, upload_server_model): + init_fl_context(client_id, target) # set new_fl_client to True will create a FLClient with new ID for multi-party test df_train = pd.read_csv(os.path.join(resource_path, data_train)) if 'Outcome' in df_train: @@ -54,11 +55,9 @@ def mock_process(data_train, target, client_id, upload_server_model): server_model = LogisticRegressionNetwork2() if upload_server_model else None logging.info("Creating FL Pytorch Estimator") ppl = Estimator.from_torch(client_model=model, - client_id=client_id, loss_fn=loss_fn, optimizer_cls=torch.optim.SGD, optimizer_args={'lr':1e-3}, - target=target, server_model=server_model) logging.info("Starting training") response = ppl.fit(x, y) @@ -69,7 +68,11 @@ def mock_process(data_train, target, client_id, upload_server_model): class TestLogisticRegression(FLTest): fmt = '%(asctime)s %(levelname)s {%(module)s:%(lineno)d} - %(message)s' - logging.basicConfig(format=fmt, level=logging.INFO) + logging.basicConfig(format=fmt, level=logging.DEBUG) + @classmethod + def setUpClass(cls) -> None: + multiprocessing.set_start_method('spawn') + def setUp(self) -> None: self.fl_server = FLServer(client_num=2) self.fl_server.set_port(self.port) @@ -117,7 +120,7 @@ def test_two_party_logistic_regression(self) -> None: print(f"loss: {loss:>7f} [{current:>3d}/{size:>3d}]") pytorch_loss_list.append(np.array(loss)) - mock_party2 = threading.Thread(target=mock_process, + mock_party2 = Process(target=mock_process, args=('diabetes-vfl-2.csv', self.target, '2', False)) mock_party2.start() ppl = mock_process(data_train='diabetes-vfl-1.csv', diff --git a/python/ppml/test/bigdl/ppml/fl/nn/pytorch/test_mnist.py b/python/ppml/test/bigdl/ppml/fl/nn/pytorch/test_mnist.py index 56f700af157..ec756075746 100644 --- a/python/ppml/test/bigdl/ppml/fl/nn/pytorch/test_mnist.py +++ b/python/ppml/test/bigdl/ppml/fl/nn/pytorch/test_mnist.py @@ -25,7 +25,7 @@ from bigdl.ppml.fl.nn.fl_server import FLServer from bigdl.ppml.fl.nn.fl_client import FLClient from bigdl.ppml.fl.nn.pytorch.utils import set_one_like_parameter -from bigdl.ppml.fl.utils import init_fl_context +from bigdl.ppml.fl.nn.fl_context import init_fl_context from bigdl.ppml.fl.estimator import Estimator from torch import Tensor, nn @@ -108,16 +108,15 @@ def train(dataloader, model, loss_fn, optimizer): train(train_dataloader, model, loss_fn, optimizer) + init_fl_context('1', self.target) vfl_model_1 = NeuralNetworkPart1() set_one_like_parameter(vfl_model_1) vfl_model_2 = NeuralNetworkPart2() set_one_like_parameter(vfl_model_2) vfl_client_ppl = Estimator.from_torch(client_model=vfl_model_1, - client_id="1", loss_fn=loss_fn, optimizer_cls=torch.optim.SGD, optimizer_args={'lr':1e-3}, - target=self.target, server_model=vfl_model_2) vfl_client_ppl.fit(train_dataloader) assert np.allclose(pytorch_loss_list, vfl_client_ppl.loss_history), \ diff --git a/python/ppml/test/bigdl/ppml/fl/nn/pytorch/test_protobuf_utils.py b/python/ppml/test/bigdl/ppml/fl/nn/pytorch/test_protobuf_utils.py 
deleted file mode 100644 index ce5d95ace61..00000000000 --- a/python/ppml/test/bigdl/ppml/fl/nn/pytorch/test_protobuf_utils.py +++ /dev/null @@ -1,71 +0,0 @@ -# -# Copyright 2016 The BigDL Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -from multiprocessing import Process -import unittest -import numpy as np -import pandas as pd -import os - -from bigdl.ppml.fl import * -from bigdl.ppml.fl.nn.fl_server import FLServer -from bigdl.ppml.fl.nn.fl_client import FLClient -import torch -from torch import nn -import torch.nn.functional as F - -from bigdl.ppml.fl.utils import FLTest - -resource_path = os.path.join(os.path.dirname(__file__), "../resources") - - -class TestProtobufUtils(FLTest): - fmt = '%(asctime)s %(levelname)s {%(module)s:%(lineno)d} - %(message)s' - logging.basicConfig(format=fmt, level=logging.DEBUG) - - def setUp(self) -> None: - self.fl_server = FLServer() - self.fl_server.set_port(self.port) - self.fl_server.build() - self.fl_server.start() - - def tearDown(self) -> None: - self.fl_server.stop() - - def test_upload_model(self) -> None: - cli = FLClient(client_id='1', aggregator='pt', target=self.target) - model = SimpleNNModel() - loss_fn = nn.BCELoss() - logging.debug('uploading model to server') - cli.upload_meta(loss_fn, torch.optim.SGD, {}) - - -class SimpleNNModel(nn.Module): - def __init__(self, input_features=8, hidden1=20, hidden2=10, out_features=2): - super().__init__() - self.fc1 = nn.Linear(input_features, hidden1) - self.fc2 = nn.Linear(hidden1, hidden2) - self.out = nn.Linear(hidden2, out_features) - - def forward(self,x): - x = F.relu(self.fc1(x)) - x = F.relu(self.fc2(x)) - x = self.out(x) - return x - -if __name__ == '__main__': - unittest.main() diff --git a/python/ppml/test/bigdl/ppml/fl/nn/pytorch/test_save_load.py b/python/ppml/test/bigdl/ppml/fl/nn/pytorch/test_save_load.py index 5411ce547a2..7d015971f3d 100644 --- a/python/ppml/test/bigdl/ppml/fl/nn/pytorch/test_save_load.py +++ b/python/ppml/test/bigdl/ppml/fl/nn/pytorch/test_save_load.py @@ -24,9 +24,7 @@ from bigdl.ppml.fl import * from bigdl.ppml.fl.nn.fl_server import FLServer -from bigdl.ppml.fl.nn.fl_client import FLClient -from bigdl.ppml.fl.nn.pytorch.utils import set_one_like_parameter -from bigdl.ppml.fl.utils import init_fl_context +from bigdl.ppml.fl.nn.fl_context import init_fl_context from bigdl.ppml.fl.estimator import Estimator from torch import Tensor, nn @@ -89,40 +87,16 @@ def test_mnist(self) -> None: print(f"Shape of y: {y.shape} {y.dtype}") break - model = NeuralNetwork() loss_fn = nn.CrossEntropyLoss() - optimizer = torch.optim.SGD(model.parameters(), lr=1e-3) # list for result validation - pytorch_loss_list = [] - def train(dataloader, model, loss_fn, optimizer): - size = len(dataloader.dataset) - model.train() - for batch, (X, y) in enumerate(dataloader): - # Compute prediction error - pred = model(X) - loss = loss_fn(pred, y) - - # Backpropagation - optimizer.zero_grad() - loss.backward() - optimizer.step() - - if batch % 100 == 0: - loss, current = loss.item(), 
batch * len(X) - pytorch_loss_list.append(np.array(loss)) - print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]") - - # for i in range(2): - # train(train_dataloader, model, loss_fn, optimizer) + init_fl_context('1', self.target) vfl_model_1 = NeuralNetworkPart1() vfl_model_2 = NeuralNetworkPart2() vfl_client_ppl = Estimator.from_torch(client_model=vfl_model_1, - client_id="1", loss_fn=loss_fn, optimizer_cls=torch.optim.SGD, optimizer_args={'lr':1e-3}, - target=self.target, server_model=vfl_model_2, server_model_path=TestSaveLoad.server_model_path, client_model_path=TestSaveLoad.client_model_path) @@ -131,11 +105,9 @@ def train(dataloader, model, loss_fn, optimizer): self.setUp() client_model_loaded = torch.load(TestSaveLoad.client_model_path) ppl_from_file = Estimator.from_torch(client_model=client_model_loaded, - client_id="1", - loss_fn=loss_fn, - optimizer_cls=torch.optim.SGD, - optimizer_args={'lr':1e-3}, - target=self.target) + loss_fn=loss_fn, + optimizer_cls=torch.optim.SGD, + optimizer_args={'lr':1e-3}) ppl_from_file.load_server_model(TestSaveLoad.server_model_path) ppl_from_file.fit(train_dataloader) diff --git a/python/ppml/test/bigdl/ppml/fl/nn/tensorflow/test_mnist.py b/python/ppml/test/bigdl/ppml/fl/nn/tensorflow/test_mnist.py index b336d51275c..fa54e8387dc 100644 --- a/python/ppml/test/bigdl/ppml/fl/nn/tensorflow/test_mnist.py +++ b/python/ppml/test/bigdl/ppml/fl/nn/tensorflow/test_mnist.py @@ -22,9 +22,8 @@ from bigdl.ppml.fl import * from bigdl.ppml.fl.estimator import Estimator from bigdl.ppml.fl.nn.fl_server import FLServer -from bigdl.ppml.fl.nn.fl_client import FLClient from bigdl.ppml.fl.nn.tensorflow.utils import set_one_like_parameter -from bigdl.ppml.fl.utils import init_fl_context +from bigdl.ppml.fl.nn.fl_context import init_fl_context from bigdl.ppml.fl.nn.tensorflow.estimator import TensorflowEstimator import tensorflow as tf @@ -132,21 +131,16 @@ def test_step(images, labels): ) # TODO: set fixed parameters + init_fl_context('1', self.target) vfl_model_1 = build_client_model() set_one_like_parameter(vfl_model_1) vfl_model_2 = build_server_model() set_one_like_parameter(vfl_model_2) vfl_client_ppl = Estimator.from_keras(client_model=vfl_model_1, - client_id='1', loss_fn=loss_object, optimizer_cls=tf.keras.optimizers.Adam, optimizer_args={}, - target=self.target, server_model=vfl_model_2) - # vfl_client_ppl = TensorflowEstimator(vfl_model_1, loss_object, optimizer) - - - # vfl_client_ppl.add_server_model(vfl_model_2, loss_object, tf.keras.optimizers.Adam) vfl_client_ppl.fit(train_ds) assert np.allclose(tensorflow_loss_history, vfl_client_ppl.loss_history), \ diff --git a/python/ppml/test/bigdl/ppml/fl/psi/test_psi.py b/python/ppml/test/bigdl/ppml/fl/psi/test_psi.py index 6ef5fdb59fb..817cc9aae60 100644 --- a/python/ppml/test/bigdl/ppml/fl/psi/test_psi.py +++ b/python/ppml/test/bigdl/ppml/fl/psi/test_psi.py @@ -17,7 +17,7 @@ import unittest -from bigdl.ppml.fl.psi.psi import PSI +from bigdl.ppml.fl.psi.psi_client import PSI from bigdl.ppml.fl.nn.fl_server import FLServer from bigdl.ppml.fl.nn.fl_context import init_fl_context from bigdl.ppml.fl.utils import FLTest
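
For quick reference, the client-side flow after this refactor (call `init_fl_context` first, run PSI, then build an `Estimator` without the removed `client_id`/`target` arguments) looks roughly like the sketch below, condensed from the tutorial and `pytorch_nn_lr_1.py` in this patch. The data path, column names, and the server model's forward pass are illustrative assumptions, not the exact example code.

```python
from typing import List

import numpy as np
import pandas as pd
import torch
from torch import Tensor, nn

from bigdl.ppml.fl.estimator import Estimator
from bigdl.ppml.fl.nn.fl_context import init_fl_context
from bigdl.ppml.fl.psi.psi_client import PSI


class LocalModel(nn.Module):
    """Bottom model owned by this party: one linear layer over its features."""
    def __init__(self, num_feature) -> None:
        super().__init__()
        self.dense = nn.Linear(num_feature, 1)

    def forward(self, x):
        return self.dense(x)


class ServerModel(nn.Module):
    """Top model uploaded to the FL server; here it simply sums the parties'
    partial outputs and applies a sigmoid (an illustrative assumption)."""
    def __init__(self) -> None:
        super().__init__()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x: List[Tensor]):
        return self.sigmoid(torch.stack(x).sum(dim=0))


# 1. Initialize the FL context once per party; client_id and target replace
#    the arguments that this patch removes from Estimator.from_torch.
init_fl_context('1', target='localhost:8980')

# 2. Align records across parties with PSI before training.
df_train = pd.read_csv('./data/diabetes-vfl-1.csv')  # illustrative path
df_train['ID'] = df_train['ID'].astype(str)
intersection = PSI().get_intersection(list(df_train['ID']))
df_train = df_train[df_train['ID'].isin(intersection)]

# 3. Split features and labels (column names are illustrative).
df_x = df_train.drop(columns=['ID', 'Outcome'])
x = df_x.to_numpy(dtype="float32")
y = np.expand_dims(df_train['Outcome'].to_numpy(dtype="float32"), axis=1)

# 4. Build the estimator; only the party that uploads the server model
#    passes server_model, the other party omits it.
ppl = Estimator.from_torch(client_model=LocalModel(len(df_x.columns)),
                           loss_fn=nn.BCELoss(),
                           optimizer_cls=torch.optim.SGD,
                           optimizer_args={'lr': 1e-4},
                           server_model=ServerModel(),
                           server_model_path='/tmp/pytorch_server_model',
                           client_model_path='/tmp/pytorch_client_model_1.pt')

# 5. Train against the other party (started in its own terminal), then predict.
ppl.fit(x, y, 5)
result = ppl.predict(x)
```

The second party runs the same flow with its own client id and feature columns and without `server_model`, as `pytorch_nn_lr_2.py` does above.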