Add Xshards test to github actions #5512

Merged: 14 commits, Aug 24, 2022

Changes from 2 commits
93 changes: 93 additions & 0 deletions .github/workflows/nb-orca-tutorial-xshards.yml
# This workflow runs the Orca XShards tutorial test nightly, on manual dispatch,
# and on pull requests that touch the related Orca source paths.

# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

name: Nightly Build Orca Tutorial Xshards

on:
  #release:
  #  types: [published]
  pull_request:
    branches: [ "main" ]
    paths:
      - 'python/orca/src/bigdl/orca/data/**'
      - 'python/orca/src/bigdl/orca/learn/metrics.py'
      - 'python/orca/src/bigdl/orca/learn/util.py'
      - 'python/orca/src/bigdl/orca/learn/ray_estimator.py'
      - 'python/orca/src/bigdl/orca/learn/pytorch/**'
  schedule:
    - cron: '0 15 * * *'
  # Allows you to run this workflow manually from the Actions tab
  workflow_dispatch:

jobs:
  build:

    runs-on: [ubuntu-20.04-lts]
Contributor review comment on the runs-on line, suggesting:

    runs-on: [self-hosted, Gondolin, ubuntu-20.04-lts]

    permissions:
      contents: read
      packages: write

    steps:
    - uses: actions/checkout@v3
    - name: Set up JDK 8
      uses: actions/setup-java@v3
      with:
        java-version: '8'
        distribution: 'temurin'
        #server-id: github # Value of the distributionManagement/repository/id field of the pom.xml
        settings-path: ${{ github.workspace }} # location for the settings.xml file

    - name: Set up Maven
      uses: stCarolas/[email protected]
      with:
        maven-version: 3.8.2

    - name: Set up Maven Settings
      uses: s4u/[email protected]
      with:
        sonatypeSnapshots: true
        apacheSnapshots: true
        servers: |
          [{
            "id": "central",
            "configuration": {
              "httpConfiguration": {
                "all": {
                  "connectionTimeout": "3600000",
                  "readTimeout": "3600000"
                }
              }
            }
          }]
        mirrors: '[{"id": "ardaNexus", "name": "ardaNexus", "mirrorOf": "*", "url": "${NEXUS_URL}" }]'

    - name: Setup Env
      run: |
        apt-get update
        apt-get install -y wget
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.7'
    - name: Install dependencies
      shell: bash
      run: |
        python -m pip install --upgrade pip
        python -m pip install --upgrade setuptools==58.0.4
        # Remove any previously installed BigDL packages to avoid version conflicts
        pip uninstall -y bigdl-friesian bigdl-friesian-spark3 bigdl-dllib bigdl-dllib-spark3 bigdl-orca pyspark bigdl-orca-spark3 bigdl-chronos bigdl-chronos-spark3
        pip install -i https://pypi.org/simple --pre --upgrade bigdl-orca-spark3
        pip install numpy==1.18.5
    - name: Run Test
      #run: python -m build
      run: |
        export SPARK_LOCAL_HOSTNAME=localhost
        export FTP_URI=ftp://zoo:[email protected]
        chmod a+x python/orca/dev/test/run-tutorial-xshards.sh
        python/orca/dev/test/run-tutorial-xshards.sh
      env:
        BIGDL_ROOT: ${{ github.workspace }}
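Besides the Actions tab, the workflow_dispatch trigger above can also be fired through the GitHub REST API. A minimal sketch, assuming the repository is intel-analytics/BigDL and that GITHUB_TOKEN holds a token with workflow permissions (both are assumptions, not part of this PR):

    # Hypothetical helper, not part of this PR: queue a manual run of the workflow.
    import os
    import requests  # third-party HTTP client

    resp = requests.post(
        "https://api.github.com/repos/intel-analytics/BigDL"  # assumed repository
        "/actions/workflows/nb-orca-tutorial-xshards.yml/dispatches",
        headers={
            "Accept": "application/vnd.github+json",
            "Authorization": "Bearer " + os.environ["GITHUB_TOKEN"],
        },
        json={"ref": "main"},  # branch whose workflow file should run
    )
    resp.raise_for_status()  # GitHub answers 204 No Content when the run is queued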
29 changes: 29 additions & 0 deletions python/orca/dev/test/run-tutorial-xshards.sh
#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

export FTP_URI=$FTP_URI
set -ex

cd "$(dirname "$0")"

export PYSPARK_PYTHON=python
export PYSPARK_DRIVER_PYTHON=python

# Stop any leftover Ray processes from previous runs
ray stop -f

cd ../../
echo "Running Orca XShards tutorial test"
python tutorial/xshards/tabular_playground_series.py --path './xshards/train.csv'
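To reproduce this test outside CI, the tutorial can be invoked the same way the script does, assuming the Kaggle Tabular Playground train.csv has already been downloaded (the local layout below is an assumption, not part of this PR):

    # Hypothetical local runner mirroring run-tutorial-xshards.sh
    import subprocess

    subprocess.run(
        ["python", "tutorial/xshards/tabular_playground_series.py",
         "--path", "./xshards/train.csv"],  # assumed local copy of train.csv
        cwd="python/orca",  # the script cd's here (../../ from dev/test)
        check=True,         # raise if the tutorial exits non-zero
    )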
183 changes: 96 additions & 87 deletions python/orca/tutorial/xshards/tabular_playground_series.py
# This example is adapted from
# https://www.kaggle.com/code/remekkinas/tps-5-pytorch-nn-for-tabular-step-by-step/notebook

import argparse

from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

import bigdl.orca.data.pandas
from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.orca.data.transformer import StringIndexer, MinMaxScaler
from bigdl.orca.learn.pytorch import Estimator
from bigdl.orca.learn.metrics import Accuracy
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-p', '--path', type=str,
                        default="./xshards/train.csv",
                        help='Training data path')
    opt = parser.parse_args()

    init_orca_context(cluster_mode="local", cores=4, memory="3g")

    # Load data into an XShards of pandas DataFrames
    data_shard = bigdl.orca.data.pandas.read_csv(opt.path)

    # Drop duplicate rows
    data_shard = data_shard.deduplicates()

    # Label-encode the target column
    def change_col_name(df):
        df = df.rename(columns={'id': 'id0'})
        return df
    data_shard = data_shard.transform_shard(change_col_name)

    encode = StringIndexer(inputCol='target')
    data_shard = encode.fit_transform(data_shard)

    def change_val(df):
        df['target'] = df['target'] - 1  # StringIndexer output is 1-based; make it 0-based
        return df
    data_shard = data_shard.transform_shard(change_val)
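    # --- Editor's illustrative aside, not part of this PR ---
    # Each shard of an XShards is a plain pandas DataFrame, and transform_shard
    # applies the given function to every shard. The same helpers on a toy frame:
    import pandas as pd
    toy = pd.DataFrame({'id': [1, 2], 'target': [3.0, 1.0]})
    toy = change_col_name(toy)  # renames 'id' -> 'id0'
    toy = change_val(toy)       # 1-based encoded labels become 0-based
    # toy['target'] is now [2.0, 0.0]
    # --- end aside ---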

    # Split train and test set
    def split_train_test(data):
        RANDOM_STATE = 2021
        train, test = train_test_split(data, test_size=0.2, random_state=RANDOM_STATE)
        return train, test
    train_shard, val_shard = data_shard.transform_shard(split_train_test).split()

    # Scale the 50 feature columns to [0, 1]
    feature_list = []
    for i in range(50):
        feature_list.append('feature_' + str(i))
    scale = MinMaxScaler(inputCol=feature_list, outputCol="x_scaled")
    train_shard = scale.fit_transform(train_shard)
    val_shard = scale.transform(val_shard)
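    # --- Editor's illustrative aside, not part of this PR ---
    # The scaler is fit on the training shards only and reused on validation
    # data, mirroring scikit-learn's fit/transform split:
    from sklearn.preprocessing import MinMaxScaler as SkMinMaxScaler
    X_train = np.random.rand(100, 50)  # toy stand-ins for the 50 feature columns
    X_val = np.random.rand(20, 50)
    sk_scale = SkMinMaxScaler()
    X_train_scaled = sk_scale.fit_transform(X_train)  # learn per-column min/max on train
    X_val_scaled = sk_scale.transform(X_val)          # reuse the train statistics
    # --- end aside ---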

    # Change data types to what the PyTorch estimator expects
    def change_data_type(df):
        df['x_scaled'] = df['x_scaled'].apply(lambda x: np.array(x, dtype=np.float32))
        df['target'] = df['target'].apply(lambda x: np.long(x))
        return df
    train_shard = train_shard.transform_shard(change_data_type)
    val_shard = val_shard.transform_shard(change_data_type)

    # Model
    torch.manual_seed(0)
    BATCH_SIZE = 64
    NUM_CLASSES = 4
    NUM_EPOCHS = 1
    NUM_FEATURES = 50

    def linear_block(in_features, out_features, p_drop, *args, **kwargs):
        return nn.Sequential(
            nn.Linear(in_features, out_features),
            nn.ReLU(),
            nn.Dropout(p=p_drop)
        )

    class TPS05ClassificationSeq(nn.Module):
        def __init__(self):
            super(TPS05ClassificationSeq, self).__init__()
            num_feature = NUM_FEATURES
            num_class = NUM_CLASSES
            self.linear = nn.Sequential(
                linear_block(num_feature, 100, 0.3),
                linear_block(100, 250, 0.3),
                linear_block(250, 128, 0.3),
            )

            self.out = nn.Sequential(
                nn.Linear(128, num_class)
            )

        def forward(self, x):
            x = self.linear(x)
            return self.out(x)
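    # --- Editor's illustrative aside, not part of this PR ---
    # Quick shape check: a random batch of 64 rows x 50 features should yield
    # logits for 4 classes.
    _model = TPS05ClassificationSeq()
    _out = _model(torch.randn(BATCH_SIZE, NUM_FEATURES))
    assert _out.shape == (BATCH_SIZE, NUM_CLASSES)  # torch.Size([64, 4])
    # --- end aside ---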

    def model_creator(config):
        model = TPS05ClassificationSeq()
        return model

    def optim_creator(model, config):
        return optim.Adam(model.parameters(), lr=0.001)
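    # --- Editor's illustrative aside, not part of this PR ---
    # The creator functions receive a config dict, so a hypothetical variant
    # could read hyperparameters from an estimator-supplied config instead of
    # hard-coding them:
    def optim_creator_cfg(model, config):
        return optim.Adam(model.parameters(), lr=config.get("lr", 0.001))
    # --- end aside ---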

    criterion = nn.CrossEntropyLoss()

    est = Estimator.from_torch(model=model_creator, optimizer=optim_creator,
                               loss=criterion, metrics=[Accuracy()], backend="ray")

    est.fit(data=train_shard, feature_cols=['x_scaled'], label_cols=['target'],
            validation_data=val_shard, epochs=NUM_EPOCHS, batch_size=BATCH_SIZE)

    result = est.evaluate(data=val_shard, feature_cols=['x_scaled'],
                          label_cols=['target'], batch_size=BATCH_SIZE)

    for r in result:
        print(r, ":", result[r])

    stop_orca_context()