From db9db0420ca487a0b13fd9b52bf09f923c1c6222 Mon Sep 17 00:00:00 2001 From: terrytangyuan Date: Tue, 9 Nov 2021 14:20:13 -0500 Subject: [PATCH] Migrate additional examples from xgboost-operator Signed-off-by: terrytangyuan --- examples/xgboost/lightgbm-dist/Dockerfile | 44 ++ examples/xgboost/lightgbm-dist/README.md | 210 ++++++++ examples/xgboost/lightgbm-dist/main.py | 80 +++ examples/xgboost/lightgbm-dist/train.py | 26 + examples/xgboost/lightgbm-dist/utils.py | 91 ++++ .../xgboostjob_v1_lightgbm_dist_training.yaml | 75 +++ examples/xgboost/smoke-dist/Dockerfile | 26 + examples/xgboost/smoke-dist/README.md | 99 ++++ examples/xgboost/smoke-dist/requirements.txt | 5 + examples/xgboost/smoke-dist/tracker.py | 506 ++++++++++++++++++ .../xgboost/smoke-dist/xgboost_smoke_test.py | 108 ++++ .../smoke-dist/xgboostjob_v1_rabit_test.yaml | 31 ++ .../xgboostjob_v1alpha1_rabit_test.yaml | 35 ++ examples/xgboost/xgboost-dist/Dockerfile | 25 + examples/xgboost/xgboost-dist/README.md | 506 ++++++++++++++++++ examples/xgboost/xgboost-dist/build.sh | 11 + examples/xgboost/xgboost-dist/local_test.py | 96 ++++ examples/xgboost/xgboost-dist/main.py | 94 ++++ examples/xgboost/xgboost-dist/predict.py | 40 ++ .../xgboost/xgboost-dist/requirements.txt | 9 + examples/xgboost/xgboost-dist/tracker.py | 506 ++++++++++++++++++ examples/xgboost/xgboost-dist/train.py | 91 ++++ examples/xgboost/xgboost-dist/utils.py | 291 ++++++++++ .../xgboostjob_v1_iris_predict.yaml | 42 ++ .../xgboostjob_v1_iris_train.yaml | 44 ++ .../xgboostjob_v1alpha1_iris_predict.yaml | 46 ++ ...gboostjob_v1alpha1_iris_predict_local.yaml | 56 ++ .../xgboostjob_v1alpha1_iris_train.yaml | 49 ++ .../xgboostjob_v1alpha1_iris_train_local.yaml | 62 +++ 29 files changed, 3304 insertions(+) create mode 100644 examples/xgboost/lightgbm-dist/Dockerfile create mode 100644 examples/xgboost/lightgbm-dist/README.md create mode 100644 examples/xgboost/lightgbm-dist/main.py create mode 100644 examples/xgboost/lightgbm-dist/train.py create mode 100644 examples/xgboost/lightgbm-dist/utils.py create mode 100644 examples/xgboost/lightgbm-dist/xgboostjob_v1_lightgbm_dist_training.yaml create mode 100644 examples/xgboost/smoke-dist/Dockerfile create mode 100644 examples/xgboost/smoke-dist/README.md create mode 100644 examples/xgboost/smoke-dist/requirements.txt create mode 100644 examples/xgboost/smoke-dist/tracker.py create mode 100644 examples/xgboost/smoke-dist/xgboost_smoke_test.py create mode 100644 examples/xgboost/smoke-dist/xgboostjob_v1_rabit_test.yaml create mode 100644 examples/xgboost/smoke-dist/xgboostjob_v1alpha1_rabit_test.yaml create mode 100644 examples/xgboost/xgboost-dist/Dockerfile create mode 100644 examples/xgboost/xgboost-dist/README.md create mode 100644 examples/xgboost/xgboost-dist/build.sh create mode 100644 examples/xgboost/xgboost-dist/local_test.py create mode 100644 examples/xgboost/xgboost-dist/main.py create mode 100644 examples/xgboost/xgboost-dist/predict.py create mode 100644 examples/xgboost/xgboost-dist/requirements.txt create mode 100644 examples/xgboost/xgboost-dist/tracker.py create mode 100644 examples/xgboost/xgboost-dist/train.py create mode 100644 examples/xgboost/xgboost-dist/utils.py create mode 100644 examples/xgboost/xgboost-dist/xgboostjob_v1_iris_predict.yaml create mode 100644 examples/xgboost/xgboost-dist/xgboostjob_v1_iris_train.yaml create mode 100644 examples/xgboost/xgboost-dist/xgboostjob_v1alpha1_iris_predict.yaml create mode 100644 examples/xgboost/xgboost-dist/xgboostjob_v1alpha1_iris_predict_local.yaml create mode 100644 examples/xgboost/xgboost-dist/xgboostjob_v1alpha1_iris_train.yaml create mode 100644 examples/xgboost/xgboost-dist/xgboostjob_v1alpha1_iris_train_local.yaml diff --git a/examples/xgboost/lightgbm-dist/Dockerfile b/examples/xgboost/lightgbm-dist/Dockerfile new file mode 100644 index 0000000000..cd8a36bb93 --- /dev/null +++ b/examples/xgboost/lightgbm-dist/Dockerfile @@ -0,0 +1,44 @@ +FROM ubuntu:16.04 + +ARG CONDA_DIR=/opt/conda +ENV PATH $CONDA_DIR/bin:$PATH + +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + ca-certificates \ + cmake \ + build-essential \ + gcc \ + g++ \ + git \ + curl && \ + # python environment + curl -sL https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -o conda.sh && \ + /bin/bash conda.sh -f -b -p $CONDA_DIR && \ + export PATH="$CONDA_DIR/bin:$PATH" && \ + conda config --set always_yes yes --set changeps1 no && \ + # lightgbm + conda install -q -y numpy==1.20.3 scipy==1.6.2 scikit-learn==0.24.2 pandas==1.3.0 && \ + git clone --recursive --branch stable --depth 1 https://github.com/Microsoft/LightGBM && \ + mkdir LightGBM/build && \ + cd LightGBM/build && \ + cmake .. && \ + make -j4 && \ + make install && \ + cd ../python-package && \ + python setup.py install_lib && \ + # clean + apt-get autoremove -y && apt-get clean && \ + conda clean -a -y && \ + rm -rf /usr/local/src/* && \ + rm -rf /LightGBM + +WORKDIR /app + +# Download the example data +RUN mkdir data +ADD https://raw.githubusercontent.com/microsoft/LightGBM/stable/examples/parallel_learning/binary.train data/. +ADD https://raw.githubusercontent.com/microsoft/LightGBM/stable/examples/parallel_learning/binary.test data/. +COPY *.py ./ + +ENTRYPOINT [ "python", "/app/main.py" ] \ No newline at end of file diff --git a/examples/xgboost/lightgbm-dist/README.md b/examples/xgboost/lightgbm-dist/README.md new file mode 100644 index 0000000000..f8511447e1 --- /dev/null +++ b/examples/xgboost/lightgbm-dist/README.md @@ -0,0 +1,210 @@ +### Distributed Lightgbm Job train + +This folder containers Dockerfile and Python scripts to run a distributed Lightgbm training using the XGBoost operator. +The code is based in this [example](https://github.com/microsoft/LightGBM/tree/master/examples/parallel_learning) in the official github repository of the library. + + +**Build image** +The default image name and tag is `kubeflow/lightgbm-dist-py-test:1.0` respectiveily. + +```shell +docker build -f Dockerfile -t kubeflow/lightgbm-dist-py-test:1.0 ./ +``` + +**Start the training** + +``` +kubectl create -f xgboostjob_v1_lightgbm_dist_training.yaml +``` + +**Look at the job status** +``` + kubectl get -o yaml XGBoostJob/lightgbm-dist-train-test + ``` +Here is sample output when the job is running. The output result like this + +``` +apiVersion: xgboostjob.kubeflow.org/v1 +kind: XGBoostJob +metadata: + annotations: + kubectl.kubernetes.io/last-applied-configuration: | + {"apiVersion":"xgboostjob.kubeflow.org/v1","kind":"XGBoostJob","metadata":{"annotations":{},"name":"lightgbm-dist-train-test","namespace":"default"},"spec":{"xgbReplicaSpecs":{"Master":{"replicas":1,"restartPolicy":"Never","template":{"apiVersion":"v1","kind":"Pod","spec":{"containers":[{"args":["--job_type=Train","--boosting_type=gbdt","--objective=binary","--metric=binary_logloss,auc","--metric_freq=1","--is_training_metric=true","--max_bin=255","--data=data/binary.train","--valid_data=data/binary.test","--num_trees=100","--learning_rate=01","--num_leaves=63","--tree_learner=feature","--feature_fraction=0.8","--bagging_freq=5","--bagging_fraction=0.8","--min_data_in_leaf=50","--min_sum_hessian_in_leaf=50","--is_enable_sparse=true","--use_two_round_loading=false","--is_save_binary_file=false"],"image":"kubeflow/lightgbm-dist-py-test:1.0","imagePullPolicy":"Never","name":"xgboostjob","ports":[{"containerPort":9991,"name":"xgboostjob-port"}]}]}}},"Worker":{"replicas":2,"restartPolicy":"ExitCode","template":{"apiVersion":"v1","kind":"Pod","spec":{"containers":[{"args":["--job_type=Train","--boosting_type=gbdt","--objective=binary","--metric=binary_logloss,auc","--metric_freq=1","--is_training_metric=true","--max_bin=255","--data=data/binary.train","--valid_data=data/binary.test","--num_trees=100","--learning_rate=01","--num_leaves=63","--tree_learner=feature","--feature_fraction=0.8","--bagging_freq=5","--bagging_fraction=0.8","--min_data_in_leaf=50","--min_sum_hessian_in_leaf=50","--is_enable_sparse=true","--use_two_round_loading=false","--is_save_binary_file=false"],"image":"kubeflow/lightgbm-dist-py-test:1.0","imagePullPolicy":"Never","name":"xgboostjob","ports":[{"containerPort":9991,"name":"xgboostjob-port"}]}]}}}}}} + creationTimestamp: "2020-10-14T15:31:23Z" + generation: 7 + managedFields: + - apiVersion: xgboostjob.kubeflow.org/v1 + fieldsType: FieldsV1 + fieldsV1: + f:metadata: + f:annotations: + .: {} + f:kubectl.kubernetes.io/last-applied-configuration: {} + f:spec: + .: {} + f:xgbReplicaSpecs: + .: {} + f:Master: + .: {} + f:replicas: {} + f:restartPolicy: {} + f:template: + .: {} + f:spec: {} + f:Worker: + .: {} + f:replicas: {} + f:restartPolicy: {} + f:template: + .: {} + f:spec: {} + manager: kubectl-client-side-apply + operation: Update + time: "2020-10-14T15:31:23Z" + - apiVersion: xgboostjob.kubeflow.org/v1 + fieldsType: FieldsV1 + fieldsV1: + f:spec: + f:RunPolicy: + .: {} + f:cleanPodPolicy: {} + f:xgbReplicaSpecs: + f:Master: + f:template: + f:metadata: + .: {} + f:creationTimestamp: {} + f:spec: + f:containers: {} + f:Worker: + f:template: + f:metadata: + .: {} + f:creationTimestamp: {} + f:spec: + f:containers: {} + f:status: + .: {} + f:completionTime: {} + f:conditions: {} + f:replicaStatuses: + .: {} + f:Master: + .: {} + f:succeeded: {} + f:Worker: + .: {} + f:succeeded: {} + manager: main + operation: Update + time: "2020-10-14T15:34:44Z" + name: lightgbm-dist-train-test + namespace: default + resourceVersion: "38923" + selfLink: /apis/xgboostjob.kubeflow.org/v1/namespaces/default/xgboostjobs/lightgbm-dist-train-test + uid: b2b887d0-445b-498b-8852-26c8edc98dc7 +spec: + RunPolicy: + cleanPodPolicy: None + xgbReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + metadata: + creationTimestamp: null + spec: + containers: + - args: + - --job_type=Train + - --boosting_type=gbdt + - --objective=binary + - --metric=binary_logloss,auc + - --metric_freq=1 + - --is_training_metric=true + - --max_bin=255 + - --data=data/binary.train + - --valid_data=data/binary.test + - --num_trees=100 + - --learning_rate=01 + - --num_leaves=63 + - --tree_learner=feature + - --feature_fraction=0.8 + - --bagging_freq=5 + - --bagging_fraction=0.8 + - --min_data_in_leaf=50 + - --min_sum_hessian_in_leaf=50 + - --is_enable_sparse=true + - --use_two_round_loading=false + - --is_save_binary_file=false + image: kubeflow/lightgbm-dist-py-test:1.0 + imagePullPolicy: Never + name: xgboostjob + ports: + - containerPort: 9991 + name: xgboostjob-port + resources: {} + Worker: + replicas: 2 + restartPolicy: ExitCode + template: + metadata: + creationTimestamp: null + spec: + containers: + - args: + - --job_type=Train + - --boosting_type=gbdt + - --objective=binary + - --metric=binary_logloss,auc + - --metric_freq=1 + - --is_training_metric=true + - --max_bin=255 + - --data=data/binary.train + - --valid_data=data/binary.test + - --num_trees=100 + - --learning_rate=01 + - --num_leaves=63 + - --tree_learner=feature + - --feature_fraction=0.8 + - --bagging_freq=5 + - --bagging_fraction=0.8 + - --min_data_in_leaf=50 + - --min_sum_hessian_in_leaf=50 + - --is_enable_sparse=true + - --use_two_round_loading=false + - --is_save_binary_file=false + image: kubeflow/lightgbm-dist-py-test:1.0 + imagePullPolicy: Never + name: xgboostjob + ports: + - containerPort: 9991 + name: xgboostjob-port + resources: {} +status: + completionTime: "2020-10-14T15:34:44Z" + conditions: + - lastTransitionTime: "2020-10-14T15:31:23Z" + lastUpdateTime: "2020-10-14T15:31:23Z" + message: xgboostJob lightgbm-dist-train-test is created. + reason: XGBoostJobCreated + status: "True" + type: Created + - lastTransitionTime: "2020-10-14T15:31:23Z" + lastUpdateTime: "2020-10-14T15:31:23Z" + message: XGBoostJob lightgbm-dist-train-test is running. + reason: XGBoostJobRunning + status: "False" + type: Running + - lastTransitionTime: "2020-10-14T15:34:44Z" + lastUpdateTime: "2020-10-14T15:34:44Z" + message: XGBoostJob lightgbm-dist-train-test is successfully completed. + reason: XGBoostJobSucceeded + status: "True" + type: Succeeded + replicaStatuses: + Master: + succeeded: 1 + Worker: + succeeded: 2 +``` \ No newline at end of file diff --git a/examples/xgboost/lightgbm-dist/main.py b/examples/xgboost/lightgbm-dist/main.py new file mode 100644 index 0000000000..c8c526428b --- /dev/null +++ b/examples/xgboost/lightgbm-dist/main.py @@ -0,0 +1,80 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import logging +import argparse + +from train import train + +from utils import generate_machine_list_file, generate_train_conf_file + + +logger = logging.getLogger(__name__) + + +def main(args, extra_args): + + master_addr = os.environ["MASTER_ADDR"] + master_port = os.environ["MASTER_PORT"] + worker_addrs = os.environ["WORKER_ADDRS"] + worker_port = os.environ["WORKER_PORT"] + world_size = int(os.environ["WORLD_SIZE"]) + rank = int(os.environ["RANK"]) + + logger.info( + "extract cluster info from env variables \n" + f"master_addr: {master_addr} \n" + f"master_port: {master_port} \n" + f"worker_addrs: {worker_addrs} \n" + f"worker_port: {worker_port} \n" + f"world_size: {world_size} \n" + f"rank: {rank} \n" + ) + + if args.job_type == "Predict": + logging.info("starting the predict job") + + elif args.job_type == "Train": + logging.info("starting the train job") + logging.info(f"extra args:\n {extra_args}") + machine_list_filepath = generate_machine_list_file( + master_addr, master_port, worker_addrs, worker_port + ) + logging.info(f"machine list generated in: {machine_list_filepath}") + local_port = worker_port if rank else master_port + config_file = generate_train_conf_file( + machine_list_file=machine_list_filepath, + world_size=world_size, + output_model="model.txt", + local_port=local_port, + extra_args=extra_args, + ) + logging.info(f"config generated in: {config_file}") + train(config_file) + logging.info("Finish distributed job") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + + parser.add_argument( + "--job_type", + help="Job type to execute", + choices=["Train", "Predict"], + required=True, + ) + + logging.basicConfig(format="%(message)s") + logging.getLogger().setLevel(logging.INFO) + args, extra_args = parser.parse_known_args() + main(args, extra_args) diff --git a/examples/xgboost/lightgbm-dist/train.py b/examples/xgboost/lightgbm-dist/train.py new file mode 100644 index 0000000000..578b6fc45e --- /dev/null +++ b/examples/xgboost/lightgbm-dist/train.py @@ -0,0 +1,26 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging +import subprocess + +logger = logging.getLogger(__name__) + + +def train(train_config_filepath: str): + cmd = ["lightgbm", f"config={train_config_filepath}"] + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE) + line = proc.stdout.readline() + while line: + logger.info((line.decode("utf-8").strip())) + line = proc.stdout.readline() diff --git a/examples/xgboost/lightgbm-dist/utils.py b/examples/xgboost/lightgbm-dist/utils.py new file mode 100644 index 0000000000..86101455d3 --- /dev/null +++ b/examples/xgboost/lightgbm-dist/utils.py @@ -0,0 +1,91 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re +import socket +import logging +import tempfile +from time import sleep +from typing import List, Union + +logger = logging.getLogger(__name__) + + +def generate_machine_list_file( + master_addr: str, master_port: str, worker_addrs: str, worker_port: str +) -> str: + logger.info("starting to extract system env") + + filename = tempfile.NamedTemporaryFile(delete=False).name + + def _get_ips( + master_addr_name, + worker_addr_names, + max_retries=10, + sleep_secs=10, + current_retry=0, + ): + try: + worker_addr_ips = [] + master_addr_ip = socket.gethostbyname(master_addr_name) + + for addr in worker_addr_names.split(","): + worker_addr_ips.append(socket.gethostbyname(addr)) + + except socket.gaierror as ex: + if "Name or service not known" in str(ex) and current_retry < max_retries: + sleep(sleep_secs) + master_addr_ip, worker_addr_ips = _get_ips( + master_addr_name, + worker_addr_names, + max_retries=max_retries, + sleep_secs=sleep_secs, + current_retry=current_retry + 1, + ) + else: + raise ValueError("Couldn't get address names") + + return master_addr_ip, worker_addr_ips + + master_ip, worker_ips = _get_ips(master_addr, worker_addrs) + + with open(filename, "w") as file: + print(f"{master_ip} {master_port}", file=file) + for addr in worker_ips: + print(f"{addr} {worker_port}", file=file) + + return filename + + +def generate_train_conf_file( + machine_list_file: str, + world_size: int, + output_model: str, + local_port: Union[int, str], + extra_args: List[str], +) -> str: + + filename = tempfile.NamedTemporaryFile(delete=False).name + + with open(filename, "w") as file: + print("task = train", file=file) + print(f"output_model = {output_model}", file=file) + print(f"num_machines = {world_size}", file=file) + print(f"local_listen_port = {local_port}", file=file) + print(f"machine_list_file = {machine_list_file}", file=file) + for arg in extra_args: + m = re.match(r"--(.+)=([^\s]+)", arg) + if m is not None: + k, v = m.groups() + print(f"{k} = {v}", file=file) + + return filename diff --git a/examples/xgboost/lightgbm-dist/xgboostjob_v1_lightgbm_dist_training.yaml b/examples/xgboost/lightgbm-dist/xgboostjob_v1_lightgbm_dist_training.yaml new file mode 100644 index 0000000000..75bf7121a3 --- /dev/null +++ b/examples/xgboost/lightgbm-dist/xgboostjob_v1_lightgbm_dist_training.yaml @@ -0,0 +1,75 @@ +apiVersion: kubeflow.org/v1 +kind: XGBoostJob +metadata: + name: lightgbm-dist-train-test +spec: + xgbReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + spec: + containers: + - name: xgboostjob + image: kubeflow/lightgbm-dist-py-test:1.0 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Never + args: + - --job_type=Train + - --boosting_type=gbdt + - --objective=binary + - --metric=binary_logloss,auc + - --metric_freq=1 + - --is_training_metric=true + - --max_bin=255 + - --data=data/binary.train + - --valid_data=data/binary.test + - --num_trees=100 + - --learning_rate=01 + - --num_leaves=63 + - --tree_learner=feature + - --feature_fraction=0.8 + - --bagging_freq=5 + - --bagging_fraction=0.8 + - --min_data_in_leaf=50 + - --min_sum_hessian_in_leaf=50 + - --is_enable_sparse=true + - --use_two_round_loading=false + - --is_save_binary_file=false + Worker: + replicas: 2 + restartPolicy: ExitCode + template: + spec: + containers: + - name: xgboostjob + image: kubeflow/lightgbm-dist-py-test:1.0 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Never + args: + - --job_type=Train + - --boosting_type=gbdt + - --objective=binary + - --metric=binary_logloss,auc + - --metric_freq=1 + - --is_training_metric=true + - --max_bin=255 + - --data=data/binary.train + - --valid_data=data/binary.test + - --num_trees=100 + - --learning_rate=01 + - --num_leaves=63 + - --tree_learner=feature + - --feature_fraction=0.8 + - --bagging_freq=5 + - --bagging_fraction=0.8 + - --min_data_in_leaf=50 + - --min_sum_hessian_in_leaf=50 + - --is_enable_sparse=true + - --use_two_round_loading=false + - --is_save_binary_file=false + diff --git a/examples/xgboost/smoke-dist/Dockerfile b/examples/xgboost/smoke-dist/Dockerfile new file mode 100644 index 0000000000..74ed8717fa --- /dev/null +++ b/examples/xgboost/smoke-dist/Dockerfile @@ -0,0 +1,26 @@ +# Install python 3.6 +FROM python:3.6 + +RUN apt-get update +RUN apt-get install -y git make g++ cmake + +RUN mkdir -p /opt/mlkube + +# Download the rabit tracker and xgboost code. + +COPY tracker.py /opt/mlkube/ +COPY requirements.txt /opt/mlkube/ + +# Install requirements + +RUN pip install -r /opt/mlkube/requirements.txt + +# Build XGBoost. +RUN git clone --recursive https://github.com/dmlc/xgboost && \ + cd xgboost && \ + make -j$(nproc) && \ + cd python-package; python setup.py install + +COPY xgboost_smoke_test.py /opt/mlkube/ + +ENTRYPOINT ["python", "/opt/mlkube/xgboost_smoke_test.py"] diff --git a/examples/xgboost/smoke-dist/README.md b/examples/xgboost/smoke-dist/README.md new file mode 100644 index 0000000000..8ac8cc3dd8 --- /dev/null +++ b/examples/xgboost/smoke-dist/README.md @@ -0,0 +1,99 @@ +### Distributed send/recv e2e test for xgboost rabit + +This folder containers Dockerfile and distributed send/recv test. + +**Build Image** + +The default image name and tag is `kubeflow/xgboost-dist-rabit-test:1.2`. +You can build the image based on your requirement. + +```shell +docker build -f Dockerfile -t kubeflow/xgboost-dist-rabit-test:1.2 ./ +``` + +**Start and test XGBoost Rabit tracker** + +``` +kubectl create -f xgboostjob_v1alpha1_rabit_test.yaml +``` + +**Look at the job status** +``` + kubectl get -o yaml XGBoostJob/xgboost-dist-test + ``` +Here is sample output when the job is running. The output result like this +``` +apiVersion: xgboostjob.kubeflow.org/v1alpha1 +kind: XGBoostJob +metadata: + creationTimestamp: "2019-06-21T03:32:57Z" + generation: 7 + name: xgboost-dist-test + namespace: default + resourceVersion: "258466" + selfLink: /apis/xgboostjob.kubeflow.org/v1alpha1/namespaces/default/xgboostjobs/xgboost-dist-test + uid: 431dc182-93d5-11e9-bbab-080027dfbfe2 +spec: + RunPolicy: + cleanPodPolicy: None + xgbReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + metadata: + creationTimestamp: null + spec: + containers: + - image: docker.io/merlintang/xgboost-dist-rabit-test:1.2 + imagePullPolicy: Always + name: xgboostjob + ports: + - containerPort: 9991 + name: xgboostjob-port + resources: {} + Worker: + replicas: 2 + restartPolicy: Never + template: + metadata: + creationTimestamp: null + spec: + containers: + - image: docker.io/merlintang/xgboost-dist-rabit-test:1.2 + imagePullPolicy: Always + name: xgboostjob + ports: + - containerPort: 9991 + name: xgboostjob-port + resources: {} +status: + completionTime: "2019-06-21T03:33:03Z" + conditions: + - lastTransitionTime: "2019-06-21T03:32:57Z" + lastUpdateTime: "2019-06-21T03:32:57Z" + message: xgboostJob xgboost-dist-test is created. + reason: XGBoostJobCreated + status: "True" + type: Created + - lastTransitionTime: "2019-06-21T03:32:57Z" + lastUpdateTime: "2019-06-21T03:32:57Z" + message: XGBoostJob xgboost-dist-test is running. + reason: XGBoostJobRunning + status: "False" + type: Running + - lastTransitionTime: "2019-06-21T03:33:03Z" + lastUpdateTime: "2019-06-21T03:33:03Z" + message: XGBoostJob xgboost-dist-test is successfully completed. + reason: XGBoostJobSucceeded + status: "True" + type: Succeeded + replicaStatuses: + Master: + succeeded: 1 + Worker: + succeeded: 2 +``` + + + diff --git a/examples/xgboost/smoke-dist/requirements.txt b/examples/xgboost/smoke-dist/requirements.txt new file mode 100644 index 0000000000..2df5bac4f2 --- /dev/null +++ b/examples/xgboost/smoke-dist/requirements.txt @@ -0,0 +1,5 @@ +numpy>=1.16.3 +Cython>=0.29.4 +requests>=2.21.0 +urllib3>=1.21.1 +scipy>=1.4.1 diff --git a/examples/xgboost/smoke-dist/tracker.py b/examples/xgboost/smoke-dist/tracker.py new file mode 100644 index 0000000000..7cd572c748 --- /dev/null +++ b/examples/xgboost/smoke-dist/tracker.py @@ -0,0 +1,506 @@ +""" +Tracker script for DMLC +Implements the tracker control protocol + - start dmlc jobs + - start ps scheduler and rabit tracker + - help nodes to establish links with each other +Tianqi Chen +-------------------------- +This was taken from +https://github.com/dmlc/dmlc-core/blob/master/tracker/dmlc_tracker/tracker.py +See LICENSE here +https://github.com/dmlc/dmlc-core/blob/master/LICENSE +No code modified or added except for this explanatory comment. +""" +# pylint: disable=invalid-name, missing-docstring, too-many-arguments +# pylint: disable=too-many-locals +# pylint: disable=too-many-branches, too-many-statements +from __future__ import absolute_import + +import os +import sys +import socket +import struct +import subprocess +import argparse +import time +import logging +from threading import Thread + + +class ExSocket(object): + """ + Extension of socket to handle recv and send of special data + """ + def __init__(self, sock): + self.sock = sock + + def recvall(self, nbytes): + res = [] + nread = 0 + while nread < nbytes: + chunk = self.sock.recv(min(nbytes - nread, 1024)) + nread += len(chunk) + res.append(chunk) + return b''.join(res) + + def recvint(self): + return struct.unpack('@i', self.recvall(4))[0] + + def sendint(self, n): + self.sock.sendall(struct.pack('@i', n)) + + def sendstr(self, s): + self.sendint(len(s)) + self.sock.sendall(s.encode()) + + def recvstr(self): + slen = self.recvint() + return self.recvall(slen).decode() + + +# magic number used to verify existence of data +kMagic = 0xff99 + + +def get_some_ip(host): + return socket.getaddrinfo(host, None)[0][4][0] + + +def get_family(addr): + return socket.getaddrinfo(addr, None)[0][0] + + +class SlaveEntry(object): + def __init__(self, sock, s_addr): + slave = ExSocket(sock) + self.sock = slave + self.host = get_some_ip(s_addr[0]) + magic = slave.recvint() + assert magic == kMagic, 'invalid magic number=%d from %s' % ( + magic, self.host) + slave.sendint(kMagic) + self.rank = slave.recvint() + self.world_size = slave.recvint() + self.jobid = slave.recvstr() + self.cmd = slave.recvstr() + self.wait_accept = 0 + self.port = None + + def decide_rank(self, job_map): + if self.rank >= 0: + return self.rank + if self.jobid != 'NULL' and self.jobid in job_map: + return job_map[self.jobid] + return -1 + + def assign_rank(self, rank, wait_conn, tree_map, parent_map, ring_map): + self.rank = rank + nnset = set(tree_map[rank]) + rprev, rnext = ring_map[rank] + self.sock.sendint(rank) + # send parent rank + self.sock.sendint(parent_map[rank]) + # send world size + self.sock.sendint(len(tree_map)) + self.sock.sendint(len(nnset)) + # send the rprev and next link + for r in nnset: + self.sock.sendint(r) + # send prev link + if rprev != -1 and rprev != rank: + nnset.add(rprev) + self.sock.sendint(rprev) + else: + self.sock.sendint(-1) + # send next link + if rnext != -1 and rnext != rank: + nnset.add(rnext) + self.sock.sendint(rnext) + else: + self.sock.sendint(-1) + while True: + ngood = self.sock.recvint() + goodset = set([]) + for _ in range(ngood): + goodset.add(self.sock.recvint()) + assert goodset.issubset(nnset) + badset = nnset - goodset + conset = [] + for r in badset: + if r in wait_conn: + conset.append(r) + self.sock.sendint(len(conset)) + self.sock.sendint(len(badset) - len(conset)) + for r in conset: + self.sock.sendstr(wait_conn[r].host) + self.sock.sendint(wait_conn[r].port) + self.sock.sendint(r) + nerr = self.sock.recvint() + if nerr != 0: + continue + self.port = self.sock.recvint() + rmset = [] + # all connection was successuly setup + for r in conset: + wait_conn[r].wait_accept -= 1 + if wait_conn[r].wait_accept == 0: + rmset.append(r) + for r in rmset: + wait_conn.pop(r, None) + self.wait_accept = len(badset) - len(conset) + return rmset + + +class RabitTracker(object): + """ + tracker for rabit + """ + def __init__(self, hostIP, nslave, port=9091, port_end=9999): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + for port in range(port, port_end): + try: + sock.bind((hostIP, port)) + self.port = port + break + except socket.error as e: + if e.errno in [98, 48]: + continue + else: + raise + sock.listen(256) + self.sock = sock + self.hostIP = hostIP + self.thread = None + self.start_time = None + self.end_time = None + self.nslave = nslave + logging.info('start listen on %s:%d', hostIP, self.port) + + def __del__(self): + self.sock.close() + + @staticmethod + def get_neighbor(rank, nslave): + rank = rank + 1 + ret = [] + if rank > 1: + ret.append(rank // 2 - 1) + if rank * 2 - 1 < nslave: + ret.append(rank * 2 - 1) + if rank * 2 < nslave: + ret.append(rank * 2) + return ret + + def slave_envs(self): + """ + get enviroment variables for slaves + can be passed in as args or envs + """ + return {'DMLC_TRACKER_URI': self.hostIP, + 'DMLC_TRACKER_PORT': self.port} + + def get_tree(self, nslave): + tree_map = {} + parent_map = {} + for r in range(nslave): + tree_map[r] = self.get_neighbor(r, nslave) + parent_map[r] = (r + 1) // 2 - 1 + return tree_map, parent_map + + def find_share_ring(self, tree_map, parent_map, r): + """ + get a ring structure that tends to share nodes with the tree + return a list starting from r + """ + nset = set(tree_map[r]) + cset = nset - set([parent_map[r]]) + if len(cset) == 0: + return [r] + rlst = [r] + cnt = 0 + for v in cset: + vlst = self.find_share_ring(tree_map, parent_map, v) + cnt += 1 + if cnt == len(cset): + vlst.reverse() + rlst += vlst + return rlst + + def get_ring(self, tree_map, parent_map): + """ + get a ring connection used to recover local data + """ + assert parent_map[0] == -1 + rlst = self.find_share_ring(tree_map, parent_map, 0) + assert len(rlst) == len(tree_map) + ring_map = {} + nslave = len(tree_map) + for r in range(nslave): + rprev = (r + nslave - 1) % nslave + rnext = (r + 1) % nslave + ring_map[rlst[r]] = (rlst[rprev], rlst[rnext]) + return ring_map + + def get_link_map(self, nslave): + """ + get the link map, this is a bit hacky, call for better algorithm + to place similar nodes together + """ + tree_map, parent_map = self.get_tree(nslave) + ring_map = self.get_ring(tree_map, parent_map) + rmap = {0: 0} + k = 0 + for i in range(nslave - 1): + k = ring_map[k][1] + rmap[k] = i + 1 + + ring_map_ = {} + tree_map_ = {} + parent_map_ = {} + for k, v in ring_map.items(): + ring_map_[rmap[k]] = (rmap[v[0]], rmap[v[1]]) + for k, v in tree_map.items(): + tree_map_[rmap[k]] = [rmap[x] for x in v] + for k, v in parent_map.items(): + if k != 0: + parent_map_[rmap[k]] = rmap[v] + else: + parent_map_[rmap[k]] = -1 + return tree_map_, parent_map_, ring_map_ + + def accept_slaves(self, nslave): + # set of nodes that finishs the job + shutdown = {} + # set of nodes that is waiting for connections + wait_conn = {} + # maps job id to rank + job_map = {} + # list of workers that is pending to be assigned rank + pending = [] + # lazy initialize tree_map + tree_map = None + + while len(shutdown) != nslave: + fd, s_addr = self.sock.accept() + s = SlaveEntry(fd, s_addr) + if s.cmd == 'print': + msg = s.sock.recvstr() + logging.info(msg.strip()) + continue + if s.cmd == 'shutdown': + assert s.rank >= 0 and s.rank not in shutdown + assert s.rank not in wait_conn + shutdown[s.rank] = s + logging.debug('Recieve %s signal from %d', s.cmd, s.rank) + continue + assert s.cmd == 'start' or s.cmd == 'recover' + # lazily initialize the slaves + if tree_map is None: + assert s.cmd == 'start' + if s.world_size > 0: + nslave = s.world_size + tree_map, parent_map, ring_map = self.get_link_map(nslave) + # set of nodes that is pending for getting up + todo_nodes = list(range(nslave)) + else: + assert s.world_size == -1 or s.world_size == nslave + if s.cmd == 'recover': + assert s.rank >= 0 + + rank = s.decide_rank(job_map) + # batch assignment of ranks + if rank == -1: + assert len(todo_nodes) != 0 + pending.append(s) + if len(pending) == len(todo_nodes): + pending.sort(key=lambda x: x.host) + for s in pending: + rank = todo_nodes.pop(0) + if s.jobid != 'NULL': + job_map[s.jobid] = rank + s.assign_rank(rank, wait_conn, tree_map, parent_map, + ring_map) + if s.wait_accept > 0: + wait_conn[rank] = s + logging.debug('Recieve %s signal from %s; ' + 'assign rank %d', s.cmd, s.host, s.rank) + if len(todo_nodes) == 0: + logging.info('@tracker All of %d nodes getting started', + nslave) + self.start_time = time.time() + else: + s.assign_rank(rank, wait_conn, tree_map, parent_map, ring_map) + logging.debug('Recieve %s signal from %d', s.cmd, s.rank) + if s.wait_accept > 0: + wait_conn[rank] = s + + logging.info("worker(ip_address=%s) connected!" % get_some_ip(s_addr[0])) + + logging.info('@tracker All nodes finishes job') + self.end_time = time.time() + logging.info('@tracker %s secs between node start and job finish', + str(self.end_time - self.start_time)) + + def start(self, nslave): + def run(): + self.accept_slaves(nslave) + self.thread = Thread(target=run, args=()) + self.thread.setDaemon(True) + self.thread.start() + + def join(self): + while self.thread.isAlive(): + self.thread.join(100) + + +class PSTracker(object): + """ + Tracker module for PS + """ + def __init__(self, hostIP, cmd, port=9091, port_end=9999, envs=None): + """ + Starts the PS scheduler + """ + self.cmd = cmd + if cmd is None: + return + envs = {} if envs is None else envs + self.hostIP = hostIP + sock = socket.socket(get_family(hostIP), socket.SOCK_STREAM) + for port in range(port, port_end): + try: + sock.bind(('', port)) + self.port = port + sock.close() + break + except socket.error: + continue + env = os.environ.copy() + + env['DMLC_ROLE'] = 'scheduler' + env['DMLC_PS_ROOT_URI'] = str(self.hostIP) + env['DMLC_PS_ROOT_PORT'] = str(self.port) + for k, v in envs.items(): + env[k] = str(v) + self.thread = Thread( + target=(lambda: subprocess.check_call(self.cmd, env=env, + shell=True)), args=()) + self.thread.setDaemon(True) + self.thread.start() + + def join(self): + if self.cmd is not None: + while self.thread.isAlive(): + self.thread.join(100) + + def slave_envs(self): + if self.cmd is None: + return {} + else: + return {'DMLC_PS_ROOT_URI': self.hostIP, + 'DMLC_PS_ROOT_PORT': self.port} + + +def get_host_ip(hostIP=None): + if hostIP is None or hostIP == 'auto': + hostIP = 'ip' + + if hostIP == 'dns': + hostIP = socket.getfqdn() + elif hostIP == 'ip': + from socket import gaierror + try: + hostIP = socket.gethostbyname(socket.getfqdn()) + except gaierror: + logging.warn('gethostbyname(socket.getfqdn()) failed... trying on ' + 'hostname()') + hostIP = socket.gethostbyname(socket.gethostname()) + if hostIP.startswith("127."): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # doesn't have to be reachable + s.connect(('10.255.255.255', 1)) + hostIP = s.getsockname()[0] + return hostIP + + +def submit(nworker, nserver, fun_submit, hostIP='auto', pscmd=None): + if nserver == 0: + pscmd = None + + envs = {'DMLC_NUM_WORKER': nworker, + 'DMLC_NUM_SERVER': nserver} + hostIP = get_host_ip(hostIP) + + if nserver == 0: + rabit = RabitTracker(hostIP=hostIP, nslave=nworker) + envs.update(rabit.slave_envs()) + rabit.start(nworker) + else: + pserver = PSTracker(hostIP=hostIP, cmd=pscmd, envs=envs) + envs.update(pserver.slave_envs()) + fun_submit(nworker, nserver, envs) + + if nserver == 0: + rabit.join() + else: + pserver.join() + + +def start_rabit_tracker(args): + """Standalone function to start rabit tracker. + Parameters + ---------- + args: arguments to start the rabit tracker. + """ + envs = {'DMLC_NUM_WORKER': args.num_workers, + 'DMLC_NUM_SERVER': args.num_servers} + rabit = RabitTracker(hostIP=get_host_ip(args.host_ip), + nslave=args.num_workers) + envs.update(rabit.slave_envs()) + rabit.start(args.num_workers) + sys.stdout.write('DMLC_TRACKER_ENV_START\n') + # simply write configuration to stdout + for k, v in envs.items(): + sys.stdout.write('%s=%s\n' % (k, str(v))) + sys.stdout.write('DMLC_TRACKER_ENV_END\n') + sys.stdout.flush() + rabit.join() + + +def main(): + """Main function if tracker is executed in standalone mode.""" + parser = argparse.ArgumentParser(description='Rabit Tracker start.') + parser.add_argument('--num-workers', required=True, type=int, + help='Number of worker proccess to be launched.') + parser.add_argument('--num-servers', default=0, type=int, + help='Number of server process to be launched. Only ' + 'used in PS jobs.') + parser.add_argument('--host-ip', default=None, type=str, + help=('Host IP addressed, this is only needed ' + + 'if the host IP cannot be automatically guessed.' + )) + parser.add_argument('--log-level', default='INFO', type=str, + choices=['INFO', 'DEBUG'], + help='Logging level of the logger.') + args = parser.parse_args() + + fmt = '%(asctime)s %(levelname)s %(message)s' + if args.log_level == 'INFO': + level = logging.INFO + elif args.log_level == 'DEBUG': + level = logging.DEBUG + else: + raise RuntimeError("Unknown logging level %s" % args.log_level) + + logging.basicConfig(format=fmt, level=level) + + if args.num_servers == 0: + start_rabit_tracker(args) + else: + raise RuntimeError("Do not yet support start ps tracker in standalone " + "mode.") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/xgboost/smoke-dist/xgboost_smoke_test.py b/examples/xgboost/smoke-dist/xgboost_smoke_test.py new file mode 100644 index 0000000000..92a7fd01f8 --- /dev/null +++ b/examples/xgboost/smoke-dist/xgboost_smoke_test.py @@ -0,0 +1,108 @@ + +# Copyright 2018 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import xgboost as xgb +import traceback + +from tracker import RabitTracker + +logger = logging.getLogger(__name__) + +def extract_xgbooost_cluster_env(): + + logger.info("start to extract system env") + + master_addr = os.environ.get("MASTER_ADDR", "{}") + master_port = int(os.environ.get("MASTER_PORT", "{}")) + rank = int(os.environ.get("RANK", "{}")) + world_size = int(os.environ.get("WORLD_SIZE", "{}")) + + logger.info("extract the rabit env from cluster : %s, port: %d, rank: %d, word_size: %d ", + master_addr, master_port, rank, world_size) + + return master_addr, master_port, rank, world_size + +def setup_rabit_cluster(): + addr, port, rank, world_size = extract_xgbooost_cluster_env() + + rabit_tracker = None + try: + """start to build the network""" + if world_size > 1: + if rank == 0: + logger.info("start the master node") + + rabit = RabitTracker(hostIP="0.0.0.0", nslave=world_size, + port=port, port_end=port + 1) + rabit.start(world_size) + rabit_tracker = rabit + logger.info('########### RabitTracker Setup Finished #########') + + envs = [ + 'DMLC_NUM_WORKER=%d' % world_size, + 'DMLC_TRACKER_URI=%s' % addr, + 'DMLC_TRACKER_PORT=%d' % port, + 'DMLC_TASK_ID=%d' % rank + ] + logger.info('##### Rabit rank setup with below envs #####') + for i, env in enumerate(envs): + logger.info(env) + envs[i] = str.encode(env) + + xgb.rabit.init(envs) + logger.info('##### Rabit rank = %d' % xgb.rabit.get_rank()) + + rank = xgb.rabit.get_rank() + s = None + if rank == 0: + s = {'hello world': 100, 2: 3} + + logger.info('@node[%d] before-broadcast: s=\"%s\"' % (rank, str(s))) + s = xgb.rabit.broadcast(s, 0) + + logger.info('@node[%d] after-broadcast: s=\"%s\"' % (rank, str(s))) + + except Exception as e: + logger.error("something wrong happen: %s", traceback.format_exc()) + raise e + finally: + if world_size > 1: + xgb.rabit.finalize() + if rabit_tracker: + rabit_tracker.join() + + logger.info("the rabit network testing finished!") + +def main(): + + port = os.environ.get("MASTER_PORT", "{}") + logging.info("MASTER_PORT: %s", port) + + addr = os.environ.get("MASTER_ADDR", "{}") + logging.info("MASTER_ADDR: %s", addr) + + world_size = os.environ.get("WORLD_SIZE", "{}") + logging.info("WORLD_SIZE: %s", world_size) + + rank = os.environ.get("RANK", "{}") + logging.info("RANK: %s", rank) + + setup_rabit_cluster() + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + main() diff --git a/examples/xgboost/smoke-dist/xgboostjob_v1_rabit_test.yaml b/examples/xgboost/smoke-dist/xgboostjob_v1_rabit_test.yaml new file mode 100644 index 0000000000..91b03c1d5e --- /dev/null +++ b/examples/xgboost/smoke-dist/xgboostjob_v1_rabit_test.yaml @@ -0,0 +1,31 @@ +apiVersion: kubeflow.org/v1 +kind: XGBoostJob +metadata: + name: xgboost-dist-test +spec: + xgbReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + spec: + containers: + - name: xgboostjob + image: docker.io/merlintang/xgboost-dist-rabit-test:1.2 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + Worker: + replicas: 2 + restartPolicy: Never + template: + spec: + containers: + - name: xgboostjob + image: docker.io/merlintang/xgboost-dist-rabit-test:1.2 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + diff --git a/examples/xgboost/smoke-dist/xgboostjob_v1alpha1_rabit_test.yaml b/examples/xgboost/smoke-dist/xgboostjob_v1alpha1_rabit_test.yaml new file mode 100644 index 0000000000..3205297683 --- /dev/null +++ b/examples/xgboost/smoke-dist/xgboostjob_v1alpha1_rabit_test.yaml @@ -0,0 +1,35 @@ +apiVersion: kubeflow.org/v1 +kind: XGBoostJob +metadata: + name: xgboost-dist-test +spec: + xgbReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + apiVersion: v1 + kind: Pod + spec: + containers: + - name: xgboostjob + image: docker.io/merlintang/xgboost-dist-rabit-test:1.2 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + Worker: + replicas: 2 + restartPolicy: Never + template: + apiVersion: v1 + kind: Pod + spec: + containers: + - name: xgboostjob + image: docker.io/merlintang/xgboost-dist-rabit-test:1.2 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + diff --git a/examples/xgboost/xgboost-dist/Dockerfile b/examples/xgboost/xgboost-dist/Dockerfile new file mode 100644 index 0000000000..e72032e9c2 --- /dev/null +++ b/examples/xgboost/xgboost-dist/Dockerfile @@ -0,0 +1,25 @@ +# Install python 3。6. +FROM python:3.6 + +RUN apt-get update +RUN apt-get install -y git make g++ cmake + +RUN mkdir -p /opt/mlkube + +# Download the rabit tracker and xgboost code. + +COPY requirements.txt /opt/mlkube/ + +# Install requirements + +RUN pip install -r /opt/mlkube/requirements.txt + +# Build XGBoost. +RUN git clone --recursive https://github.com/dmlc/xgboost && \ + cd xgboost && \ + make -j$(nproc) && \ + cd python-package; python setup.py install + +COPY *.py /opt/mlkube/ + +ENTRYPOINT ["python", "/opt/mlkube/main.py"] diff --git a/examples/xgboost/xgboost-dist/README.md b/examples/xgboost/xgboost-dist/README.md new file mode 100644 index 0000000000..0756a92022 --- /dev/null +++ b/examples/xgboost/xgboost-dist/README.md @@ -0,0 +1,506 @@ +### Distributed XGBoost Job train and prediction + +This folder containers related files for distributed XGBoost training and prediction. In this demo, +[Iris Data Set](https://archive.ics.uci.edu/ml/datasets/iris) is a well known multi-class classification dataset. +Thus, in this demo, distributed XGBoost job is able to do multi-class classification problem. Meanwhile, +User can extend provided data reader to read data from distributed data storage like HDFS, HBase or Hive etc. + + +**Build image** + +The default image name and tag is `kubeflow/xgboost-dist-iris-test:1.1` respectiveily. + +```shell +docker build -f Dockerfile -t kubeflow/xgboost-dist-iris-test:1.0 ./ +``` + +Then you can push the docker image into repository +```shell +docker push kubeflow/xgboost-dist-iris-test:1.0 ./ +``` + +**Configure the job runtime via Yaml file** + +The following files are available to setup distributed XGBoost computation runtime + +To store the model in OSS: + +* xgboostjob_v1alpha1_iris_train.yaml +* xgboostjob_v1alpha1_iris_predict.yaml + +To store the model in local path: + +* xgboostjob_v1alpha1_iris_train_local.yaml +* xgboostjob_v1alpha1_iris_predict_local.yaml + +For training jobs in OSS , you could configure xgboostjob_v1alpha1_iris_train.yaml and xgboostjob_v1alpha1_iris_predict.yaml +Note, we use [OSS](https://www.alibabacloud.com/product/oss) to store the trained model, +thus, you need to specify the OSS parameter in the yaml file. Therefore, remember to fill the OSS parameter in xgboostjob_v1alpha1_iris_train.yaml and xgboostjob_v1alpha1_iris_predict.yaml file. +The oss parameter includes the account information such as access_id, access_key, access_bucket and endpoint. +For Eg: +--oss_param=endpoint:http://oss-ap-south-1.aliyuncs.com,access_id:XXXXXXXXXXX,access_key:XXXXXXXXXXXXXXXXXXX,access_bucket:XXXXXX +Similarly, xgboostjob_v1alpha1_iris_predict.yaml is used to configure XGBoost job batch prediction. + + +**Start the distributed XGBoost train to store the model in OSS** +``` +kubectl create -f xgboostjob_v1alpha1_iris_train.yaml +``` + +**Look at the train job status** +``` + kubectl get -o yaml XGBoostJob/xgboost-dist-iris-test-train + ``` + Here is a sample output when the job is finished. The output log like this +``` +Name: xgboost-dist-iris-test +Namespace: default +Labels: +Annotations: +API Version: xgboostjob.kubeflow.org/v1alpha1 +Kind: XGBoostJob +Metadata: + Creation Timestamp: 2019-06-27T01:16:09Z + Generation: 9 + Resource Version: 385834 + Self Link: /apis/xgboostjob.kubeflow.org/v1alpha1/namespaces/default/xgboostjobs/xgboost-dist-iris-test + UID: 2565e99a-9879-11e9-bbab-080027dfbfe2 +Spec: + Run Policy: + Clean Pod Policy: None + Xgb Replica Specs: + Master: + Replicas: 1 + Restart Policy: Never + Template: + Metadata: + Creation Timestamp: + Spec: + Containers: + Args: + --job_type=Train + --xgboost_parameter=objective:multi:softprob,num_class:3 + --n_estimators=10 + --learning_rate=0.1 + --model_path=autoAI/xgb-opt/2 + --model_storage_type=oss + --oss_param=unknown + Image: docker.io/merlintang/xgboost-dist-iris:1.1 + Image Pull Policy: Always + Name: xgboostjob + Ports: + Container Port: 9991 + Name: xgboostjob-port + Resources: + Worker: + Replicas: 2 + Restart Policy: ExitCode + Template: + Metadata: + Creation Timestamp: + Spec: + Containers: + Args: + --job_type=Train + --xgboost_parameter="objective:multi:softprob,num_class:3" + --n_estimators=10 + --learning_rate=0.1 + --model_path="/tmp/xgboost_model" + --model_storage_type=oss + Image: docker.io/merlintang/xgboost-dist-iris:1.1 + Image Pull Policy: Always + Name: xgboostjob + Ports: + Container Port: 9991 + Name: xgboostjob-port + Resources: +Status: + Completion Time: 2019-06-27T01:17:04Z + Conditions: + Last Transition Time: 2019-06-27T01:16:09Z + Last Update Time: 2019-06-27T01:16:09Z + Message: xgboostJob xgboost-dist-iris-test is created. + Reason: XGBoostJobCreated + Status: True + Type: Created + Last Transition Time: 2019-06-27T01:16:09Z + Last Update Time: 2019-06-27T01:16:09Z + Message: XGBoostJob xgboost-dist-iris-test is running. + Reason: XGBoostJobRunning + Status: False + Type: Running + Last Transition Time: 2019-06-27T01:17:04Z + Last Update Time: 2019-06-27T01:17:04Z + Message: XGBoostJob xgboost-dist-iris-test is successfully completed. + Reason: XGBoostJobSucceeded + Status: True + Type: Succeeded + Replica Statuses: + Master: + Succeeded: 1 + Worker: + Succeeded: 2 +Events: + Type Reason Age From Message + ---- ------ ---- ---- ------- + Normal SuccessfulCreatePod 102s xgboostjob-operator Created pod: xgboost-dist-iris-test-master-0 + Normal SuccessfulCreateService 102s xgboostjob-operator Created service: xgboost-dist-iris-test-master-0 + Normal SuccessfulCreatePod 102s xgboostjob-operator Created pod: xgboost-dist-iris-test-worker-1 + Normal SuccessfulCreateService 102s xgboostjob-operator Created service: xgboost-dist-iris-test-worker-0 + Normal SuccessfulCreateService 102s xgboostjob-operator Created service: xgboost-dist-iris-test-worker-1 + Normal SuccessfulCreatePod 64s xgboostjob-operator Created pod: xgboost-dist-iris-test-worker-0 + Normal ExitedWithCode 47s (x3 over 49s) xgboostjob-operator Pod: default.xgboost-dist-iris-test-worker-1 exited with code 0 + Normal ExitedWithCode 47s xgboostjob-operator Pod: default.xgboost-dist-iris-test-master-0 exited with code 0 + Normal XGBoostJobSucceeded 47s xgboostjob-operator XGBoostJob xgboost-dist-iris-test is successfully completed. + ``` + +**Start the distributed XGBoost job predict** +```shell +kubectl create -f xgboostjob_v1alpha1_iris_predict.yaml +``` + +**Look at the batch predict job status** +``` + kubectl get -o yaml XGBoostJob/xgboost-dist-iris-test-predict + ``` + Here is a sample output when the job is finished. The output log like this +``` +Name: xgboost-dist-iris-test-predict +Namespace: default +Labels: +Annotations: +API Version: xgboostjob.kubeflow.org/v1alpha1 +Kind: XGBoostJob +Metadata: + Creation Timestamp: 2019-06-27T06:06:53Z + Generation: 8 + Resource Version: 394523 + Self Link: /apis/xgboostjob.kubeflow.org/v1alpha1/namespaces/default/xgboostjobs/xgboost-dist-iris-test-predict + UID: c2a04cbc-98a1-11e9-bbab-080027dfbfe2 +Spec: + Run Policy: + Clean Pod Policy: None + Xgb Replica Specs: + Master: + Replicas: 1 + Restart Policy: Never + Template: + Metadata: + Creation Timestamp: + Spec: + Containers: + Args: + --job_type=Predict + --model_path=autoAI/xgb-opt/3 + --model_storage_type=oss + --oss_param=unkown + Image: docker.io/merlintang/xgboost-dist-iris:1.1 + Image Pull Policy: Always + Name: xgboostjob + Ports: + Container Port: 9991 + Name: xgboostjob-port + Resources: + Worker: + Replicas: 2 + Restart Policy: ExitCode + Template: + Metadata: + Creation Timestamp: + Spec: + Containers: + Args: + --job_type=Predict + --model_path=autoAI/xgb-opt/3 + --model_storage_type=oss + --oss_param=unkown + Image: docker.io/merlintang/xgboost-dist-iris:1.1 + Image Pull Policy: Always + Name: xgboostjob + Ports: + Container Port: 9991 + Name: xgboostjob-port + Resources: +Status: + Completion Time: 2019-06-27T06:07:02Z + Conditions: + Last Transition Time: 2019-06-27T06:06:53Z + Last Update Time: 2019-06-27T06:06:53Z + Message: xgboostJob xgboost-dist-iris-test-predict is created. + Reason: XGBoostJobCreated + Status: True + Type: Created + Last Transition Time: 2019-06-27T06:06:53Z + Last Update Time: 2019-06-27T06:06:53Z + Message: XGBoostJob xgboost-dist-iris-test-predict is running. + Reason: XGBoostJobRunning + Status: False + Type: Running + Last Transition Time: 2019-06-27T06:07:02Z + Last Update Time: 2019-06-27T06:07:02Z + Message: XGBoostJob xgboost-dist-iris-test-predict is successfully completed. + Reason: XGBoostJobSucceeded + Status: True + Type: Succeeded + Replica Statuses: + Master: + Succeeded: 1 + Worker: + Succeeded: 2 +Events: + Type Reason Age From Message + ---- ------ ---- ---- ------- + Normal SuccessfulCreatePod 47s xgboostjob-operator Created pod: xgboost-dist-iris-test-predict-worker-0 + Normal SuccessfulCreatePod 47s xgboostjob-operator Created pod: xgboost-dist-iris-test-predict-worker-1 + Normal SuccessfulCreateService 47s xgboostjob-operator Created service: xgboost-dist-iris-test-predict-worker-0 + Normal SuccessfulCreateService 47s xgboostjob-operator Created service: xgboost-dist-iris-test-predict-worker-1 + Normal SuccessfulCreatePod 47s xgboostjob-operator Created pod: xgboost-dist-iris-test-predict-master-0 + Normal SuccessfulCreateService 47s xgboostjob-operator Created service: xgboost-dist-iris-test-predict-master-0 + Normal ExitedWithCode 38s (x3 over 40s) xgboostjob-operator Pod: default.xgboost-dist-iris-test-predict-worker-0 exited with code 0 + Normal ExitedWithCode 38s xgboostjob-operator Pod: default.xgboost-dist-iris-test-predict-master-0 exited with code 0 + Normal XGBoostJobSucceeded 38s xgboostjob-operator XGBoostJob xgboost-dist-iris-test-predict is successfully completed. +``` + +**Start the distributed XGBoost train to store the model locally** + +Before proceeding with training we will create a PVC to store the model trained. +Creating pvc : +create a yaml file with the below content +pvc.yaml +``` +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: xgboostlocal +spec: + storageClassName: glusterfs + accessModes: + - ReadWriteMany + resources: + requests: + storage: 10Gi +``` +``` +kubectl create -f pvc.yaml +``` +Note: + +* Please use the storage class which supports ReadWriteMany. The example yaml above uses glusterfs + +* Mention model_storage_type=local and model_path accordingly( In the example /tmp/xgboost_model/2 is used ) in xgboostjob_v1alpha1_iris_train_local.yaml and xgboostjob_v1alpha1_iris_predict_local.yaml" + +Now start the distributed XGBoost train. +``` +kubectl create -f xgboostjob_v1alpha1_iris_train_local.yaml +``` + +**Look at the train job status** +``` + kubectl get -o yaml XGBoostJob/xgboost-dist-iris-test-train-local + ``` + Here is a sample output when the job is finished. The output log like this +``` + +apiVersion: xgboostjob.kubeflow.org/v1alpha1 +kind: XGBoostJob +metadata: + creationTimestamp: "2019-09-17T05:36:01Z" + generation: 7 + name: xgboost-dist-iris-test-train_local + namespace: default + resourceVersion: "8919366" + selfLink: /apis/xgboostjob.kubeflow.org/v1alpha1/namespaces/default/xgboostjobs/xgboost-dist-iris-test-train_local + uid: 08f85fad-d90d-11e9-aca1-fa163ea13108 +spec: + RunPolicy: + cleanPodPolicy: None + xgbReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + metadata: + creationTimestamp: null + spec: + containers: + - args: + - --job_type=Train + - --xgboost_parameter=objective:multi:softprob,num_class:3 + - --n_estimators=10 + - --learning_rate=0.1 + - --model_path=/tmp/xgboost_model/2 + - --model_storage_type=local + image: docker.io/merlintang/xgboost-dist-iris:1.1 + imagePullPolicy: Always + name: xgboostjob + ports: + - containerPort: 9991 + name: xgboostjob-port + resources: {} + volumeMounts: + - mountPath: /tmp/xgboost_model + name: task-pv-storage + volumes: + - name: task-pv-storage + persistentVolumeClaim: + claimName: xgboostlocal + Worker: + replicas: 2 + restartPolicy: ExitCode + template: + metadata: + creationTimestamp: null + spec: + containers: + - args: + - --job_type=Train + - --xgboost_parameter="objective:multi:softprob,num_class:3" + - --n_estimators=10 + - --learning_rate=0.1 + - --model_path=/tmp/xgboost_model/2 + - --model_storage_type=local + image: bcmt-registry:5000/kubeflow/xgboost-dist-iris-test:1.0 + imagePullPolicy: Always + name: xgboostjob + ports: + - containerPort: 9991 + name: xgboostjob-port + resources: {} + volumeMounts: + - mountPath: /tmp/xgboost_model + name: task-pv-storage + volumes: + - name: task-pv-storage + persistentVolumeClaim: + claimName: xgboostlocal +status: + completionTime: "2019-09-17T05:37:02Z" + conditions: + - lastTransitionTime: "2019-09-17T05:36:02Z" + lastUpdateTime: "2019-09-17T05:36:02Z" + message: xgboostJob xgboost-dist-iris-test-train_local is created. + reason: XGBoostJobCreated + status: "True" + type: Created + - lastTransitionTime: "2019-09-17T05:36:02Z" + lastUpdateTime: "2019-09-17T05:36:02Z" + message: XGBoostJob xgboost-dist-iris-test-train_local is running. + reason: XGBoostJobRunning + status: "False" + type: Running + - lastTransitionTime: "2019-09-17T05:37:02Z" + lastUpdateTime: "2019-09-17T05:37:02Z" + message: XGBoostJob xgboost-dist-iris-test-train_local is successfully completed. + reason: XGBoostJobSucceeded + status: "True" + type: Succeeded + replicaStatuses: + Master: + succeeded: 1 + Worker: + succeeded: 2 + ``` +**Start the distributed XGBoost job predict** +``` +kubectl create -f xgboostjob_v1alpha1_iris_predict_local.yaml +``` + +**Look at the batch predict job status** +``` + kubectl get -o yaml XGBoostJob/xgboost-dist-iris-test-predict-local + ``` + Here is a sample output when the job is finished. The output log like this +``` +apiVersion: xgboostjob.kubeflow.org/v1alpha1 +kind: XGBoostJob +metadata: + creationTimestamp: "2019-09-17T06:33:38Z" + generation: 6 + name: xgboost-dist-iris-test-predict_local + namespace: default + resourceVersion: "8976054" + selfLink: /apis/xgboostjob.kubeflow.org/v1alpha1/namespaces/default/xgboostjobs/xgboost-dist-iris-test-predict_local + uid: 151655b0-d915-11e9-aca1-fa163ea13108 +spec: + RunPolicy: + cleanPodPolicy: None + xgbReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + metadata: + creationTimestamp: null + spec: + containers: + - args: + - --job_type=Predict + - --model_path=/tmp/xgboost_model/2 + - --model_storage_type=local + image: docker.io/merlintang/xgboost-dist-iris:1.1 + imagePullPolicy: Always + name: xgboostjob + ports: + - containerPort: 9991 + name: xgboostjob-port + resources: {} + volumeMounts: + - mountPath: /tmp/xgboost_model + name: task-pv-storage + volumes: + - name: task-pv-storage + persistentVolumeClaim: + claimName: xgboostlocal + Worker: + replicas: 2 + restartPolicy: ExitCode + template: + metadata: + creationTimestamp: null + spec: + containers: + - args: + - --job_type=Predict + - --model_path=/tmp/xgboost_model/2 + - --model_storage_type=local + image: docker.io/merlintang/xgboost-dist-iris:1.1 + imagePullPolicy: Always + name: xgboostjob + ports: + - containerPort: 9991 + name: xgboostjob-port + resources: {} + volumeMounts: + - mountPath: /tmp/xgboost_model + name: task-pv-storage + volumes: + - name: task-pv-storage + persistentVolumeClaim: + claimName: xgboostlocal +status: + completionTime: "2019-09-17T06:33:51Z" + conditions: + - lastTransitionTime: "2019-09-17T06:33:38Z" + lastUpdateTime: "2019-09-17T06:33:38Z" + message: xgboostJob xgboost-dist-iris-test-predict_local is created. + reason: XGBoostJobCreated + status: "True" + type: Created + - lastTransitionTime: "2019-09-17T06:33:38Z" + lastUpdateTime: "2019-09-17T06:33:38Z" + message: XGBoostJob xgboost-dist-iris-test-predict_local is running. + reason: XGBoostJobRunning + status: "False" + type: Running + - lastTransitionTime: "2019-09-17T06:33:51Z" + lastUpdateTime: "2019-09-17T06:33:51Z" + message: XGBoostJob xgboost-dist-iris-test-predict_local is successfully completed. + reason: XGBoostJobSucceeded + status: "True" + type: Succeeded + replicaStatuses: + Master: + succeeded: 1 + Worker: + succeeded: 1 +``` diff --git a/examples/xgboost/xgboost-dist/build.sh b/examples/xgboost/xgboost-dist/build.sh new file mode 100644 index 0000000000..b837f07341 --- /dev/null +++ b/examples/xgboost/xgboost-dist/build.sh @@ -0,0 +1,11 @@ +## build the docker file +docker build -f Dockerfile -t merlintang/xgboost-dist-iris:1.0 ./ + +## push the docker image into docker.io +docker push merlintang/xgboost-dist-iris:1.0 + +## run the train job +kubectl create -f xgboostjob_v1alpha1_iris_train.yaml + +## run the predict job +kubectl create -f xgboostjob_v1alpha1_iris_predict.yaml diff --git a/examples/xgboost/xgboost-dist/local_test.py b/examples/xgboost/xgboost-dist/local_test.py new file mode 100644 index 0000000000..8f7b0a7d21 --- /dev/null +++ b/examples/xgboost/xgboost-dist/local_test.py @@ -0,0 +1,96 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +this file contains tests for xgboost local train and predict in single machine. +Note: this is not for distributed train and predict test +""" +from utils import dump_model, read_model, read_train_data, read_predict_data +import xgboost as xgb +import logging +import numpy as np +from sklearn.metrics import precision_score + +logger = logging.getLogger(__name__) + + +def test_train_model(): + """ + test xgboost train in a single machine + :return: trained model + """ + rank = 1 + world_size = 10 + place = "/tmp/data" + dmatrix = read_train_data(rank, world_size, place) + + param_xgboost_default = {'max_depth': 2, 'eta': 1, 'silent': 1, + 'objective': 'multi:softprob', 'num_class': 3} + + booster = xgb.train(param_xgboost_default, dtrain=dmatrix) + + assert booster is not None + + return booster + + +def test_model_predict(booster): + """ + test xgboost train in the single node + :return: true if pass the test + """ + rank = 1 + world_size = 10 + place = "/tmp/data" + dmatrix, y_test = read_predict_data(rank, world_size, place) + + preds = booster.predict(dmatrix) + best_preds = np.asarray([np.argmax(line) for line in preds]) + score = precision_score(y_test, best_preds, average='macro') + + assert score > 0.99 + + logging.info("Predict accuracy: %f", score) + + return True + + +def test_upload_model(model, model_path, args): + + return dump_model(model, type="local", model_path=model_path, args=args) + + +def test_download_model(model_path, args): + + return read_model(type="local", model_path=model_path, args=args) + + +def run_test(): + args = {} + model_path = "/tmp/xgboost" + + logging.info("Start the local test") + + booster = test_train_model() + test_upload_model(booster, model_path, args) + booster_new = test_download_model(model_path, args) + test_model_predict(booster_new) + + logging.info("Finish the local test") + + +if __name__ == '__main__': + + logging.basicConfig(format='%(message)s') + logging.getLogger().setLevel(logging.INFO) + + run_test() diff --git a/examples/xgboost/xgboost-dist/main.py b/examples/xgboost/xgboost-dist/main.py new file mode 100644 index 0000000000..193119117f --- /dev/null +++ b/examples/xgboost/xgboost-dist/main.py @@ -0,0 +1,94 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import logging + +from train import train +from predict import predict +from utils import dump_model + + +def main(args): + + model_storage_type = args.model_storage_type + if (model_storage_type == "local" or model_storage_type == "oss"): + print ( "The storage type is " + model_storage_type) + else: + raise Exception("Only supports storage types like local and OSS") + + if args.job_type == "Predict": + logging.info("starting the predict job") + predict(args) + + elif args.job_type == "Train": + logging.info("starting the train job") + model = train(args) + + if model is not None: + logging.info("finish the model training, and start to dump model ") + model_path = args.model_path + dump_model(model, model_storage_type, model_path, args) + + elif args.job_type == "All": + logging.info("starting the train and predict job") + + logging.info("Finish distributed XGBoost job") + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + + parser.add_argument( + '--job_type', + help="Train, Predict, All", + required=True + ) + parser.add_argument( + '--xgboost_parameter', + help='XGBoost model parameter like: objective, number_class', + ) + parser.add_argument( + '--n_estimators', + help='Number of trees in the model', + type=int, + default=1000 + ) + parser.add_argument( + '--learning_rate', + help='Learning rate for the model', + default=0.1 + ) + parser.add_argument( + '--early_stopping_rounds', + help='XGBoost argument for stopping early', + default=50 + ) + parser.add_argument( + '--model_path', + help='place to store model', + default="/tmp/xgboost_model" + ) + parser.add_argument( + '--model_storage_type', + help='place to store the model', + default="oss" + ) + parser.add_argument( + '--oss_param', + help='oss parameter if you choose the model storage as OSS type', + ) + + logging.basicConfig(format='%(message)s') + logging.getLogger().setLevel(logging.INFO) + main_args = parser.parse_args() + main(main_args) diff --git a/examples/xgboost/xgboost-dist/predict.py b/examples/xgboost/xgboost-dist/predict.py new file mode 100644 index 0000000000..b4dbf54288 --- /dev/null +++ b/examples/xgboost/xgboost-dist/predict.py @@ -0,0 +1,40 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sklearn.metrics import precision_score + +import logging +import numpy as np + +from utils import extract_xgbooost_cluster_env, read_predict_data, read_model + + +def predict(args): + """ + This is the demonstration for the batch prediction + :param args: parameter for model related config + """ + + addr, port, rank, world_size = extract_xgbooost_cluster_env() + + dmatrix, y_test = read_predict_data(rank, world_size, None) + + model_path = args.model_path + storage_type = args.model_storage_type + booster = read_model(storage_type, model_path, args) + + preds = booster.predict(dmatrix) + + best_preds = np.asarray([np.argmax(line) for line in preds]) + score = precision_score(y_test, best_preds, average='macro') + + logging.info("Predict accuracy: %f", score) diff --git a/examples/xgboost/xgboost-dist/requirements.txt b/examples/xgboost/xgboost-dist/requirements.txt new file mode 100644 index 0000000000..60841a31a5 --- /dev/null +++ b/examples/xgboost/xgboost-dist/requirements.txt @@ -0,0 +1,9 @@ +numpy>=1.16.3 +Cython>=0.29.4 +requests>=2.21.0 +urllib3>=1.21.1 +scipy>=1.1.0 +joblib>=0.13.2 +scikit-learn>=0.20 +oss2>=2.7.0 +pandas>=0.24.2 \ No newline at end of file diff --git a/examples/xgboost/xgboost-dist/tracker.py b/examples/xgboost/xgboost-dist/tracker.py new file mode 100644 index 0000000000..7cd572c748 --- /dev/null +++ b/examples/xgboost/xgboost-dist/tracker.py @@ -0,0 +1,506 @@ +""" +Tracker script for DMLC +Implements the tracker control protocol + - start dmlc jobs + - start ps scheduler and rabit tracker + - help nodes to establish links with each other +Tianqi Chen +-------------------------- +This was taken from +https://github.com/dmlc/dmlc-core/blob/master/tracker/dmlc_tracker/tracker.py +See LICENSE here +https://github.com/dmlc/dmlc-core/blob/master/LICENSE +No code modified or added except for this explanatory comment. +""" +# pylint: disable=invalid-name, missing-docstring, too-many-arguments +# pylint: disable=too-many-locals +# pylint: disable=too-many-branches, too-many-statements +from __future__ import absolute_import + +import os +import sys +import socket +import struct +import subprocess +import argparse +import time +import logging +from threading import Thread + + +class ExSocket(object): + """ + Extension of socket to handle recv and send of special data + """ + def __init__(self, sock): + self.sock = sock + + def recvall(self, nbytes): + res = [] + nread = 0 + while nread < nbytes: + chunk = self.sock.recv(min(nbytes - nread, 1024)) + nread += len(chunk) + res.append(chunk) + return b''.join(res) + + def recvint(self): + return struct.unpack('@i', self.recvall(4))[0] + + def sendint(self, n): + self.sock.sendall(struct.pack('@i', n)) + + def sendstr(self, s): + self.sendint(len(s)) + self.sock.sendall(s.encode()) + + def recvstr(self): + slen = self.recvint() + return self.recvall(slen).decode() + + +# magic number used to verify existence of data +kMagic = 0xff99 + + +def get_some_ip(host): + return socket.getaddrinfo(host, None)[0][4][0] + + +def get_family(addr): + return socket.getaddrinfo(addr, None)[0][0] + + +class SlaveEntry(object): + def __init__(self, sock, s_addr): + slave = ExSocket(sock) + self.sock = slave + self.host = get_some_ip(s_addr[0]) + magic = slave.recvint() + assert magic == kMagic, 'invalid magic number=%d from %s' % ( + magic, self.host) + slave.sendint(kMagic) + self.rank = slave.recvint() + self.world_size = slave.recvint() + self.jobid = slave.recvstr() + self.cmd = slave.recvstr() + self.wait_accept = 0 + self.port = None + + def decide_rank(self, job_map): + if self.rank >= 0: + return self.rank + if self.jobid != 'NULL' and self.jobid in job_map: + return job_map[self.jobid] + return -1 + + def assign_rank(self, rank, wait_conn, tree_map, parent_map, ring_map): + self.rank = rank + nnset = set(tree_map[rank]) + rprev, rnext = ring_map[rank] + self.sock.sendint(rank) + # send parent rank + self.sock.sendint(parent_map[rank]) + # send world size + self.sock.sendint(len(tree_map)) + self.sock.sendint(len(nnset)) + # send the rprev and next link + for r in nnset: + self.sock.sendint(r) + # send prev link + if rprev != -1 and rprev != rank: + nnset.add(rprev) + self.sock.sendint(rprev) + else: + self.sock.sendint(-1) + # send next link + if rnext != -1 and rnext != rank: + nnset.add(rnext) + self.sock.sendint(rnext) + else: + self.sock.sendint(-1) + while True: + ngood = self.sock.recvint() + goodset = set([]) + for _ in range(ngood): + goodset.add(self.sock.recvint()) + assert goodset.issubset(nnset) + badset = nnset - goodset + conset = [] + for r in badset: + if r in wait_conn: + conset.append(r) + self.sock.sendint(len(conset)) + self.sock.sendint(len(badset) - len(conset)) + for r in conset: + self.sock.sendstr(wait_conn[r].host) + self.sock.sendint(wait_conn[r].port) + self.sock.sendint(r) + nerr = self.sock.recvint() + if nerr != 0: + continue + self.port = self.sock.recvint() + rmset = [] + # all connection was successuly setup + for r in conset: + wait_conn[r].wait_accept -= 1 + if wait_conn[r].wait_accept == 0: + rmset.append(r) + for r in rmset: + wait_conn.pop(r, None) + self.wait_accept = len(badset) - len(conset) + return rmset + + +class RabitTracker(object): + """ + tracker for rabit + """ + def __init__(self, hostIP, nslave, port=9091, port_end=9999): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + for port in range(port, port_end): + try: + sock.bind((hostIP, port)) + self.port = port + break + except socket.error as e: + if e.errno in [98, 48]: + continue + else: + raise + sock.listen(256) + self.sock = sock + self.hostIP = hostIP + self.thread = None + self.start_time = None + self.end_time = None + self.nslave = nslave + logging.info('start listen on %s:%d', hostIP, self.port) + + def __del__(self): + self.sock.close() + + @staticmethod + def get_neighbor(rank, nslave): + rank = rank + 1 + ret = [] + if rank > 1: + ret.append(rank // 2 - 1) + if rank * 2 - 1 < nslave: + ret.append(rank * 2 - 1) + if rank * 2 < nslave: + ret.append(rank * 2) + return ret + + def slave_envs(self): + """ + get enviroment variables for slaves + can be passed in as args or envs + """ + return {'DMLC_TRACKER_URI': self.hostIP, + 'DMLC_TRACKER_PORT': self.port} + + def get_tree(self, nslave): + tree_map = {} + parent_map = {} + for r in range(nslave): + tree_map[r] = self.get_neighbor(r, nslave) + parent_map[r] = (r + 1) // 2 - 1 + return tree_map, parent_map + + def find_share_ring(self, tree_map, parent_map, r): + """ + get a ring structure that tends to share nodes with the tree + return a list starting from r + """ + nset = set(tree_map[r]) + cset = nset - set([parent_map[r]]) + if len(cset) == 0: + return [r] + rlst = [r] + cnt = 0 + for v in cset: + vlst = self.find_share_ring(tree_map, parent_map, v) + cnt += 1 + if cnt == len(cset): + vlst.reverse() + rlst += vlst + return rlst + + def get_ring(self, tree_map, parent_map): + """ + get a ring connection used to recover local data + """ + assert parent_map[0] == -1 + rlst = self.find_share_ring(tree_map, parent_map, 0) + assert len(rlst) == len(tree_map) + ring_map = {} + nslave = len(tree_map) + for r in range(nslave): + rprev = (r + nslave - 1) % nslave + rnext = (r + 1) % nslave + ring_map[rlst[r]] = (rlst[rprev], rlst[rnext]) + return ring_map + + def get_link_map(self, nslave): + """ + get the link map, this is a bit hacky, call for better algorithm + to place similar nodes together + """ + tree_map, parent_map = self.get_tree(nslave) + ring_map = self.get_ring(tree_map, parent_map) + rmap = {0: 0} + k = 0 + for i in range(nslave - 1): + k = ring_map[k][1] + rmap[k] = i + 1 + + ring_map_ = {} + tree_map_ = {} + parent_map_ = {} + for k, v in ring_map.items(): + ring_map_[rmap[k]] = (rmap[v[0]], rmap[v[1]]) + for k, v in tree_map.items(): + tree_map_[rmap[k]] = [rmap[x] for x in v] + for k, v in parent_map.items(): + if k != 0: + parent_map_[rmap[k]] = rmap[v] + else: + parent_map_[rmap[k]] = -1 + return tree_map_, parent_map_, ring_map_ + + def accept_slaves(self, nslave): + # set of nodes that finishs the job + shutdown = {} + # set of nodes that is waiting for connections + wait_conn = {} + # maps job id to rank + job_map = {} + # list of workers that is pending to be assigned rank + pending = [] + # lazy initialize tree_map + tree_map = None + + while len(shutdown) != nslave: + fd, s_addr = self.sock.accept() + s = SlaveEntry(fd, s_addr) + if s.cmd == 'print': + msg = s.sock.recvstr() + logging.info(msg.strip()) + continue + if s.cmd == 'shutdown': + assert s.rank >= 0 and s.rank not in shutdown + assert s.rank not in wait_conn + shutdown[s.rank] = s + logging.debug('Recieve %s signal from %d', s.cmd, s.rank) + continue + assert s.cmd == 'start' or s.cmd == 'recover' + # lazily initialize the slaves + if tree_map is None: + assert s.cmd == 'start' + if s.world_size > 0: + nslave = s.world_size + tree_map, parent_map, ring_map = self.get_link_map(nslave) + # set of nodes that is pending for getting up + todo_nodes = list(range(nslave)) + else: + assert s.world_size == -1 or s.world_size == nslave + if s.cmd == 'recover': + assert s.rank >= 0 + + rank = s.decide_rank(job_map) + # batch assignment of ranks + if rank == -1: + assert len(todo_nodes) != 0 + pending.append(s) + if len(pending) == len(todo_nodes): + pending.sort(key=lambda x: x.host) + for s in pending: + rank = todo_nodes.pop(0) + if s.jobid != 'NULL': + job_map[s.jobid] = rank + s.assign_rank(rank, wait_conn, tree_map, parent_map, + ring_map) + if s.wait_accept > 0: + wait_conn[rank] = s + logging.debug('Recieve %s signal from %s; ' + 'assign rank %d', s.cmd, s.host, s.rank) + if len(todo_nodes) == 0: + logging.info('@tracker All of %d nodes getting started', + nslave) + self.start_time = time.time() + else: + s.assign_rank(rank, wait_conn, tree_map, parent_map, ring_map) + logging.debug('Recieve %s signal from %d', s.cmd, s.rank) + if s.wait_accept > 0: + wait_conn[rank] = s + + logging.info("worker(ip_address=%s) connected!" % get_some_ip(s_addr[0])) + + logging.info('@tracker All nodes finishes job') + self.end_time = time.time() + logging.info('@tracker %s secs between node start and job finish', + str(self.end_time - self.start_time)) + + def start(self, nslave): + def run(): + self.accept_slaves(nslave) + self.thread = Thread(target=run, args=()) + self.thread.setDaemon(True) + self.thread.start() + + def join(self): + while self.thread.isAlive(): + self.thread.join(100) + + +class PSTracker(object): + """ + Tracker module for PS + """ + def __init__(self, hostIP, cmd, port=9091, port_end=9999, envs=None): + """ + Starts the PS scheduler + """ + self.cmd = cmd + if cmd is None: + return + envs = {} if envs is None else envs + self.hostIP = hostIP + sock = socket.socket(get_family(hostIP), socket.SOCK_STREAM) + for port in range(port, port_end): + try: + sock.bind(('', port)) + self.port = port + sock.close() + break + except socket.error: + continue + env = os.environ.copy() + + env['DMLC_ROLE'] = 'scheduler' + env['DMLC_PS_ROOT_URI'] = str(self.hostIP) + env['DMLC_PS_ROOT_PORT'] = str(self.port) + for k, v in envs.items(): + env[k] = str(v) + self.thread = Thread( + target=(lambda: subprocess.check_call(self.cmd, env=env, + shell=True)), args=()) + self.thread.setDaemon(True) + self.thread.start() + + def join(self): + if self.cmd is not None: + while self.thread.isAlive(): + self.thread.join(100) + + def slave_envs(self): + if self.cmd is None: + return {} + else: + return {'DMLC_PS_ROOT_URI': self.hostIP, + 'DMLC_PS_ROOT_PORT': self.port} + + +def get_host_ip(hostIP=None): + if hostIP is None or hostIP == 'auto': + hostIP = 'ip' + + if hostIP == 'dns': + hostIP = socket.getfqdn() + elif hostIP == 'ip': + from socket import gaierror + try: + hostIP = socket.gethostbyname(socket.getfqdn()) + except gaierror: + logging.warn('gethostbyname(socket.getfqdn()) failed... trying on ' + 'hostname()') + hostIP = socket.gethostbyname(socket.gethostname()) + if hostIP.startswith("127."): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # doesn't have to be reachable + s.connect(('10.255.255.255', 1)) + hostIP = s.getsockname()[0] + return hostIP + + +def submit(nworker, nserver, fun_submit, hostIP='auto', pscmd=None): + if nserver == 0: + pscmd = None + + envs = {'DMLC_NUM_WORKER': nworker, + 'DMLC_NUM_SERVER': nserver} + hostIP = get_host_ip(hostIP) + + if nserver == 0: + rabit = RabitTracker(hostIP=hostIP, nslave=nworker) + envs.update(rabit.slave_envs()) + rabit.start(nworker) + else: + pserver = PSTracker(hostIP=hostIP, cmd=pscmd, envs=envs) + envs.update(pserver.slave_envs()) + fun_submit(nworker, nserver, envs) + + if nserver == 0: + rabit.join() + else: + pserver.join() + + +def start_rabit_tracker(args): + """Standalone function to start rabit tracker. + Parameters + ---------- + args: arguments to start the rabit tracker. + """ + envs = {'DMLC_NUM_WORKER': args.num_workers, + 'DMLC_NUM_SERVER': args.num_servers} + rabit = RabitTracker(hostIP=get_host_ip(args.host_ip), + nslave=args.num_workers) + envs.update(rabit.slave_envs()) + rabit.start(args.num_workers) + sys.stdout.write('DMLC_TRACKER_ENV_START\n') + # simply write configuration to stdout + for k, v in envs.items(): + sys.stdout.write('%s=%s\n' % (k, str(v))) + sys.stdout.write('DMLC_TRACKER_ENV_END\n') + sys.stdout.flush() + rabit.join() + + +def main(): + """Main function if tracker is executed in standalone mode.""" + parser = argparse.ArgumentParser(description='Rabit Tracker start.') + parser.add_argument('--num-workers', required=True, type=int, + help='Number of worker proccess to be launched.') + parser.add_argument('--num-servers', default=0, type=int, + help='Number of server process to be launched. Only ' + 'used in PS jobs.') + parser.add_argument('--host-ip', default=None, type=str, + help=('Host IP addressed, this is only needed ' + + 'if the host IP cannot be automatically guessed.' + )) + parser.add_argument('--log-level', default='INFO', type=str, + choices=['INFO', 'DEBUG'], + help='Logging level of the logger.') + args = parser.parse_args() + + fmt = '%(asctime)s %(levelname)s %(message)s' + if args.log_level == 'INFO': + level = logging.INFO + elif args.log_level == 'DEBUG': + level = logging.DEBUG + else: + raise RuntimeError("Unknown logging level %s" % args.log_level) + + logging.basicConfig(format=fmt, level=level) + + if args.num_servers == 0: + start_rabit_tracker(args) + else: + raise RuntimeError("Do not yet support start ps tracker in standalone " + "mode.") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/xgboost/xgboost-dist/train.py b/examples/xgboost/xgboost-dist/train.py new file mode 100644 index 0000000000..290f09d892 --- /dev/null +++ b/examples/xgboost/xgboost-dist/train.py @@ -0,0 +1,91 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging +import xgboost as xgb +import traceback + +from tracker import RabitTracker +from utils import read_train_data, extract_xgbooost_cluster_env + +logger = logging.getLogger(__name__) + + +def train(args): + """ + :param args: configuration for train job + :return: XGBoost model + """ + addr, port, rank, world_size = extract_xgbooost_cluster_env() + rabit_tracker = None + + try: + """start to build the network""" + if world_size > 1: + if rank == 0: + logger.info("start the master node") + + rabit = RabitTracker(hostIP="0.0.0.0", nslave=world_size, + port=port, port_end=port + 1) + rabit.start(world_size) + rabit_tracker = rabit + logger.info('###### RabitTracker Setup Finished ######') + + envs = [ + 'DMLC_NUM_WORKER=%d' % world_size, + 'DMLC_TRACKER_URI=%s' % addr, + 'DMLC_TRACKER_PORT=%d' % port, + 'DMLC_TASK_ID=%d' % rank + ] + logger.info('##### Rabit rank setup with below envs #####') + for i, env in enumerate(envs): + logger.info(env) + envs[i] = str.encode(env) + + xgb.rabit.init(envs) + logger.info('##### Rabit rank = %d' % xgb.rabit.get_rank()) + rank = xgb.rabit.get_rank() + + else: + world_size = 1 + logging.info("Start the train in a single node") + + df = read_train_data(rank=rank, num_workers=world_size, path=None) + kwargs = {} + kwargs["dtrain"] = df + kwargs["num_boost_round"] = int(args.n_estimators) + param_xgboost_default = {'max_depth': 2, 'eta': 1, 'silent': 1, + 'objective': 'multi:softprob', 'num_class': 3} + kwargs["params"] = param_xgboost_default + + logging.info("starting to train xgboost at node with rank %d", rank) + bst = xgb.train(**kwargs) + + if rank == 0: + model = bst + else: + model = None + + logging.info("finish xgboost training at node with rank %d", rank) + + except Exception as e: + logger.error("something wrong happen: %s", traceback.format_exc()) + raise e + finally: + logger.info("xgboost training job finished!") + if world_size > 1: + xgb.rabit.finalize() + if rabit_tracker: + rabit_tracker.join() + + return model diff --git a/examples/xgboost/xgboost-dist/utils.py b/examples/xgboost/xgboost-dist/utils.py new file mode 100644 index 0000000000..283af8ba34 --- /dev/null +++ b/examples/xgboost/xgboost-dist/utils.py @@ -0,0 +1,291 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import joblib +import xgboost as xgb +import os +import tempfile +import oss2 +import json +import pandas as pd + +from sklearn import datasets + +logger = logging.getLogger(__name__) + + +def extract_xgbooost_cluster_env(): + """ + Extract the cluster env from pod + :return: the related cluster env to build rabit + """ + + logger.info("starting to extract system env") + + master_addr = os.environ.get("MASTER_ADDR", "{}") + master_port = int(os.environ.get("MASTER_PORT", "{}")) + rank = int(os.environ.get("RANK", "{}")) + world_size = int(os.environ.get("WORLD_SIZE", "{}")) + + logger.info("extract the Rabit env from cluster :" + " %s, port: %d, rank: %d, word_size: %d ", + master_addr, master_port, rank, world_size) + + return master_addr, master_port, rank, world_size + + +def read_train_data(rank, num_workers, path): + """ + Read file based on the rank of worker. + We use the sklearn.iris data for demonstration + You can extend this to read distributed data source like HDFS, HIVE etc + :param rank: the id of each worker + :param num_workers: total number of workers in this cluster + :param path: the input file name or the place to read the data + :return: XGBoost Dmatrix + """ + iris = datasets.load_iris() + x = iris.data + y = iris.target + + start, end = get_range_data(len(x), rank, num_workers) + x = x[start:end, :] + y = y[start:end] + + x = pd.DataFrame(x) + y = pd.DataFrame(y) + dtrain = xgb.DMatrix(data=x, label=y) + + logging.info("Read data from IRIS data source with range from %d to %d", + start, end) + + return dtrain + + +def read_predict_data(rank, num_workers, path): + """ + Read file based on the rank of worker. + We use the sklearn.iris data for demonstration + You can extend this to read distributed data source like HDFS, HIVE etc + :param rank: the id of each worker + :param num_workers: total number of workers in this cluster + :param path: the input file name or the place to read the data + :return: XGBoost Dmatrix, and real value + """ + iris = datasets.load_iris() + x = iris.data + y = iris.target + + start, end = get_range_data(len(x), rank, num_workers) + x = x[start:end, :] + y = y[start:end] + x = pd.DataFrame(x) + y = pd.DataFrame(y) + + logging.info("Read data from IRIS datasource with range from %d to %d", + start, end) + + predict = xgb.DMatrix(x, label=y) + + return predict, y + + +def get_range_data(num_row, rank, num_workers): + """ + compute the data range based on the input data size and worker id + :param num_row: total number of dataset + :param rank: the worker id + :param num_workers: total number of workers + :return: begin and end range of input matrix + """ + num_per_partition = int(num_row/num_workers) + + x_start = rank * num_per_partition + x_end = (rank + 1) * num_per_partition + + if x_end > num_row: + x_end = num_row + + return x_start, x_end + + +def dump_model(model, type, model_path, args): + """ + dump the trained model into local place + you can update this function to store the model into a remote place + :param model: the xgboost trained booster + :param type: model storage type + :param model_path: place to store model + :param args: configuration for model storage + :return: True if the dump process success + """ + if model is None: + raise Exception("fail to get the XGBoost train model") + else: + if type == "local": + joblib.dump(model, model_path) + logging.info("Dump model into local place %s", model_path) + + elif type == "oss": + oss_param = parse_parameters(args.oss_param, ",", ":") + if oss_param is None: + raise Exception("Please config oss parameter to store model") + + oss_param['path'] = args.model_path + dump_model_to_oss(oss_param, model) + logging.info("Dump model into oss place %s", args.model_path) + + return True + + +def read_model(type, model_path, args): + """ + read model from physical storage + :param type: oss or local + :param model_path: place to store the model + :param args: configuration to read model + :return: XGBoost model + """ + + if type == "local": + model = joblib.load(model_path) + logging.info("Read model from local place %s", model_path) + + elif type == "oss": + oss_param = parse_parameters(args.oss_param, ",", ":") + if oss_param is None: + raise Exception("Please config oss to read model") + return False + + oss_param['path'] = args.model_path + + model = read_model_from_oss(oss_param) + logging.info("read model from oss place %s", model_path) + + return model + + +def dump_model_to_oss(oss_parameters, booster): + """ + dump the model to remote OSS disk + :param oss_parameters: oss configuration + :param booster: XGBoost model + :return: True if stored procedure is success + """ + """export model into oss""" + model_fname = os.path.join(tempfile.mkdtemp(), 'model') + text_model_fname = os.path.join(tempfile.mkdtemp(), 'model.text') + feature_importance = os.path.join(tempfile.mkdtemp(), + 'feature_importance.json') + + oss_path = oss_parameters['path'] + logger.info('---- export model ----') + booster.save_model(model_fname) + booster.dump_model(text_model_fname) # format output model + fscore_dict = booster.get_fscore() + with open(feature_importance, 'w') as file: + file.write(json.dumps(fscore_dict)) + logger.info('---- chief dump model successfully!') + + if os.path.exists(model_fname): + logger.info('---- Upload Model start...') + + while oss_path[-1] == '/': + oss_path = oss_path[:-1] + + upload_oss(oss_parameters, model_fname, oss_path) + aux_path = oss_path + '_dir/' + upload_oss(oss_parameters, model_fname, aux_path) + upload_oss(oss_parameters, text_model_fname, aux_path) + upload_oss(oss_parameters, feature_importance, aux_path) + else: + raise Exception("fail to generate model") + return False + + return True + + +def upload_oss(kw, local_file, oss_path): + """ + help function to upload a model to oss + :param kw: OSS parameter + :param local_file: local place of model + :param oss_path: remote place of OSS + :return: True if the procedure is success + """ + if oss_path[-1] == '/': + oss_path = '%s%s' % (oss_path, os.path.basename(local_file)) + + auth = oss2.Auth(kw['access_id'], kw['access_key']) + bucket = kw['access_bucket'] + bkt = oss2.Bucket(auth=auth, endpoint=kw['endpoint'], bucket_name=bucket) + + try: + bkt.put_object_from_file(key=oss_path, filename=local_file) + logger.info("upload %s to %s successfully!" % + (os.path.abspath(local_file), oss_path)) + except Exception(): + raise ValueError('upload %s to %s failed' % + (os.path.abspath(local_file), oss_path)) + + +def read_model_from_oss(kw): + """ + helper function to read a model from oss + :param kw: OSS parameter + :return: XGBoost booster model + """ + auth = oss2.Auth(kw['access_id'], kw['access_key']) + bucket = kw['access_bucket'] + bkt = oss2.Bucket(auth=auth, endpoint=kw['endpoint'], bucket_name=bucket) + oss_path = kw["path"] + + temp_model_fname = os.path.join(tempfile.mkdtemp(), 'local_model') + try: + bkt.get_object_to_file(key=oss_path, filename=temp_model_fname) + logger.info("success to load model from oss %s", oss_path) + except Exception as e: + logging.error("fail to load model: " + e) + raise Exception("fail to load model from oss %s", oss_path) + + bst = xgb.Booster({'nthread': 2}) # init model + + bst.load_model(temp_model_fname) + + return bst + + +def parse_parameters(input, splitter_between, splitter_in): + """ + helper function parse the input parameter + :param input: the string of configuration like key-value pairs + :param splitter_between: the splitter between config for input string + :param splitter_in: the splitter inside config for input string + :return: key-value pair configuration + """ + + ky_pairs = input.split(splitter_between) + + confs = {} + + for kv in ky_pairs: + conf = kv.split(splitter_in) + key = conf[0].strip(" ") + if key == "objective" or key == "endpoint": + value = conf[1].strip("'") + ":" + conf[2].strip("'") + else: + value = conf[1] + + confs[key] = value + return confs + diff --git a/examples/xgboost/xgboost-dist/xgboostjob_v1_iris_predict.yaml b/examples/xgboost/xgboost-dist/xgboostjob_v1_iris_predict.yaml new file mode 100644 index 0000000000..7fa2c10cdc --- /dev/null +++ b/examples/xgboost/xgboost-dist/xgboostjob_v1_iris_predict.yaml @@ -0,0 +1,42 @@ +apiVersion: kubeflow.org/v1 +kind: XGBoostJob +metadata: + name: xgboost-dist-iris-test-predict +spec: + xgbReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + spec: + containers: + - name: xgboostjob + image: docker.io/merlintang/xgboost-dist-iris:1.1 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + args: + - --job_type=Predict + - --model_path=autoAI/xgb-opt/2 + - --model_storage_type=oss + - --oss_param=unknown + Worker: + replicas: 2 + restartPolicy: ExitCode + template: + spec: + containers: + - name: xgboostjob + image: docker.io/merlintang/xgboost-dist-iris:1.1 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + args: + - --job_type=Predict + - --model_path=autoAI/xgb-opt/2 + - --model_storage_type=oss + - --oss_param=unknown + + diff --git a/examples/xgboost/xgboost-dist/xgboostjob_v1_iris_train.yaml b/examples/xgboost/xgboost-dist/xgboostjob_v1_iris_train.yaml new file mode 100644 index 0000000000..44edcefe8f --- /dev/null +++ b/examples/xgboost/xgboost-dist/xgboostjob_v1_iris_train.yaml @@ -0,0 +1,44 @@ +apiVersion: kubeflow.org/v1 +kind: XGBoostJob +metadata: + name: xgboost-dist-iris-test-train +spec: + xgbReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + spec: + containers: + - name: xgboostjob + image: docker.io/merlintang/xgboost-dist-iris:1.1 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + args: + - --job_type=Train + - --xgboost_parameter=objective:multi:softprob,num_class:3 + - --n_estimators=10 + - --learning_rate=0.1 + - --model_path=/tmp/xgboost-model + - --model_storage_type=local + Worker: + replicas: 2 + restartPolicy: ExitCode + template: + spec: + containers: + - name: xgboostjob + image: docker.io/merlintang/xgboost-dist-iris:1.1 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + args: + - --job_type=Train + - --xgboost_parameter="objective:multi:softprob,num_class:3" + - --n_estimators=10 + - --learning_rate=0.1 + + diff --git a/examples/xgboost/xgboost-dist/xgboostjob_v1alpha1_iris_predict.yaml b/examples/xgboost/xgboost-dist/xgboostjob_v1alpha1_iris_predict.yaml new file mode 100644 index 0000000000..6f86b823a5 --- /dev/null +++ b/examples/xgboost/xgboost-dist/xgboostjob_v1alpha1_iris_predict.yaml @@ -0,0 +1,46 @@ +apiVersion: kubeflow.org/v1 +kind: XGBoostJob +metadata: + name: xgboost-dist-iris-test-predict +spec: + xgbReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + apiVersion: v1 + kind: Pod + spec: + containers: + - name: xgboostjob + image: docker.io/merlintang/xgboost-dist-iris:1.1 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + args: + - --job_type=Predict + - --model_path=autoAI/xgb-opt/2 + - --model_storage_type=oss + - --oss_param=unknown + Worker: + replicas: 2 + restartPolicy: ExitCode + template: + apiVersion: v1 + kind: Pod + spec: + containers: + - name: xgboostjob + image: docker.io/merlintang/xgboost-dist-iris:1.1 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + args: + - --job_type=Predict + - --model_path=autoAI/xgb-opt/2 + - --model_storage_type=oss + - --oss_param=unknown + + diff --git a/examples/xgboost/xgboost-dist/xgboostjob_v1alpha1_iris_predict_local.yaml b/examples/xgboost/xgboost-dist/xgboostjob_v1alpha1_iris_predict_local.yaml new file mode 100644 index 0000000000..4d0851507e --- /dev/null +++ b/examples/xgboost/xgboost-dist/xgboostjob_v1alpha1_iris_predict_local.yaml @@ -0,0 +1,56 @@ +apiVersion: kubeflow.org/v1 +kind: XGBoostJob +metadata: + name: xgboost-dist-iris-test-predict-local +spec: + xgbReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + apiVersion: v1 + kind: Pod + spec: + volumes: + - name: task-pv-storage + persistentVolumeClaim: + claimName: xgboostlocal + containers: + - name: xgboostjob + image: docker.io/merlintang/xgboost-dist-iris:1.1 + volumeMounts: + - name: task-pv-storage + mountPath: /tmp/xgboost_model + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + args: + - --job_type=Predict + - --model_path=/tmp/xgboost_model/2 + - --model_storage_type=local + Worker: + replicas: 2 + restartPolicy: ExitCode + template: + apiVersion: v1 + kind: Pod + spec: + volumes: + - name: task-pv-storage + persistentVolumeClaim: + claimName: xgboostlocal + containers: + - name: xgboostjob + image: docker.io/merlintang/xgboost-dist-iris:1.1 + volumeMounts: + - name: task-pv-storage + mountPath: /tmp/xgboost_model + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + args: + - --job_type=Predict + - --model_path=/tmp/xgboost_model/2 + - --model_storage_type=local diff --git a/examples/xgboost/xgboost-dist/xgboostjob_v1alpha1_iris_train.yaml b/examples/xgboost/xgboost-dist/xgboostjob_v1alpha1_iris_train.yaml new file mode 100644 index 0000000000..3794f86436 --- /dev/null +++ b/examples/xgboost/xgboost-dist/xgboostjob_v1alpha1_iris_train.yaml @@ -0,0 +1,49 @@ +apiVersion: kubeflow.org/v1 +kind: XGBoostJob +metadata: + name: xgboost-dist-iris-test-train +spec: + xgbReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + apiVersion: v1 + kind: Pod + spec: + containers: + - name: xgboostjob + image: docker.io/merlintang/xgboost-dist-iris:1.1 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + args: + - --job_type=Train + - --xgboost_parameter=objective:multi:softprob,num_class:3 + - --n_estimators=10 + - --learning_rate=0.1 + - --model_path=autoAI/xgb-opt/2 + - --model_storage_type=oss + - --oss_param=unknown + Worker: + replicas: 2 + restartPolicy: ExitCode + template: + apiVersion: v1 + kind: Pod + spec: + containers: + - name: xgboostjob + image: docker.io/merlintang/xgboost-dist-iris:1.1 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + args: + - --job_type=Train + - --xgboost_parameter="objective:multi:softprob,num_class:3" + - --n_estimators=10 + - --learning_rate=0.1 + + diff --git a/examples/xgboost/xgboost-dist/xgboostjob_v1alpha1_iris_train_local.yaml b/examples/xgboost/xgboost-dist/xgboostjob_v1alpha1_iris_train_local.yaml new file mode 100644 index 0000000000..faf1fae0d3 --- /dev/null +++ b/examples/xgboost/xgboost-dist/xgboostjob_v1alpha1_iris_train_local.yaml @@ -0,0 +1,62 @@ +apiVersion: kubeflow.org/v1 +kind: XGBoostJob +metadata: + name: xgboost-dist-iris-test-train-local +spec: + xgbReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + apiVersion: v1 + kind: Pod + spec: + volumes: + - name: task-pv-storage + persistentVolumeClaim: + claimName: xgboostlocal + containers: + - name: xgboostjob + image: docker.io/merlintang/xgboost-dist-iris:1.1 + volumeMounts: + - name: task-pv-storage + mountPath: /tmp/xgboost_model + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + args: + - --job_type=Train + - --xgboost_parameter=objective:multi:softprob,num_class:3 + - --n_estimators=10 + - --learning_rate=0.1 + - --model_path=/tmp/xgboost_model/2 + - --model_storage_type=local + Worker: + replicas: 2 + restartPolicy: ExitCode + template: + apiVersion: v1 + kind: Pod + spec: + volumes: + - name: task-pv-storage + persistentVolumeClaim: + claimName: xgboostlocal + containers: + - name: xgboostjob + image: docker.io/merlintang/xgboost-dist-iris:1.1 + volumeMounts: + - name: task-pv-storage + mountPath: /tmp/xgboost_model + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + args: + - --job_type=Train + - --xgboost_parameter="objective:multi:softprob,num_class:3" + - --n_estimators=10 + - --learning_rate=0.1 + - --model_path=/tmp/xgboost_model/2 + - --model_storage_type=local