Skip to content

Commit

Permalink
Update multi-task example (intel-analytics#5787)
Browse files Browse the repository at this point in the history
* update multi-task

* fix

* minor

* minor
  • Loading branch information
hkvision authored and ForJadeForest committed Sep 20, 2022
1 parent 3a18e5a commit 30f802d
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 45 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,3 @@ __pycache__
target
build
dist
apps/wide-deep-recommendation/model_training.ipynb
20 changes: 6 additions & 14 deletions python/friesian/example/multi_task/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The details of preprocessing can be found [here](https://github.com/intel-analyt
+-------------------+-----+--------+-------------------+-----------+-----+-------+----------+----------+----------+-------------+------+---+--------+----+---+------+-----+
```
Data preprocessing command:
- For Spark local mode
```bash
python data_processing.py \
--input_path /path/to/input/dataset \
Expand All @@ -54,6 +55,7 @@ python data_processing.py \
--executor_cores 8 \
--executor_memory 12g
```
- For Spark yarn client mode
```bash
python data_processing.py \
--input_path /path/to/input/dataset \
Expand Down Expand Up @@ -82,6 +84,7 @@ When the *cluster_mode* is yarn, *input_path* and *output_path* should be HDFS p

## Train and test Multi-task models
After data preprocessing, the training command for MMoE or PLE model is as follows:
- For Spark local mode
```bash
python run_multi_task.py \
--model_type mmoe \
Expand All @@ -92,6 +95,7 @@ python run_multi_task.py \
--executor_cores 8 \
--executor_memory 12g
```
- For Spark yarn client mode
```bash
python run_multi_task.py \
--model_type mmoe \
Expand All @@ -105,19 +109,7 @@ python run_multi_task.py \
--driver_cores 2 \
--driver_memory 8g
```
Evaluate Results as follows:
```bash
python run_multi_task.py \
--model_type mmoe\
--test_data_path /path/to/testing/dataset \
--model_save_path /path/to/save/the/trained/model \
--cluster_mode local \
--executor_cores 8 \
--executor_memory 12g \
--num_executors 4 \
--driver_cores 2 \
--driver_memory 8g
```

Results:
```angular2html
1. For MMoE:
Expand All @@ -143,7 +135,7 @@ validation_duration_mae 42.66642379760742
validation_click_auc 0.6481693387031555
```

__Options for training and test:__
__Options for train and test:__
* `model_type`: The multi-task model to use, either `mmoe` or `ple`. Defaults to `mmoe`.
* `train_data_path`: The path to training dataset.
* `test_data_path`: The path to testing dataset.
Expand Down
40 changes: 16 additions & 24 deletions python/friesian/example/multi_task/data_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse

import os
from argparse import ArgumentParser

from bigdl.friesian.feature import FeatureTable
from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.dllib.utils.log4Error import invalidInputError
from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.friesian.feature import FeatureTable


def transform(x):
# dealing with some abnormal data
def transform_img_num(x):
# deal with abnormal data of img_num
if x == '上海':
return 0.0
elif isinstance(x, float):
return float(x)
else:
else: # for string inputs
return float(eval(x))


Expand All @@ -39,17 +39,13 @@ def transform_cat_2(x):
def read_and_split(data_input_path, sparse_int_features, sparse_string_features, dense_features):
header_names = ['user_id', 'article_id', 'expo_time', 'net_status', 'flush_nums',
'exop_position', 'click', 'duration', 'device', 'os', 'province', 'city',
'age', 'gender', 'ctime', 'img_num', 'cat_1', 'cat_2'
]
'age', 'gender', 'ctime', 'img_num', 'cat_1', 'cat_2']
if data_input_path.endswith("csv"):
# data_pd = pd.read_csv(os.path.join(data_input_path, 'train_data.csv'), index_col=0,
# parse_dates=['expo_time'], low_memory=False)
# data_pd.to_csv('../train_data_new.csv', index=False, header=None)
tbl = FeatureTable.read_csv(data_input_path, header=False, names=header_names)
else:
tbl = FeatureTable.read_parquet(data_input_path)

print('The number of total data: ', tbl.size())
print('The number of total data: {}'.format(tbl.size()))

tbl = tbl.cast(sparse_int_features, 'string')
tbl = tbl.cast(dense_features, 'string')
Expand All @@ -59,23 +55,21 @@ def read_and_split(data_input_path, sparse_int_features, sparse_string_features,
tbl = tbl.fillna("", feature)
tbl = tbl.fillna('0.0', 'img_num')

process_img_num = lambda x: transform(x)
process_cat_2 = lambda x: transform_cat_2(x)
tbl = tbl.apply("img_num", "img_num", process_img_num, "float")
tbl = tbl.apply("cat_2", "cat_2", process_cat_2, "string")
tbl = tbl.apply("img_num", "img_num", transform_img_num, "float")
tbl = tbl.apply("cat_2", "cat_2", transform_cat_2, "string")

train_tbl = FeatureTable(tbl.df[tbl.df['expo_time'] < '2021-07-06'])
valid_tbl = FeatureTable(tbl.df[tbl.df['expo_time'] >= '2021-07-06'])
print('The number of train data: ', train_tbl.size())
print('The number of test data: ', valid_tbl.size())
print('The number of train data: {}'.format(train_tbl.size()))
print('The number of test data: {}'.format(valid_tbl.size()))
return train_tbl, valid_tbl


def feature_engineering(train_tbl, valid_tbl, output_path, sparse_int_features,
sparse_string_features, dense_features):
import json
train_tbl, min_max_dict = train_tbl.min_max_scale(dense_features)
valid_tbl = valid_tbl.transform_min_max_scale(dense_features, min_max_dict)
# TODO: fix the bug of cat_2
cat_cols = sparse_string_features[-1:] + sparse_int_features + sparse_string_features[:-1]
for feature in cat_cols:
train_tbl, feature_idx = train_tbl.category_encode(feature)
Expand Down Expand Up @@ -107,8 +101,8 @@ def parse_args():
help='The driver core number.')
parser.add_argument('--driver_memory', type=str, default="8g",
help='The driver memory.')
args_ = parser.parse_args()
return args_
args = parser.parse_args()
return args


if __name__ == '__main__':
Expand Down Expand Up @@ -136,10 +130,8 @@ def parse_args():
sparse_int_features = [
'user_id', 'article_id',
'net_status', 'flush_nums',
'exop_position',
'exop_position'
]
# put cat_2 at first bug
# put cat_1,cat_2 at first bug
sparse_string_features = [
'device', 'os', 'province',
'city', 'age',
Expand Down
12 changes: 6 additions & 6 deletions python/friesian/example/multi_task/run_multi_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

import math
from time import time
from argparse import ArgumentParser, ArgumentError
from argparse import ArgumentParser
from keras.callbacks import EarlyStopping

from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.orca.learn.tf2.estimator import Estimator
from bigdl.friesian.feature import FeatureTable

from deepctr.feature_column import SparseFeat, DenseFeat
from deepctr.models import MMOE, PLE

from bigdl.dllib.utils.log4Error import invalidInputError
from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.orca.learn.tf2.estimator import Estimator
from bigdl.friesian.feature import FeatureTable


def build_model(model_type, sparse_features, dense_features, feature_max_idx):
Expand Down Expand Up @@ -213,7 +213,7 @@ def _parse_args():
'city',
'age',
'gender',
'cat_1',
'cat_1'
]
continuous_cols = ['img_num']
feature_max_idx = {'user_id': 40000, 'article_id': 200000, 'net_status': 1004,
Expand Down

0 comments on commit 30f802d

Please sign in to comment.