Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update multi-task example #5787

Merged
merged 4 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,3 @@ __pycache__
target
build
dist
apps/wide-deep-recommendation/model_training.ipynb
20 changes: 6 additions & 14 deletions python/friesian/example/multi_task/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The details of preprocessing can be found [here](https://github.com/intel-analyt
+-------------------+-----+--------+-------------------+-----------+-----+-------+----------+----------+----------+-------------+------+---+--------+----+---+------+-----+
```
Data preprocessing command:
- For Spark local mode
```bash
python data_processing.py \
--input_path /path/to/input/dataset \
Expand All @@ -54,6 +55,7 @@ python data_processing.py \
--executor_cores 8 \
--executor_memory 12g \
```
- For Spark yarn client mode
```bash
python data_processing.py \
--input_path /path/to/input/dataset \
Expand Down Expand Up @@ -82,6 +84,7 @@ When the *cluster_mode* is yarn, *input_path* and *output_path* should be HDFS p

## Train and test Multi-task models
After data preprocessing, the training command for MMoE or PLE model is as follows:
- For Spark local mode
```bash
python run_multi_task.py \
--model_type mmoe\
Expand All @@ -92,6 +95,7 @@ python run_multi_task.py \
--executor_cores 8 \
--executor_memory 12g \
```
- For Spark yarn client mode
```bash
python run_multi_task.py \
--model_type mmoe\
Expand All @@ -105,19 +109,7 @@ python run_multi_task.py \
--driver_cores 2 \
--driver_memory 8g
```
Evaluate Results as follows:
```bash
python run_multi_task.py \
--model_type mmoe\
--test_data_path /path/to/testing/dataset \
--model_save_path /path/to/save/the/trained/model \
--cluster_mode local \
--executor_cores 8 \
--executor_memory 12g \
--num_executors 4 \
--driver_cores 2 \
--driver_memory 8g
```

Results:
```text
1. For MMoE:
Expand All @@ -143,7 +135,7 @@ validation_duration_mae 42.66642379760742
validation_click_auc 0.6481693387031555
```

__Options for training and test:__
__Options for train and test:__
* `model_type`: The multi task model, mmoe or ple. Default to be mmoe.
* `train_data_path`: The path to training dataset.
* `test_data_path`: The path to testing dataset.
Expand Down
40 changes: 16 additions & 24 deletions python/friesian/example/multi_task/data_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse

import os
from argparse import ArgumentParser

from bigdl.friesian.feature import FeatureTable
from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.dllib.utils.log4Error import invalidInputError
from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.friesian.feature import FeatureTable


def transform_img_num(x):
    """Normalize a raw ``img_num`` value to a float.

    The source data is dirty: ``img_num`` may be a float, a string holding a
    numeric literal, or the abnormal value '上海' (a city name leaked into
    the column), which is mapped to 0.0.

    :param x: raw ``img_num`` field value (float or str).
    :return: the value as a float.
    :raises ValueError: if a string input is not a valid numeric literal.
    """
    import ast  # local import, matching the file's style (cf. feature_engineering)

    if x == '上海':
        return 0.0
    elif isinstance(x, float):
        return float(x)
    else:  # string inputs
        # SECURITY: the original used float(eval(x)), which executes arbitrary
        # code embedded in the data file. ast.literal_eval only parses Python
        # literals (e.g. "3", "3.14", "'2'"), which covers the expected inputs
        # without the injection risk.
        return float(ast.literal_eval(x))


Expand All @@ -39,17 +39,13 @@ def transform_cat_2(x):
def read_and_split(data_input_path, sparse_int_features, sparse_string_features, dense_features):
header_names = ['user_id', 'article_id', 'expo_time', 'net_status', 'flush_nums',
'exop_position', 'click', 'duration', 'device', 'os', 'province', 'city',
'age', 'gender', 'ctime', 'img_num', 'cat_1', 'cat_2'
]
'age', 'gender', 'ctime', 'img_num', 'cat_1', 'cat_2']
if data_input_path.endswith("csv"):
# data_pd = pd.read_csv(os.path.join(data_input_path, 'train_data.csv'), index_col=0,
# parse_dates=['expo_time'], low_memory=False)
# data_pd.to_csv('../train_data_new.csv', index=False, header=None)
tbl = FeatureTable.read_csv(data_input_path, header=False, names=header_names)
else:
tbl = FeatureTable.read_parquet(data_input_path)

