diff --git a/.gitignore b/.gitignore index ef42fd34a68..8be49ffa06d 100644 --- a/.gitignore +++ b/.gitignore @@ -48,4 +48,3 @@ __pycache__ target build dist -apps/wide-deep-recommendation/model_training.ipynb diff --git a/python/friesian/example/multi_task/README.md b/python/friesian/example/multi_task/README.md index 3f65b13d5e4..b8bc690d49b 100644 --- a/python/friesian/example/multi_task/README.md +++ b/python/friesian/example/multi_task/README.md @@ -46,6 +46,7 @@ The details of preprocessing can be found [here](https://github.com/intel-analyt +-------------------+-----+--------+-------------------+-----------+-----+-------+----------+----------+----------+-------------+------+---+--------+----+---+------+-----+ ``` Data preprocessing command: +- For Spark local mode ```bash python data_processing.py \ --input_path /path/to/input/dataset \ @@ -54,6 +55,7 @@ python data_processing.py \ --executor_cores 8 \ --executor_memory 12g \ ``` +- For Spark yarn client mode ```bash python data_processing.py \ --input_path /path/to/input/dataset \ @@ -82,6 +84,7 @@ When the *cluster_mode* is yarn, *input_path* and *output_path* should be HDFS p ## Train and test Multi-task models After data preprocessing, the training command for MMoE or PLE model is as follows: +- For Spark local mode ```bash python run_multi_task.py \ --model_type mmoe\ @@ -92,6 +95,7 @@ python run_multi_task.py \ --executor_cores 8 \ --executor_memory 12g \ ``` +- For Spark yarn client mode ```bash python run_multi_task.py \ --model_type mmoe\ @@ -105,19 +109,7 @@ python run_multi_task.py \ --driver_cores 2 \ --driver_memory 8g ``` -Evaluate Results as follows: -```bash -python run_multi_task.py \ - --model_type mmoe\ - --test_data_path /path/to/testing/dataset \ - --model_save_path /path/to/save/the/trained/model \ - --cluster_mode local \ - --executor_cores 8 \ - --executor_memory 12g \ - --num_executors 4 \ - --driver_cores 2 \ - --driver_memory 8g -``` + Results: ```angular2html 1. For MMoE: @@ -143,7 +135,7 @@ validation_duration_mae 42.66642379760742 validation_click_auc 0.6481693387031555 ``` -__Options for training and test:__ +__Options for train and test:__ * `model_type`: The multi task model, mmoe or ple. Default to be mmoe. * `train_data_path`: The path to training dataset. * `test_data_path`: The path to testing dataset. diff --git a/python/friesian/example/multi_task/data_processing.py b/python/friesian/example/multi_task/data_processing.py index c8f27f5b1e5..8d146da3814 100644 --- a/python/friesian/example/multi_task/data_processing.py +++ b/python/friesian/example/multi_task/data_processing.py @@ -13,22 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import argparse + import os from argparse import ArgumentParser -from bigdl.friesian.feature import FeatureTable -from bigdl.orca import init_orca_context, stop_orca_context from bigdl.dllib.utils.log4Error import invalidInputError +from bigdl.orca import init_orca_context, stop_orca_context +from bigdl.friesian.feature import FeatureTable -def transform(x): - # dealing with some abnormal data +def transform_img_num(x): + # deal with abnormal data of img_num if x == '上海': return 0.0 elif isinstance(x, float): return float(x) - else: + else: # for string inputs return float(eval(x)) @@ -39,17 +39,13 @@ def transform_cat_2(x): def read_and_split(data_input_path, sparse_int_features, sparse_string_features, dense_features): header_names = ['user_id', 'article_id', 'expo_time', 'net_status', 'flush_nums', 'exop_position', 'click', 'duration', 'device', 'os', 'province', 'city', - 'age', 'gender', 'ctime', 'img_num', 'cat_1', 'cat_2' - ] + 'age', 'gender', 'ctime', 'img_num', 'cat_1', 'cat_2'] if data_input_path.endswith("csv"): - # data_pd = pd.read_csv(os.path.join(data_input_path, 'train_data.csv'), index_col=0, - # parse_dates=['expo_time'], low_memory=False) - # data_pd.to_csv('../train_data_new.csv', index=False, header=None) tbl = FeatureTable.read_csv(data_input_path, header=False, names=header_names) else: tbl = FeatureTable.read_parquet(data_input_path) - print('The number of total data: ', tbl.size()) + print('The number of total data: {}'.format(tbl.size())) tbl = tbl.cast(sparse_int_features, 'string') tbl = tbl.cast(dense_features, 'string') @@ -59,23 +55,21 @@ def read_and_split(data_input_path, sparse_int_features, sparse_string_features, tbl = tbl.fillna("", feature) tbl = tbl.fillna('0.0', 'img_num') - process_img_num = lambda x: transform(x) - process_cat_2 = lambda x: transform_cat_2(x) - tbl = tbl.apply("img_num", "img_num", process_img_num, "float") - tbl = tbl.apply("cat_2", "cat_2", process_cat_2, "string") + tbl = tbl.apply("img_num", "img_num", transform_img_num, "float") + tbl = tbl.apply("cat_2", "cat_2", transform_cat_2, "string") train_tbl = FeatureTable(tbl.df[tbl.df['expo_time'] < '2021-07-06']) valid_tbl = FeatureTable(tbl.df[tbl.df['expo_time'] >= '2021-07-06']) - print('The number of train data: ', train_tbl.size()) - print('The number of test data: ', valid_tbl.size()) + print('The number of train data: {}'.format(train_tbl.size())) + print('The number of test data: {}'.format(valid_tbl.size())) return train_tbl, valid_tbl def feature_engineering(train_tbl, valid_tbl, output_path, sparse_int_features, sparse_string_features, dense_features): - import json train_tbl, min_max_dict = train_tbl.min_max_scale(dense_features) valid_tbl = valid_tbl.transform_min_max_scale(dense_features, min_max_dict) + # TODO: fix the bug of cat_2 cat_cols = sparse_string_features[-1:] + sparse_int_features + sparse_string_features[:-1] for feature in cat_cols: train_tbl, feature_idx = train_tbl.category_encode(feature) @@ -107,8 +101,8 @@ def parse_args(): help='The driver core number.') parser.add_argument('--driver_memory', type=str, default="8g", help='The driver memory.') - args_ = parser.parse_args() - return args_ + args = parser.parse_args() + return args if __name__ == '__main__': @@ -136,10 +130,8 @@ def parse_args(): sparse_int_features = [ 'user_id', 'article_id', 'net_status', 'flush_nums', - 'exop_position', + 'exop_position' ] - # put cat_2 at first bug - # put cat_1,cat_2 at first bug sparse_string_features = [ 'device', 'os', 'province', 'city', 'age', diff --git a/python/friesian/example/multi_task/run_multi_task.py b/python/friesian/example/multi_task/run_multi_task.py index c0577c7abcd..ff2b38af2c6 100644 --- a/python/friesian/example/multi_task/run_multi_task.py +++ b/python/friesian/example/multi_task/run_multi_task.py @@ -13,19 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. # + import math from time import time -from argparse import ArgumentParser, ArgumentError +from argparse import ArgumentParser from keras.callbacks import EarlyStopping -from bigdl.orca import init_orca_context, stop_orca_context -from bigdl.orca.learn.tf2.estimator import Estimator -from bigdl.friesian.feature import FeatureTable - from deepctr.feature_column import SparseFeat, DenseFeat from deepctr.models import MMOE, PLE from bigdl.dllib.utils.log4Error import invalidInputError +from bigdl.orca import init_orca_context, stop_orca_context +from bigdl.orca.learn.tf2.estimator import Estimator +from bigdl.friesian.feature import FeatureTable def build_model(model_type, sparse_features, dense_features, feature_max_idx): @@ -213,7 +213,7 @@ def _parse_args(): 'city', 'age', 'gender', - 'cat_1', + 'cat_1' ] continuous_cols = ['img_num'] feature_max_idx = {'user_id': 40000, 'article_id': 200000, 'net_status': 1004,