Delta is a library for writing privacy-preserving computing tasks that run on the Delta framework, with PyTorch as its backend. It lets users focus on implementing computing models and logic, without worrying about data privacy (the Delta framework takes care of that).
## Installation

First, install PyTorch (Delta uses it as its backend). Then install Delta.

Using pip:

```
pip install -U delta-task
```

To install from source, clone this repo and run the following from the repo root:

```
pip install .
```
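After installation, you can verify that the package imports correctly by importing the base class used throughout this document:

```python
from delta.task import HorizontalTask
```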
Currently, Delta supports horizontal federated learning tasks. In Delta, you define a horizontal federated learning task by inheriting from the base class `delta.task.HorizontalTask`.
## `HorizontalTask`

`HorizontalTask` is an abstract class. In addition to the constructor `__init__`, you have to implement the following four methods:

- `train(self, dataloader: Iterable)`
- `get_params(self) -> List[torch.Tensor]`
- `validate(self, dataloader: Iterable) -> Dict[str, float]`
- `preprocess(self, x, y=None)`
### `__init__`

```python
def __init__(self, name: str, dataset: str, max_rounds: int, validate_interval: int = 1, validate_frac: float = 0.1):
```

When inheriting from `HorizontalTask`, you must call the base class constructor via `super().__init__`.
params:
param | type | description |
---|---|---|
name | string | the task name |
dataset | string | the dataset which the task uses |
max_rounds | int | max execution rounds of task |
validate_interval | int | validation every {validate_interval} rounds |
validate_frac | float | validation dataset percentage, should be in [0, 1) |
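As a minimal sketch, a subclass might start like this. The dataset name, model, loss function, and optimizer below are illustrative choices, not part of the API:

```python
import torch

from delta.task import HorizontalTask


class ExampleTask(HorizontalTask):
    def __init__(self):
        super().__init__(
            name="example",       # the task name
            dataset="mnist",      # assumed name of a dataset available on the nodes
            max_rounds=2,         # run at most 2 rounds
            validate_interval=1,  # validate every round
            validate_frac=0.1,    # hold out 10% of the data for validation
        )
        # the model, loss function and optimizer used by train/validate below
        self.model = torch.nn.Linear(28 * 28, 10)
        self.loss_func = torch.nn.CrossEntropyLoss()
        self.optimizer = torch.optim.SGD(self.model.parameters(), lr=0.1)
```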
### `train(self, dataloader: Iterable)`

You implement the model training logic in this method. A typical implementation looks like this:

```python
def train(self, dataloader: Iterable):
    # iterate over mini-batches of the training dataset
    for batch in dataloader:
        x, y = batch
        # forward pass
        y_pred = self.model(x)
        # loss calculation
        loss = self.loss_func(y_pred, y)
        # backward pass and parameter update
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
```
params:
param | type | description |
---|---|---|
dataloader | Iterable | the dataloader of training dataset |
### `get_params(self) -> List[torch.Tensor]`

Returns the trainable model parameters. The Delta framework securely aggregates these parameters from the different nodes to obtain the global model parameters.

A typical implementation looks like this:

```python
def get_params(self) -> List[torch.Tensor]:
    return list(self.model.parameters())
```
returns:
type | description |
---|---|
List[torch.Tensor] | parameters list |
### `validate(self, dataloader: Iterable) -> Dict[str, float]`

You implement the validation logic in this method. The validation result is a `Dict[str, float]` whose keys are the validation metric names and whose values are the corresponding metric values. The Delta framework securely aggregates the nodes' validation results to obtain the global validation result.

A typical implementation looks like this:
```python
def validate(self, dataloader: Iterable) -> Dict[str, float]:
    total_loss = 0
    count = 0
    ys = []
    y_s = []
    for batch in dataloader:
        x, y = batch
        y_pred = self.model(x)
        loss = self.loss_func(y_pred, y)
        total_loss += loss.item()
        count += 1
        # the predicted class is the index of the largest logit
        y_ = torch.argmax(y_pred, dim=1)
        y_s.extend(y_.tolist())
        ys.extend(y.tolist())
    avg_loss = total_loss / count
    # count the samples whose predicted class matches the true class
    tp = len([1 for i in range(len(ys)) if ys[i] == y_s[i]])
    precision = tp / len(ys)
    return {"loss": avg_loss, "precision": precision}
```
In the code above, the `validate` method computes the average loss and the precision on the validation dataset.
params:
param | type | description |
---|---|---|
dataloader | Iterable | the dataloader of validation dataset |
### `preprocess(self, x, y=None)`

The `preprocess` method preprocesses each data item before training and validation. The input parameter `x` is the data item, and `y` is the corresponding label if the dataset is labeled. The type of `x` depends on the dataset: it can be a `np.ndarray`, a `torch.Tensor`, a `pd.DataFrame`, or an `Image.Image`. In the `preprocess` method, you should convert `x` to a `torch.Tensor`. The type of `y` (if present) is a string containing the label name, and you should convert it to a `torch.Tensor` compatible with your loss function. The `preprocess` method should return two `torch.Tensor`s when both `x` and `y` exist, and a single `torch.Tensor` otherwise.
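As a sketch, assuming an image dataset whose samples arrive as `Image.Image` with numeric label names like `"0"`..`"9"` (an assumption made here for illustration):

```python
import numpy as np
import torch
from PIL import Image


def preprocess(self, x: Image.Image, y: str = None):
    # convert the PIL image to a float tensor scaled to [0, 1]
    x = torch.from_numpy(np.asarray(x, dtype=np.float32) / 255.0)
    if y is None:
        return x
    # labels are assumed to be numeric strings; convert them to
    # integer class indices compatible with CrossEntropyLoss
    return x, torch.tensor(int(y), dtype=torch.long)
```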
### `dataloader_config(self) -> Union[Dict[str, Any], Tuple[Dict[str, Any], Dict[str, Any]]]`

You can specify dataloader arguments in this method. It returns one or two dictionaries that are passed as kwargs to the dataloader constructor. If it returns a single dictionary, the same kwargs are used for both the training dataloader and the validation dataloader. If it returns two dictionaries, the first is used for the training dataloader and the second for the validation dataloader.

The default implementation is as follows:

```python
def dataloader_config(
    self,
) -> Union[Dict[str, Any], Tuple[Dict[str, Any], Dict[str, Any]]]:
    return {"shuffle": True, "batch_size": 64, "drop_last": True}
```
### `algorithm(self) -> HorizontalAlgorithm`

You specify the secure aggregation algorithm in this method. The algorithms available now are `delta.algorithm.horizontal.FedAvg` and `delta.algorithm.horizontal.FaultTolerantFedAvg`, both subclasses of `delta.algorithm.horizontal.HorizontalAlgorithm`. The difference between them is that `FaultTolerantFedAvg` can tolerate some nodes going offline during secure aggregation, while `FedAvg` cannot. `FedAvg` and `FaultTolerantFedAvg` take the same parameters.
params:
param | type | description |
---|---|---|
merge_interval_iter | int | aggregate every {merge_interval_iter} iter |
merge_interval_epoch | int | aggregate every {merge_interval_epoch} round. This param is mutually exclusive with merge_interval_iter, only one of them can be greater than 0 |
min_clients | int | minimal node count in a round |
max_clients | int | maximal node count in a round |
wait_timeout | Optional[float] | timeout in seconds for task computation in a round; default is 60 seconds |
connection_timeout | Optional[float] | timeout in seconds for each stage of the secure aggregation; default is 60 seconds |
precision | int | precision of the result; the aggregated result is rounded to this precision |
curve | CURVE_TYPE | elliptic curve used for asymmetric encryption in the secure aggregation, default value is "secp256k1" |
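A minimal sketch of this method might look as follows; only the parameter names come from the table above, the values are illustrative:

```python
from delta.algorithm.horizontal import FedAvg, HorizontalAlgorithm


def algorithm(self) -> HorizontalAlgorithm:
    return FedAvg(
        merge_interval_epoch=1,  # aggregate after every round
        merge_interval_iter=0,   # mutually exclusive with merge_interval_epoch
        min_clients=2,           # need at least 2 nodes in a round
        max_clients=10,          # accept at most 10 nodes in a round
        wait_timeout=60,         # seconds to wait for computation in a round
        connection_timeout=60,   # seconds for each secure aggregation stage
    )
```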
## Dataset

The Delta framework currently supports four kinds of dataset formats:

- Numpy array, with file extension `.npy`, `.npz`
- Torch tensor, with file extension `.pt`
- Table file, with file extension `.csv`, `.tsv`, `.xls`, `.xlsx`. Table files are loaded by `pandas`
- Image file. Image files are loaded by `PIL`
A dataset can be a single file or a directory. When creating a task, you refer to the dataset by its name.

When the dataset is a single file, the first dimension of the data indexes the data samples, and the data samples have no labels.

When the dataset is a directory, there are two acceptable directory structures. In the first, data files are placed directly in the directory; each file represents one data sample, and the data samples have no labels. In the second, the directory contains several subdirectories, each representing one class of data; the subdirectory name is the class name, and each data file is placed in the subdirectory corresponding to its class.
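For example, a labeled image dataset named `animals` (all names here are hypothetical) could be laid out as follows, with `cat` and `dog` becoming the class names:

```
animals/
├── cat/
│   ├── 001.png
│   └── 002.png
└── dog/
    ├── 001.png
    └── 002.png
```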