diff --git a/delta/core/task.py b/delta/core/task.py
index 1d34a96..d274b93 100644
--- a/delta/core/task.py
+++ b/delta/core/task.py
@@ -3,10 +3,11 @@
 import abc
 from dataclasses import dataclass, field
 from enum import Enum
-from typing import Any, Dict, List, Tuple, Union
+from typing import Any, Dict, List, Tuple, Union, Set
 
 import networkx as nx
 import numpy as np
+import numpy.typing as npt
 import pandas
 
 from .strategy import Strategy
@@ -35,9 +36,14 @@
     "TaskConstructer",
     "AggValueType",
     "AggResultType",
+    "EarlyStop",
 ]
 
 
+class EarlyStop(Exception):
+    pass
+
+
 class DataLocation(str, Enum):
     CLIENT = "CLIENT"
     SERVER = "SERVER"
@@ -216,7 +222,7 @@ def type(self) -> OpType:
         return OpType.REDUCE
 
 
-AggValueType = Union[np.ndarray, pandas.DataFrame, pandas.Series]
+AggValueType = Union[npt.NDArray, pandas.DataFrame, pandas.Series]
 AggResultType = Dict[str, AggValueType]
@@ -463,18 +469,18 @@ class TaskConstructer(object):
 
 def build(constructor: TaskConstructer) -> Task:
     graph = nx.DiGraph()
     # bfs build graph
-    queue: List[GraphNode | Operator] = list(constructor.outputs)
+    queue: Set[GraphNode | Operator] = set(constructor.outputs)
     while len(queue):
-        next_queue = []
+        next_queue = set()
         for node in queue:
             if isinstance(node, GraphNode):
                 if node.src:
                     graph.add_edge(node.src, node)
-                    next_queue.append(node.src)
+                    next_queue.add(node.src)
             elif isinstance(node, Operator):
                 for src in node.inputs:
                     graph.add_edge(src, node)
-                    next_queue.append(src)
+                    next_queue.add(src)
         queue = next_queue
 
     # name variables
diff --git a/delta/debug.py b/delta/debug.py
index 404e155..8d509c2 100644
--- a/delta/debug.py
+++ b/delta/debug.py
@@ -4,7 +4,7 @@
 from .dataset import load_dataframe, load_dataset
 from .core.task import (ClientContext, DataFormat, DataLocation, DataNode,
-                        InputGraphNode, ServerContext, Task)
+                        InputGraphNode, ServerContext, Task, EarlyStop)
 
 
 class Aggregator(object):
@@ -42,7 +42,7 @@ def get(self, *vars: DataNode) -> List[Any]:
             value = self._server_ctx.get(var)[0]
 
             if value is None:
-                raise ValueError(f"Cannot get var {var.name}")
+                raise KeyError(f"Cannot get var {var.name}")
 
             res.append(value)
@@ -83,7 +83,7 @@ def get(self, *vars: DataNode) -> List[Any]:
             self._state[var.name] = value
 
             if value is None:
-                raise ValueError(f"Cannot get var {var.name}")
+                raise KeyError(f"Cannot get var {var.name}")
 
             res.append(value)
@@ -117,7 +117,10 @@ def debug(task: Task, **data: Any):
     for step in task.steps:
         step.map(client_context)
-        step.reduce(server_context)
+        try:
+            step.reduce(server_context)
+        except EarlyStop:
+            break
 
     res = tuple(server_context.get(*task.outputs))
     if len(res) == 1:
diff --git a/delta/statsmodel/__init__.py b/delta/statsmodel/__init__.py
new file mode 100644
index 0000000..de8d7fa
--- /dev/null
+++ b/delta/statsmodel/__init__.py
@@ -0,0 +1,5 @@
+from .logit import LogitTask
+from .mnlogit import MNLogitTask
+
+
+__all__ = ["LogitTask", "MNLogitTask"]
diff --git a/delta/statsmodel/logit.py b/delta/statsmodel/logit.py
new file mode 100644
index 0000000..c50f671
--- /dev/null
+++ b/delta/statsmodel/logit.py
@@ -0,0 +1,238 @@
+from __future__ import annotations
+
+from abc import abstractmethod
+from typing import Any, Callable, Dict, List, Tuple
+
+import numpy as np
+import numpy.typing as npt
+from delta.core.strategy import CURVE_TYPE, AnalyticsStrategy
+from delta.core.task import (
+    DataLocation,
+    DataNode,
+    GraphNode,
+    InputGraphNode,
+    MapOperator,
+    MapReduceOperator,
+)
+from delta.task import HorizontalTask
+
+from .optimizer import fit
+
+__all__ = ["LogitTask"]
+
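+# Binary logit model helpers. Labels y in {0, 1} are mapped to {-1, +1} by
+# (2 * y - 1), so each observation contributes logsigmoid((2y - 1) * (x @ params))
+# to the log-likelihood. score and hessian are the standard logistic-regression
+# gradient and Hessian, which the Newton optimizer aggregates across clients.
+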
+FloatArray = npt.NDArray[np.float_]
+IntArray = npt.NDArray[np.int_]
+
+
+def sigmoid(x: FloatArray) -> FloatArray:
+    return 1 / (1 + np.exp(-x))
+
+
+def logsigmoid(x: FloatArray) -> FloatArray:
+    # log(sigmoid(x)) = x - log(1 + exp(x))
+    return x - np.log(1 + np.exp(x))
+
+
+def check_params(params: FloatArray, x: FloatArray):
+    assert params.ndim == 1
+    assert params.size == x.shape[1]
+
+
+def loglike(params: FloatArray, y: IntArray, x: FloatArray):
+    p = (2 * y - 1) * np.dot(x, params)
+    return np.sum(logsigmoid(p))
+
+
+def loglikeobs(params: FloatArray, y: IntArray, x: FloatArray):
+    p = (2 * y - 1) * np.dot(x, params)
+    return logsigmoid(p)
+
+
+def score(params: FloatArray, y: IntArray, x: FloatArray):
+    p = np.dot(x, params)
+    return np.dot(x.T, y - sigmoid(p))
+
+
+def score_obs(params: FloatArray, y: IntArray, x: FloatArray):
+    p = np.dot(x, params)
+    # broadcast the per-observation residual (shape [m]) over the feature axis
+    return (y - sigmoid(p))[:, np.newaxis] * x
+
+
+def hessian(params: FloatArray, y: IntArray, x: FloatArray):
+    p = np.dot(x, params)
+    l = sigmoid(p)
+    ll = l * (1 - l)
+    return -np.dot(x.T * ll, x)
+
+
+class LogitTask(HorizontalTask):
+    def __init__(
+        self,
+        name: str,
+        min_clients: int = 2,
+        max_clients: int = 2,
+        wait_timeout: float = 60,
+        connection_timeout: float = 60,
+        precision: int = 8,
+        curve: CURVE_TYPE = "secp256k1",
+    ) -> None:
+        strategy = AnalyticsStrategy(
+            min_clients=min_clients,
+            max_clients=max_clients,
+            wait_timeout=wait_timeout,
+            connection_timeout=connection_timeout,
+            precision=precision,
+            curve=curve,
+        )
+        super().__init__(name, strategy)
+
+    @abstractmethod
+    def preprocess(self, **inputs: Any) -> Tuple[Any, Any]:
+        ...
+
+    @abstractmethod
+    def dataset(self) -> Dict[str, InputGraphNode]:
+        ...
+
+    def options(self) -> Dict[str, Any]:
+        return {
+            "maxiter": 35,
+            "method": "newton",
+            "ord": np.inf,
+            "tol": 1e-8,
+            "ridge_factor": 1e-10,
+        }
+
+    def _fit(
+        self,
+        x_node: GraphNode,
+        y_node: GraphNode,
+        params_node: GraphNode,
+        method: str = "newton",
+        maxiter: int = 35,
+        **kwargs,
+    ):
+        def f(params, y, x):
+            return -loglike(params, y, x)
+
+        def g(params, y, x):
+            return -score(params, y, x)
+
+        def h(params, y, x):
+            return -hessian(params, y, x)
+
+        opt_params, f_opt, iteration = fit(
+            f,
+            g,
+            x_node,
+            y_node,
+            params_node,
+            method,
+            hessian=h,
+            maxiter=maxiter,
+            **kwargs,
+        )
+        return opt_params, f_opt, iteration
+
+    def _build_graph(self) -> Tuple[List[InputGraphNode], List[GraphNode]]:
+        inputs = self.dataset()
+        input_nodes: List[InputGraphNode] = list(inputs.values())
+        for name, node in inputs.items():
+            node.name = name
+
+        class Preprocess(MapOperator):
+            def __init__(
+                self,
+                name: str,
+                inputs: List[DataNode],
+                outputs: List[DataNode],
+                preprocess: Callable,
+                names: List[str],
+            ) -> None:
+                super().__init__(
+                    name, inputs, outputs, preprocess=preprocess, names=names
+                )
+                self.preprocess = preprocess
+                self.names = names
+
+            def map(self, *args) -> Tuple[FloatArray, IntArray]:
+                kwargs = dict(zip(self.names, args))
+                x, y = self.preprocess(**kwargs)
+                x = np.asarray(x, dtype=np.float64)
+                assert x.ndim == 2, "x must be a 2-D array"
+                y = np.asarray(y, dtype=np.int8).squeeze()
+                assert x.shape[0] == y.shape[0], "x and y must have the same number of rows"
+                return x, y
+
+        x_node = GraphNode(
+            name="x",
+            location=DataLocation.CLIENT,
+        )
+        y_node = GraphNode(
+            name="y",
+            location=DataLocation.CLIENT,
+        )
+
+        preprocess_op = Preprocess(
+            name="preprocess",
+            inputs=list(inputs.values()),
+            outputs=[x_node, y_node],
+            preprocess=self.preprocess,
+            names=list(inputs.keys()),
+        )
+        x_node.src = preprocess_op
+        y_node.src = preprocess_op
+
+        options = self.options()
+        method = options.pop("method", "newton")
+        maxiter = options.pop("maxiter", 35)
+        start_params = options.pop("start_params", None)
+
+        if start_params is None:
+            params_node = GraphNode(name="params", location=DataLocation.SERVER)
+
+            class ParamsInitOp(MapReduceOperator):
+                def map(self, x: FloatArray, y: IntArray):
+                    params = np.zeros((x.shape[1],), dtype=np.float64)
+                    return {"params": params}
+
+                def reduce(self, data, node_count: int):
+                    params = data["params"]
+                    return params
+
+            params_init_op = ParamsInitOp(
+                name="params_init",
+                map_inputs=[x_node, y_node],
+                reduce_inputs=[],
+                map_outputs=[],
+                reduce_outputs=[params_node],
+            )
+            params_node.src = params_init_op
+        else:
+            origin_params_node = InputGraphNode(
+                name="params", location=DataLocation.SERVER, default=start_params
+            )
+            params_node = GraphNode(name="params", location=DataLocation.SERVER)
+
+            class ParamsCheckOp(MapReduceOperator):
+                def map(self, params: FloatArray, x: FloatArray, y: IntArray):
+                    # check_params validates via asserts and returns None,
+                    # so it must be called, not asserted
+                    check_params(params, x)
+                    return {"params": params}
+
+                def reduce(self, data, node_count: int):
+                    # map outputs are summed across clients; divide by
+                    # node_count to recover the original start params
+                    params = data["params"] / node_count
+                    return params
+
+            params_check_op = ParamsCheckOp(
+                name="params_check",
+                map_inputs=[origin_params_node, x_node, y_node],
+                reduce_inputs=[],
+                map_outputs=[],
+                reduce_outputs=[params_node],
+            )
+            params_node.src = params_check_op
+
+            input_nodes.append(origin_params_node)
+
+        outputs = self._fit(x_node, y_node, params_node, method, maxiter, **options)
+
+        return input_nodes, list(outputs)
diff --git a/delta/statsmodel/mnlogit.py b/delta/statsmodel/mnlogit.py
new file mode 100644
index 0000000..ca7e822
--- /dev/null
+++ b/delta/statsmodel/mnlogit.py
@@ -0,0 +1,340 @@
+from abc import abstractmethod
+from typing import Any, Tuple, Dict, List, Callable
+
+import numpy as np
+import numpy.typing as npt
+from delta.task import HorizontalTask
+from delta.core.strategy import AnalyticsStrategy, CURVE_TYPE
+from delta.core.task import (
+    InputGraphNode,
+    GraphNode,
+    DataLocation,
+    DataNode,
+    MapOperator,
+    MapReduceOperator,
+)
+
+from .optimizer import fit
+
+__all__ = ["MNLogitTask"]
+
+FloatArray = npt.NDArray[np.float_]
+IntArray = npt.NDArray[np.int_]
+
+
+def softmax(x: FloatArray):
+    x_ = x - np.max(x, axis=1, keepdims=True)
+    return np.exp(x_) / np.sum(np.exp(x_), axis=1, keepdims=True)
+
+
+def logsoftmax(x: FloatArray):
+    x_ = x - np.max(x, axis=1, keepdims=True)
+    return x_ - np.log(np.sum(np.exp(x_), axis=1, keepdims=True))
+
+
+def convert_params(params: FloatArray, y: IntArray, x: FloatArray) -> FloatArray:
+    if params.shape[0] != x.shape[1]:
+        params = params.reshape(x.shape[1], -1, order="F")
+    assert params.shape[1] == y.shape[1] - 1
+    return params
+
+
+def loglike(params: FloatArray, y: IntArray, x: FloatArray):
+    params = convert_params(params, y, x)
+    p = np.column_stack((np.zeros(x.shape[0]), np.dot(x, params)))
+    return np.sum(y * logsoftmax(p))
+
+
+def loglikeobs(params: FloatArray, y: IntArray, x: FloatArray):
+    params = convert_params(params, y, x)
+    p = np.column_stack((np.zeros(x.shape[0]), np.dot(x, params)))
+    return y * logsoftmax(p)
+
+
+def score(params: FloatArray, y: IntArray, x: FloatArray):
+    params = convert_params(params, y, x)
+    p = np.column_stack((np.zeros(x.shape[0]), np.dot(x, params)))
+    return np.dot(x.T, y - softmax(p))[:, 1:].ravel(order="F")
+
+
+def score_obs(params: FloatArray, y: IntArray, x: FloatArray):
+    params = convert_params(params, y, x)
+    p = np.column_stack((np.zeros(x.shape[0]), np.dot(x, params)))
+    d = (y - softmax(p))[:, 1:]  # shape [m, j - 1]
+    return (d[:, :, np.newaxis] * x[:, np.newaxis, :]).reshape(x.shape[0], -1)
+
+
+def hessian(params: FloatArray, y: IntArray, x: FloatArray):
+    params = convert_params(params, y, x)
+    p = np.column_stack((np.zeros(x.shape[0]), np.dot(x, params)))
+    a = softmax(p)
+
+    J = y.shape[1]
+    K = x.shape[1]
+    C = (J - 1) * K
+    partials = []
+    for i in range(J - 1):
+        for j in range(J - 1):
+            if i == j:
+                partials.append(
+                    np.dot(
+                        ((a[:, i + 1] * (a[:, j + 1] - 1))[:, np.newaxis] * x).T,
+                        x,
+                    )
+                )
+            else:
+                partials.append(
+                    np.dot(
+                        ((a[:, i + 1] * a[:, j + 1])[:, np.newaxis] * x).T,
+                        x,
+                    )
+                )
+
+    res = np.array(partials)
+    res = np.transpose(res.reshape(J - 1, J - 1, K, K), (0, 2, 1, 3)).reshape(C, C)
+    return res
+
+
+class MNLogitTask(HorizontalTask):
+    def __init__(
+        self,
+        name: str,
+        min_clients: int = 2,
+        max_clients: int = 2,
+        wait_timeout: float = 60,
+        connection_timeout: float = 60,
+        precision: int = 8,
+        curve: CURVE_TYPE = "secp256k1",
+        max_n_classes: int = 100,
+    ) -> None:
+        strategy = AnalyticsStrategy(
+            min_clients=min_clients,
+            max_clients=max_clients,
+            wait_timeout=wait_timeout,
+            connection_timeout=connection_timeout,
+            precision=precision,
+            curve=curve,
+        )
+        super().__init__(name, strategy)
+        self.max_n_classes = max_n_classes
+
+    @abstractmethod
+    def preprocess(self, **inputs: Any) -> Tuple[Any, Any]:
+        ...
+
+    @abstractmethod
+    def dataset(self) -> Dict[str, InputGraphNode]:
+        ...
+
+    def options(self) -> Dict[str, Any]:
+        return {
+            "maxiter": 35,
+            "method": "newton",
+            "ord": np.inf,
+            "tol": 1e-8,
+            "ridge_factor": 1e-10,
+        }
+
+    def _fit(
+        self,
+        x_node: GraphNode,
+        y_node: GraphNode,
+        params_node: GraphNode,
+        method: str = "newton",
+        maxiter: int = 35,
+        **kwargs,
+    ):
+        def f(params, y, x):
+            return -loglike(params, y, x)
+
+        def g(params, y, x):
+            return -score(params, y, x)
+
+        def h(params, y, x):
+            return -hessian(params, y, x)
+
+        opt_params, f_opt, iteration = fit(
+            f,
+            g,
+            x_node,
+            y_node,
+            params_node,
+            method,
+            hessian=h,
+            maxiter=maxiter,
+            order="F",
+            **kwargs,
+        )
+        return opt_params, f_opt, iteration
+
+    def _build_graph(self) -> Tuple[List[InputGraphNode], List[GraphNode]]:
+        inputs = self.dataset()
+        input_nodes: List[InputGraphNode] = list(inputs.values())
+        for name, node in inputs.items():
+            node.name = name
+
+        class Preprocess(MapOperator):
+            def __init__(
+                self,
+                name: str,
+                inputs: List[DataNode],
+                outputs: List[DataNode],
+                preprocess: Callable,
+                names: List[str],
+            ) -> None:
+                super().__init__(
+                    name, inputs, outputs, preprocess=preprocess, names=names
+                )
+                self.preprocess = preprocess
+                self.names = names
+
+            def map(self, *args) -> Tuple[FloatArray, IntArray]:
+                kwargs = dict(zip(self.names, args))
+                x, y = self.preprocess(**kwargs)
+                x = np.asarray(x, dtype=np.float64)
+                assert x.ndim == 2, "x must be a 2-D array"
+                y = np.asarray(y, dtype=np.int32)
+                assert x.shape[0] == y.shape[0], "x and y must have the same number of rows"
+                assert y.ndim == 1, "y must be a 1-D array"
+                return x, y
+
+        x_node = GraphNode(
+            name="x",
+            location=DataLocation.CLIENT,
+        )
+        origin_y_node = GraphNode(
+            name="origin_y",
+            location=DataLocation.CLIENT,
+        )
+
+        preprocess_op = Preprocess(
+            name="preprocess",
+            inputs=list(inputs.values()),
+            outputs=[x_node, origin_y_node],
+            preprocess=self.preprocess,
+            names=list(inputs.keys()),
+        )
+        x_node.src = preprocess_op
+        origin_y_node.src = preprocess_op
+
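+        # Discover the number of classes jointly: each client marks the labels it
+        # has seen in a fixed-size indicator vector, the indicators are aggregated
+        # on the server, and the highest marked label determines n_classes for the
+        # one-hot encoding below.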
+        n_classes_node = GraphNode(
+            name="n_classes",
+            location=DataLocation.SERVER,
+        )
+
+        class NClassesOp(MapReduceOperator):
+            def __init__(
+                self,
+                name: str,
+                map_inputs: List[DataNode],
+                reduce_inputs: List[DataNode],
+                map_outputs: List[DataNode],
+                reduce_outputs: List[DataNode],
+                max_n_classes: int,
+            ) -> None:
+                super().__init__(
+                    name,
+                    map_inputs,
+                    reduce_inputs,
+                    map_outputs,
+                    reduce_outputs,
+                    max_n_classes=max_n_classes,
+                )
+                self.max_n_classes = max_n_classes
+
+            def map(self, y: IntArray) -> Dict[str, IntArray]:
+                bins = np.zeros((self.max_n_classes,), dtype=np.int8)
+                for label in y:
+                    bins[label] = 1
+                return {"bins": bins}
+
+            def reduce(self, data: Dict[str, IntArray], node_count: int):
+                bins = data["bins"]
+                assert bins[0] > 0, "y should start from 0"
+                max_label = 0
+                for label in range(len(bins) - 1, -1, -1):
+                    if bins[label] > 0:
+                        max_label = label
+                        break
+                return max_label + 1
+
+        n_classes_op = NClassesOp(
+            name="n_classes_op",
+            map_inputs=[origin_y_node],
+            reduce_inputs=[],
+            map_outputs=[],
+            reduce_outputs=[n_classes_node],
+            max_n_classes=self.max_n_classes,
+        )
+        n_classes_node.src = n_classes_op
+
+        y_node = GraphNode(name="y", location=DataLocation.CLIENT)
+
+        class OnehotOp(MapOperator):
+            def map(self, y: IntArray, n_classes: int) -> IntArray:
+                m = y.shape[0]
+                res = np.zeros((m, n_classes), dtype=np.int8)
+                res[np.arange(m), y] = 1
+                return res
+
+        one_hot_op = OnehotOp(
+            name="one_hot",
+            inputs=[origin_y_node, n_classes_node],
+            outputs=[y_node],
+        )
+        y_node.src = one_hot_op
+
+        options = self.options()
+        method = options.pop("method", "newton")
+        maxiter = options.pop("maxiter", 35)
+        start_params = options.pop("start_params", None)
+
+        if start_params is None:
+            params_node = GraphNode(name="params", location=DataLocation.SERVER)
+
+            class ParamsInitOp(MapReduceOperator):
+                def map(self, x: FloatArray, y: IntArray):
+                    params = np.zeros((x.shape[1], y.shape[1] - 1), dtype=np.float64)
+                    return {"params": params}
+
+                def reduce(self, data, node_count: int):
+                    params = data["params"]
+                    return params
+
+            params_init_op = ParamsInitOp(
+                name="params_init",
+                map_inputs=[x_node, y_node],
+                reduce_inputs=[],
+                map_outputs=[],
+                reduce_outputs=[params_node],
+            )
+            params_node.src = params_init_op
+        else:
+            origin_params_node = InputGraphNode(
+                name="params", location=DataLocation.SERVER, default=start_params
+            )
+            params_node = GraphNode(name="params", location=DataLocation.SERVER)
+
+            class ParamsConvertOp(MapReduceOperator):
+                def map(self, params: FloatArray, x: FloatArray, y: IntArray):
+                    params = convert_params(params, y, x)
+                    return {"params": params}
+
+                def reduce(self, data, node_count: int):
+                    # map outputs are summed across clients; divide by
+                    # node_count to recover the original start params
+                    params = data["params"] / node_count
+                    return params
+
+            params_convert_op = ParamsConvertOp(
+                name="params_convert",
+                map_inputs=[origin_params_node, x_node, y_node],
+                reduce_inputs=[],
+                map_outputs=[],
+                reduce_outputs=[params_node],
+            )
+            params_node.src = params_convert_op
+
+            input_nodes.append(origin_params_node)
+
+        outputs = self._fit(x_node, y_node, params_node, method, maxiter, **options)
+
+        return input_nodes, list(outputs)
diff --git a/delta/statsmodel/optimizer.py b/delta/statsmodel/optimizer.py
new file mode 100644
index 0000000..d54cb6f
--- /dev/null
+++ b/delta/statsmodel/optimizer.py
@@ -0,0 +1,216 @@
+from __future__ import annotations
+
+from typing import Callable, List, Dict, Tuple, Literal, Optional
+
+import numpy as np
+import numpy.typing as npt
+
+from delta.core.task import (
+    GraphNode,
+    DataNode,
+    MapReduceOperator,
+    DataLocation,
+    EarlyStop,
+    InputGraphNode,
+)
+
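+# Distributed Newton optimizer: in map(), each client computes the summed
+# objective, gradient (score), and Hessian on its local data; in reduce(), the
+# server turns the aggregated sums into per-observation averages and takes one
+# Newton step.
+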
+FloatArray = npt.NDArray[np.float_]
+IntArray = npt.NDArray[np.int_]
+
+OrderACF = Optional[Literal["A", "C", "F"]]
+
+ModelFunc = Callable[[FloatArray, FloatArray, FloatArray], FloatArray]
+
+
+def norm(v: FloatArray, ord: float = 2):
+    # vector norm of the gradient, used for the convergence check
+    if ord == np.inf:
+        return np.max(np.abs(v))
+    elif ord == -np.inf:
+        return np.min(np.abs(v))
+    else:
+        return np.sum(np.abs(v) ** ord, axis=0) ** (1 / ord)
+
+
+def newton_step(
+    f: ModelFunc,
+    score: ModelFunc,
+    hessian: ModelFunc,
+    x_node: GraphNode,
+    y_node: GraphNode,
+    params_node: GraphNode,
+    iteration_node: GraphNode,
+    order: OrderACF = None,
+    ord: float = np.inf,
+    tol: float = 1e-8,
+    ridge_factor: float = 1e-10,
+):
+    new_f_val_node = GraphNode(name="f_val", location=DataLocation.SERVER)
+    new_params_node = GraphNode(name="params", location=DataLocation.SERVER)
+    new_iteration_node = GraphNode(name="iteration", location=DataLocation.SERVER)
+
+    class NewtonStep(MapReduceOperator):
+        def __init__(
+            self,
+            name: str,
+            map_inputs: List[DataNode],
+            reduce_inputs: List[DataNode],
+            map_outputs: List[DataNode],
+            reduce_outputs: List[DataNode],
+            f: ModelFunc,
+            score: ModelFunc,
+            hessian: ModelFunc,
+            order: OrderACF = None,
+            ord: float = np.inf,
+            tol: float = 1e-8,
+            ridge_factor: float = 1e-10,
+        ) -> None:
+            super().__init__(
+                name,
+                map_inputs,
+                reduce_inputs,
+                map_outputs,
+                reduce_outputs,
+                f=f,
+                score=score,
+                hessian=hessian,
+                order=order,
+                ord=ord,
+                tol=tol,
+                ridge_factor=ridge_factor,
+            )
+            self.f = f
+            self.score = score
+            self.hessian = hessian
+            self.order: OrderACF = order
+            self.ord = ord
+            self.tol = tol
+            self.ridge_factor = ridge_factor
+
+        def map(
+            self,
+            x: FloatArray,
+            y: FloatArray,
+            params: FloatArray,
+        ) -> Dict[str, FloatArray]:
+            obs = x.shape[0]
+            F = self.f(params, y, x)
+            G = self.score(params, y, x)
+            H = self.hessian(params, y, x)
+            return {
+                "f": F,
+                "score": G,
+                "hessian": H,
+                "n": np.asarray(obs),
+            }
+
+        def reduce(
+            self,
+            data: Dict[str, FloatArray],
+            node_count: int,
+            params: FloatArray,
+            iteration: int,
+        ):
+            # data holds the sums over all clients; divide by the total
+            # observation count n to get per-observation averages
+            n = data["n"]
+            f = data["f"] / n
+            score = data["score"] / n
+            hessian = data["hessian"] / n
+
+            if norm(score, ord=self.ord) < self.tol:
+                raise EarlyStop
+
+            if not np.all(self.ridge_factor == 0):
+                # regularize an ill-conditioned Hessian along its diagonal
+                hessian[np.diag_indices_from(hessian)] += self.ridge_factor
+
+            d: FloatArray = np.linalg.solve(hessian, score)
+            d = d.reshape(params.shape, order=self.order)
+            params = params - d
+            return params, f, iteration + 1
+
+    step_op = NewtonStep(
+        name="fit_newton_step",
+        map_inputs=[x_node, y_node, params_node],
+        reduce_inputs=[params_node, iteration_node],
+        map_outputs=[],
+        reduce_outputs=[new_params_node, new_f_val_node, new_iteration_node],
+        f=f,
+        score=score,
+        hessian=hessian,
+        ridge_factor=ridge_factor,
+        order=order,
+        ord=ord,
+        tol=tol,
+    )
+    new_params_node.src = step_op
+    new_f_val_node.src = step_op
+    new_iteration_node.src = step_op
+    return new_params_node, new_f_val_node, new_iteration_node
+
+
+def fit_newton(
+    f: ModelFunc,
+    score: ModelFunc,
+    hessian: ModelFunc,
+    x_node: GraphNode,
+    y_node: GraphNode,
+    start_params_node: GraphNode,
+    maxiter: int = 100,
+    order: OrderACF = None,
+    ord: float = np.inf,
+    tol: float = 1e-8,
+    ridge_factor: float = 1e-10,
+) -> Tuple[GraphNode, GraphNode, GraphNode]:
+    params_node = start_params_node
+    f_val_node = InputGraphNode(
+        name="f_val", location=DataLocation.SERVER, default=np.inf
+    )
+    iteration_node = InputGraphNode(
+        name="iteration", location=DataLocation.SERVER, default=0
+    )
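+    # The Newton iterations are unrolled statically: each newton_step() call
+    # appends one map/reduce step to the task graph, and the EarlyStop raised
+    # inside reduce() skips the remaining steps once the gradient norm is
+    # below tol.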
+    for _ in range(maxiter):
+        params_node, f_val_node, iteration_node = newton_step(
+            f,
+            score,
+            hessian,
+            x_node,
+            y_node,
+            params_node,
+            iteration_node,
+            order,
+            ord,
+            tol,
+            ridge_factor,
+        )
+    return params_node, f_val_node, iteration_node
+
+
+def fit(
+    f: ModelFunc,
+    score: ModelFunc,
+    x_node: GraphNode,
+    y_node: GraphNode,
+    start_params_node: GraphNode,
+    method: str = "newton",
+    hessian: ModelFunc | None = None,
+    maxiter: int = 100,
+    order: OrderACF = None,
+    ord: float = np.inf,
+    tol: float = 1e-8,
+    ridge_factor: float = 1e-10,
+) -> Tuple[GraphNode, GraphNode, GraphNode]:
+    if method == "newton":
+        assert hessian is not None
+        return fit_newton(
+            f,
+            score,
+            hessian,
+            x_node,
+            y_node,
+            start_params_node,
+            maxiter,
+            order,
+            ord,
+            tol,
+            ridge_factor,
+        )
+    else:
+        raise ValueError(f"unknown method {method}")
diff --git a/delta/task/__init__.py b/delta/task/__init__.py
index 9aa0303..c54bd92 100644
--- a/delta/task/__init__.py
+++ b/delta/task/__init__.py
@@ -1,5 +1,11 @@
 from .analytics import HorizontalAnalytics
 from .learning import HorizontalLearning, FaultTolerantFedAvg, FedAvg
+from .task import HorizontalTask
 
-
-__all__ = ["HorizontalAnalytics", "HorizontalLearning", "FaultTolerantFedAvg", "FedAvg"]
\ No newline at end of file
+__all__ = [
+    "HorizontalTask",
+    "HorizontalAnalytics",
+    "HorizontalLearning",
+    "FaultTolerantFedAvg",
+    "FedAvg",
+]
diff --git a/logit_example.py b/logit_example.py
new file mode 100644
index 0000000..adb6ad6
--- /dev/null
+++ b/logit_example.py
@@ -0,0 +1,68 @@
+from typing import Any, Tuple
+
+import pandas
+import delta.dataset
+from delta import DeltaNode
+from delta.statsmodel import LogitTask, MNLogitTask
+
+
+class SpectorLogitTask(LogitTask):
+    def __init__(
+        self,
+    ) -> None:
+        super().__init__(
+            name="spector_logit",  # The task name, used for display purposes.
+            min_clients=2,  # Minimum nodes required in each round, must be at least 2.
+            max_clients=3,  # Maximum nodes allowed in each round, must be >= min_clients.
+            wait_timeout=5,  # Timeout for calculation.
+            connection_timeout=5,  # Wait timeout for each step.
+        )
+
+    def dataset(self):
+        return {
+            "data": delta.dataset.DataFrame("spector.csv"),
+        }
+
+    def preprocess(self, data: pandas.DataFrame) -> Tuple[Any, Any]:
+        names = data.columns
+
+        y_name = names[3]
+        y = data[y_name].copy()  # type: ignore
+        x = data.drop([y_name], axis=1)
+        return x, y
+
+
+class IrisLogitTask(MNLogitTask):
+    def __init__(
+        self,
+    ) -> None:
+        super().__init__(
+            name="iris_logit",  # The task name, used for display purposes.
+            min_clients=2,  # Minimum nodes required in each round, must be at least 2.
+            max_clients=3,  # Maximum nodes allowed in each round, must be >= min_clients.
+            wait_timeout=5,  # Timeout for calculation.
+            connection_timeout=5,  # Wait timeout for each step.
+        )
+
+    def dataset(self):
+        return {"data": delta.dataset.DataFrame("iris.csv")}
+
+    def preprocess(self, data: pandas.DataFrame) -> Tuple[Any, Any]:
+        y = data["target"].copy()
+        x = data.drop(["target"], axis=1)
+        return x, y
+
+
+if __name__ == "__main__":
+    # task = SpectorLogitTask().build()
+    task = IrisLogitTask().build()
+
+    DELTA_NODE_API = "http://127.0.0.1:6700"
+
+    delta_node = DeltaNode(DELTA_NODE_API)
+    task_id = delta_node.create_task(task)
+    if delta_node.trace(task_id):
+        res = delta_node.get_result(task_id)
+        print(res)
+    else:
+        print("Task error")
diff --git a/requirements.txt b/requirements.txt
index 1ca50de..2e41c39 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
 cloudpickle==1.6.0
-httpx==0.21.1
-numpy==1.21.4
+httpx==0.23.0
+numpy==1.22.0
 Pillow==9.1.1
 pandas==1.2.3
 pytest==6.2.5
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..3dc875f
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,2 @@
+[mypy]
+plugins = numpy.typing.mypy_plugin
diff --git a/setup.py b/setup.py
index af9f429..46c2631 100644
--- a/setup.py
+++ b/setup.py
@@ -30,15 +30,15 @@ def run_tests(self):
 
 setup(
     name="delta-task",
-    version="0.5.3",
+    version="0.6.0rc1",
     license_files=("LICENSE"),
     packages=find_packages(),
     include_package_data=True,
     exclude_package_data={"": [".gitignore"]},
     install_requires=[
         "cloudpickle==1.6.0",
-        "httpx==0.21.1",
-        "numpy==1.21.4",
+        "httpx==0.23.0",
+        "numpy==1.22.0",
         "Pillow==9.1.1",
         "pandas==1.2.3",
         "pytest==6.2.5",