print('The number of total data: ', tbl.size())
print('The number of total data: {}'.format(tbl.size()))

tbl = tbl.cast(sparse_int_features, 'string')
tbl = tbl.cast(dense_features, 'string')
Expand All @@ -59,23 +55,21 @@ def read_and_split(data_input_path, sparse_int_features, sparse_string_features,
tbl = tbl.fillna("", feature)
tbl = tbl.fillna('0.0', 'img_num')

process_img_num = lambda x: transform(x)
process_cat_2 = lambda x: transform_cat_2(x)
tbl = tbl.apply("img_num", "img_num", process_img_num, "float")
tbl = tbl.apply("cat_2", "cat_2", process_cat_2, "string")
tbl = tbl.apply("img_num", "img_num", transform_img_num, "float")
tbl = tbl.apply("cat_2", "cat_2", transform_cat_2, "string")

train_tbl = FeatureTable(tbl.df[tbl.df['expo_time'] < '2021-07-06'])
valid_tbl = FeatureTable(tbl.df[tbl.df['expo_time'] >= '2021-07-06'])
print('The number of train data: ', train_tbl.size())
print('The number of test data: ', valid_tbl.size())
print('The number of train data: {}'.format(train_tbl.size()))
print('The number of test data: {}'.format(valid_tbl.size()))
return train_tbl, valid_tbl


def feature_engineering(train_tbl, valid_tbl, output_path, sparse_int_features,
sparse_string_features, dense_features):
import json
train_tbl, min_max_dict = train_tbl.min_max_scale(dense_features)
valid_tbl = valid_tbl.transform_min_max_scale(dense_features, min_max_dict)
# TODO: fix the bug of cat_2
cat_cols = sparse_string_features[-1:] + sparse_int_features + sparse_string_features[:-1]
for feature in cat_cols:
train_tbl, feature_idx = train_tbl.category_encode(feature)
Expand Down Expand Up @@ -107,8 +101,8 @@ def parse_args():
help='The driver core number.')
parser.add_argument('--driver_memory', type=str, default="8g",
help='The driver memory.')
args_ = parser.parse_args()
return args_
args = parser.parse_args()
return args


if __name__ == '__main__':
Expand Down Expand Up @@ -136,10 +130,8 @@ def parse_args():
sparse_int_features = [
'user_id', 'article_id',
'net_status', 'flush_nums',
'exop_position',
'exop_position'
]
# put cat_2 at first bug
# put cat_1,cat_2 at first bug
sparse_string_features = [
'device', 'os', 'province',
'city', 'age',
Expand Down
12 changes: 6 additions & 6 deletions python/friesian/example/multi_task/run_multi_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

import math
from time import time
from argparse import ArgumentParser, ArgumentError
from argparse import ArgumentParser
from keras.callbacks import EarlyStopping

from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.orca.learn.tf2.estimator import Estimator
from bigdl.friesian.feature import FeatureTable

from deepctr.feature_column import SparseFeat, DenseFeat
from deepctr.models import MMOE, PLE

from bigdl.dllib.utils.log4Error import invalidInputError
from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.orca.learn.tf2.estimator import Estimator
from bigdl.friesian.feature import FeatureTable


def build_model(model_type, sparse_features, dense_features, feature_max_idx):
Expand Down Expand Up @@ -213,7 +213,7 @@ def _parse_args():
'city',
'age',
'gender',
'cat_1',
'cat_1'
]
continuous_cols = ['img_num']
feature_max_idx = {'user_id': 40000, 'article_id': 200000, 'net_status': 1004,
Expand Down