From 9587a04325589525b3d6eb7de598ef7daf37277e Mon Sep 17 00:00:00 2001 From: WangBin <32730386+devWangBin@users.noreply.github.com> Date: Thu, 18 Aug 2022 16:17:27 +0800 Subject: [PATCH 1/8] add multi task example codes and readme --- python/friesian/example/multi_task/README.md | 123 +++++++++ .../example/multi_task/data_processing.py | 158 +++++++++++ .../example/multi_task/run_multi_task.py | 246 ++++++++++++++++++ 3 files changed, 527 insertions(+) create mode 100644 python/friesian/example/multi_task/README.md create mode 100644 python/friesian/example/multi_task/data_processing.py create mode 100644 python/friesian/example/multi_task/run_multi_task.py diff --git a/python/friesian/example/multi_task/README.md b/python/friesian/example/multi_task/README.md new file mode 100644 index 00000000000..3cb5fc547ed --- /dev/null +++ b/python/friesian/example/multi_task/README.md @@ -0,0 +1,123 @@ +# Multi-task Recommendation with BigDL +In addition to providing a personalized recommendation, recommendation systems need to output diverse +predictions to meet the needs of real-world applications, such as user click-through rates and browsing (or watching) time predictions for products. +This example demonstrates how to use the [MMoE](https://dl.acm.org/doi/pdf/10.1145/3219819.3220007) or [PLE](https://dl.acm.org/doi/pdf/10.1145/3383313.3412236?casa_token=8fchWD8CHc0AAAAA:2cyP8EwkhIUlSFPRpfCGHahTddki0OEjDxfbUFMkXY5fU0FNtkvRzmYloJtLowFmL1en88FRFY4Q) model to implement multi-task recommendations with large-scale data. + +## Prepare environments +We highly recommend you use [Anaconda](https://www.anaconda.com/distribution/#linux) to prepare the environment, especially if you want to run on a yarn cluster. +``` +conda create -n bigdl python=3.7 #bigdl is conda environment name, you can set another name you like. +conda activate bigdl +pip install bigdl-orca[ray] +pip install bigdl-friesian +pip install tensorflow==2.9.1 +pip install deepctr[cpu] +``` +Refer to [this document](https://bigdl.readthedocs.io/en/latest/doc/UserGuide/python.html#install) for more installation guides. + +## Data Preparation +In this example, a news dataset is used to demonstrate the training and testing process. +Each row contains several feature values, timestamps and two labels. Using the timestamp to divide the training and testing sets. +The click prediction (classification) and duration time prediction (regression) are two output targets. 
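To make the two targets and the time-based split concrete, here is a minimal pandas sketch (pandas is used purely for illustration; the actual pipeline performs this split on Spark DataFrames in `data_processing.py`, with the same cutoff date):
```python
import pandas as pd

# Toy stand-in for the news dataset with just the columns needed here:
# the exposure timestamp plus the two labels (click and duration).
df = pd.DataFrame({
    "expo_time": ["2021-06-30 09:57:14", "2021-07-03 18:13:03", "2021-07-07 08:00:00"],
    "click": [1, 0, 0],       # binary classification target
    "duration": [28, 0, 0],   # regression target
})

# ISO-formatted timestamps compare correctly as plain strings, so rows
# before the cutoff date become training data and the rest become test data.
train_df = df[df["expo_time"] < "2021-07-06"]
test_df = df[df["expo_time"] >= "2021-07-06"]
print(len(train_df), len(test_df))  # -> 2 1
```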
Original data examples are as follows: +```angular2html ++----------+----------+-------------------+----------+----------+-------------+-----+--------+------+-------+--------+----+------+------+-------------------+-------+-----+-------------+ +| user_id|article_id| expo_time|net_status|flush_nums|exop_position|click|duration|device| os|province|city| age|gender| ctime|img_num|cat_1| cat_2| ++----------+----------+-------------------+----------+----------+-------------+-----+--------+------+-------+--------+----+------+------+-------------------+-------+-----+-------------+ +|1000541010| 464467760|2021-06-30 09:57:14| 2| 0| 13| 1| 28|V2054A|Android| 上海|上海|A_0_24|female|2021-06-29 14:46:43| 3| 娱乐|娱乐/港台明星| +|1000541010| 463850913|2021-06-30 09:57:14| 2| 0| 15| 0| 0|V2054A|Android| 上海|上海|A_0_24|female|2021-06-27 22:29:13| 11| 时尚|时尚/女性时尚| +|1000541010| 464022440|2021-06-30 09:57:14| 2| 0| 17| 0| 0|V2054A|Android| 上海|上海|A_0_24|female|2021-06-28 12:22:54| 7| 农村|农村/农业资讯| +|1000541010| 464586545|2021-06-30 09:58:31| 2| 1| 20| 0| 0|V2054A|Android| 上海|上海|A_0_24|female|2021-06-29 13:25:06| 5| 娱乐|娱乐/港台明星| +|1000541010| 465352885|2021-07-03 18:13:03| 5| 0| 18| 0| 0|V2054A|Android| 上海|上海|A_0_24|female|2021-07-02 10:43:51| 18| 娱乐|娱乐/港台明星| ++----------+----------+-------------------+----------+----------+-------------+-----+--------+------+-------+--------+----+------+------+-------------------+-------+-----+-------------+ +``` + +With the built-in high-level preprocessing operations in FeatureTable, we can easily perform distributed pre-processing for large-scale data. +The details of pre-processing can be found [here](https://github.com/intel-analytics/BigDL/blob/main/apps/wide-deep-recommendation/feature_engineering.ipynb). Examples of processed data are as follows: + +```angular2html ++-------------------+-----+--------+-------------------+-----------+-----+-------+----------+----------+----------+-------------+------+---+--------+----+---+------+-----+ +| expo_time|click|duration| ctime| img_num|cat_2|user_id|article_id|net_status|flush_nums|exop_position|device| os|province|city|age|gender|cat_1| ++-------------------+-----+--------+-------------------+-----------+-----+-------+----------+----------+----------+-------------+------+---+--------+----+---+------+-----+ +|2021-06-30 09:57:14| 1| 28|2021-06-29 14:46:43|0.016574586| 60| 14089| 87717| 4| 73| 1003| 36| 2| 38| 308| 5| 1| 5| +|2021-06-30 09:57:14| 0| 0|2021-06-27 22:29:13| 0.06077348| 47| 14089| 35684| 4| 73| 43| 36| 2| 38| 308| 5| 1| 32| +|2021-06-30 09:57:14| 0| 0|2021-06-28 12:22:54|0.038674034| 157| 14089| 20413| 4| 73| 363| 36| 2| 38| 308| 5| 1| 20| +|2021-06-30 09:58:31| 0| 0|2021-06-29 13:25:06|0.027624309| 60| 14089| 15410| 4| 312| 848| 36| 2| 38| 308| 5| 1| 5| +|2021-07-03 18:13:03| 0| 0|2021-07-02 10:43:51| 0.09944751| 60| 14089| 81707| 2| 73| 313| 36| 2| 38| 308| 5| 1| 5| ++-------------------+-----+--------+-------------------+-----------+-----+-------+----------+----------+----------+-------------+------+---+--------+----+---+------+-----+ +``` +Data pre-processing command: +```bash +python data_processing.py \ + --input_path path/to/input/dataset \ + --output_path path/to/save/processed/dataset \ + --cluster_mode local \ + --executor_cores 8 \ + --executor_memory 24g \ + --num_executors 4 \ + --driver_cores 2 \ + --driver_memory 24g +``` + +__Options for data_processing:__ +* `input_path`: The path to input dataset. +* `output_path`: The path to save processed dataset. 
+* `cluster_mode`: The cluster mode, such as local, yarn, standalone or spark-submit. Default to be local. +* `master`: The master url, only used when cluster mode is standalone. Default to be None. +* `executor_cores`: The executor core number. Default to be 8. +* `executor_memory`: The executor memory. Default to be 24g. +* `num_executors`: The number of executors. Default to be 4. +* `driver_cores`: The driver core number. Default to be 2. +* `driver_memory`: The driver memory. Default to be 24g. + +__NOTE:__ +When the *cluster_mode* is yarn, *input_path* and *output_path* can be HDFS paths. + +## Train and test Multi-task models +After data preprocessing, training MMoE or PlE model as follows: +```bash +python run_multi_task.py \ + --do_train \ + --model_type mmoe\ + --train_data_path path/to/training/dataset \ + --test_data_path path/to/testing/dataset \ + --model_save_path path/to/save/the/trained/model \ + --cluster_mode local \ + --executor_cores 8 \ + --executor_memory 24g \ + --num_executors 4 \ + --driver_cores 2 \ + --driver_memory 24g +``` + +Evaluate Results as follows: +```bash +python run_multi_task.py \ + --do_test \ + --model_type mmoe\ + --test_data_path path/to/testing/dataset \ + --model_save_path path/to/save/the/trained/model \ + --cluster_mode local \ + --executor_cores 8 \ + --executor_memory 24g \ + --num_executors 4 \ + --driver_cores 2 \ + --driver_memory 24g +``` + +__Options for data_processing:__ +* `do_train`: To start training model. +* `do_test`: To start test model. +* `model_type`: The multi task model, mmoe or ple. Default to be mmoe. +* `train_data_path`: The path to training dataset. +* `test_data_path`: The path to testing dataset. +* `model_save_path`: The path to save model. +* `cluster_mode`: The cluster mode, such as local, yarn, standalone or spark-submit. Default to be local. +* `master`: The master url, only used when cluster mode is standalone. Default to be None. +* `executor_cores`: The executor core number. Default to be 8. +* `executor_memory`: The executor memory. Default to be 24g. +* `num_executors`: The number of executors. Default to be 4. +* `driver_cores`: The driver core number. Default to be 2. +* `driver_memory`: The driver memory. Default to be 24g. + +__NOTE:__ +When the *cluster_mode* is yarn, *train_data_path*, *test_data_path* ans *model_save_path* can be HDFS paths. diff --git a/python/friesian/example/multi_task/data_processing.py b/python/friesian/example/multi_task/data_processing.py new file mode 100644 index 00000000000..6d7540d78ee --- /dev/null +++ b/python/friesian/example/multi_task/data_processing.py @@ -0,0 +1,158 @@ +# +# Copyright 2016 The BigDL Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import argparse +import os +from argparse import ArgumentParser +from bigdl.friesian.feature import FeatureTable +from bigdl.orca import init_orca_context, stop_orca_context + + +def transform(x): + if x == '上海': + return 0.0 + elif isinstance(x, float): + return float(x) + else: + return float(eval(x)) + + +def transform_cat_2(x): + return '-'.join(sorted(x.split('/'))) + + +def read_and_split(data_input_path, sparse_int_features, sparse_string_features, dense_features): + header_names = ['user_id', 'article_id', 'expo_time', 'net_status', 'flush_nums', + 'exop_position', 'click', 'duration', 'device', 'os', 'province', 'city', + 'age', 'gender', 'ctime', 'img_num', 'cat_1', 'cat_2' + ] + if data_input_path.split('.')[-1] == 'csv': + data_pd = FeatureTable.read_csv(data_input_path, header=False, names=header_names) + else: + data_pd = FeatureTable.read_parquet(data_input_path) + data_pd = data_pd.cast(sparse_int_features, 'string') + data_pd = data_pd.cast(dense_features, 'string') + + # fill absence data + for feature in (sparse_int_features + sparse_string_features): + data_pd = data_pd.fillna("", feature) + for dense_feature in dense_features: + data_pd = data_pd.fillna('0.0', dense_feature) + print(data_pd.df.dtypes) + + process_img_num = lambda x: transform(x) + process_cat_2 = lambda x: transform_cat_2(x) + data_pd = data_pd.apply("img_num", "img_num", process_img_num, "float") + data_pd = data_pd.apply("cat_2", "cat_2", process_cat_2, "string") + + train_tbl = FeatureTable(data_pd.df[data_pd.df['expo_time'] < '2021-07-06']) + valid_tbl = FeatureTable(data_pd.df[data_pd.df['expo_time'] >= '2021-07-06']) + print('train_data.shape: ', train_tbl.size()) + print('test_data.shape: ', valid_tbl.size()) + return train_tbl, valid_tbl + + +def feature_engineering(train_tbl, valid_tbl, model_path, model_path_json, sparse_int_features, + sparse_string_features, dense_features): + import json + train_tbl, min_max_dict = train_tbl.min_max_scale(dense_features) + valid_tbl = valid_tbl.transform_min_max_scale(dense_features, min_max_dict) + cat_cols = sparse_string_features[-1:] + sparse_int_features + sparse_string_features[:-1] + for feature in cat_cols: + train_tbl, feature_idx = train_tbl.category_encode(feature) + valid_tbl = valid_tbl.encode_string(feature, feature_idx) + valid_tbl = valid_tbl.fillna(0, feature) + print("The class number of feature: {}/{}".format(feature, feature_idx.size())) + feature_idx.write_parquet(model_path) + fea_dict = feature_idx.to_dict() + with open(model_path_json + "/" + feature + '.json', 'w', encoding='utf-8') as ff: + ff.write(json.dumps(fea_dict, ensure_ascii=False, indent=2)) + return train_tbl, valid_tbl + + +def _parse_args(): + parser = ArgumentParser(description="Transform dataset for multi task demo") + parser.add_argument('--input_path', type=str, + default='/path/to/input/dataset', + help='The path for input dataset') + parser.add_argument('--output_path', type=str, default='/path/to/save/processed/dataset', + help='The path for output dataset') + parser.add_argument('--cluster_mode', type=str, default="local", + help='The cluster mode, such as local, yarn, standalone or spark-submit.') + parser.add_argument('--master', type=str, default=None, + help='The master url, only used when cluster mode is standalone.') + parser.add_argument('--executor_cores', type=int, default=8, + help='The executor core number.') + parser.add_argument('--executor_memory', type=str, default="24g", + help='The executor memory.') + parser.add_argument('--num_executors', 
type=int, default=4, + help='The number of executors.') + parser.add_argument('--driver_cores', type=int, default=2, + help='The driver core number.') + parser.add_argument('--driver_memory', type=str, default="24g", + help='The driver memory.') + args_ = parser.parse_args() + return args_ + + +if __name__ == '__main__': + args = _parse_args() + if args.cluster_mode == "local": + sc = init_orca_context("local", cores=args.executor_cores, + memory=args.executor_memory) + elif args.cluster_mode == "standalone": + sc = init_orca_context("standalone", master=args.master, + cores=args.executor_cores, num_nodes=args.num_executors, + memory=args.executor_memory, + driver_cores=args.driver_cores, + driver_memory=args.driver_memory) + elif args.cluster_mode == "yarn": + sc = init_orca_context("yarn-client", cores=args.executor_cores, + num_nodes=args.num_executors, memory=args.executor_memory, + driver_cores=args.driver_cores, driver_memory=args.driver_memory) + elif args.cluster_mode == "spark-submit": + sc = init_orca_context("spark-submit") + else: + argparse.ArgumentError(False, + "cluster_mode should be one of 'local', 'yarn', 'standalone' and" + " 'spark-submit', but got " + args.cluster_mode) + + sparse_int_features_ = [ + 'user_id', 'article_id', + 'net_status', 'flush_nums', + 'exop_position', + ] + sparse_string_features_ = [ + 'device', 'os', 'province', + 'city', 'age', + 'gender', 'cat_1', 'cat_2' + ] + dense_features_ = ['img_num'] + model_path_ = os.path.join(args.output_path, 'feature_maps') + model_path_json_ = os.path.join(args.output_path, 'feature_maps_json') + os.makedirs(model_path_, exist_ok=True) + os.makedirs(model_path_json_, exist_ok=True) + # read, reformat and split data + df_train, df_test = read_and_split(args.input_path, sparse_int_features_, + sparse_string_features_, dense_features_) + train_tbl_, valid_tbl_ = feature_engineering(df_train, df_test, + model_path_, model_path_json_, + sparse_int_features_, + sparse_string_features_, dense_features_) + print(train_tbl_.size()) + print(valid_tbl_.size()) + train_tbl_.write_parquet(os.path.join(args.output_path, 'train_processed')) + valid_tbl_.write_parquet(os.path.join(args.output_path, 'test_processed')) + stop_orca_context() diff --git a/python/friesian/example/multi_task/run_multi_task.py b/python/friesian/example/multi_task/run_multi_task.py new file mode 100644 index 00000000000..3a6c8022986 --- /dev/null +++ b/python/friesian/example/multi_task/run_multi_task.py @@ -0,0 +1,246 @@ +# +# Copyright 2016 The BigDL Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import math +from time import time +from argparse import ArgumentParser, ArgumentError +from keras.callbacks import EarlyStopping + +from bigdl.orca import init_orca_context, stop_orca_context +from bigdl.orca.learn.tf2.estimator import Estimator +from bigdl.friesian.feature import FeatureTable + +from deepctr.feature_column import SparseFeat, DenseFeat +from deepctr.models import MMOE, PLE + + +def build_model(model_type, sparse_features, dense_features, feature_max_idx): + sparse_feature_columns = [SparseFeat(feat, feature_max_idx[feat], + embedding_dim='auto') for feat in sparse_features] + dense_feature_columns = [DenseFeat(feat, 1) for feat in dense_features] + dnn_features_columns = sparse_feature_columns + dense_feature_columns + if model_type == 'mmoe': + model = MMOE(dnn_features_columns, tower_dnn_hidden_units=[], task_types=['regression', 'binary'], + task_names=['duration', 'click']) + elif model_type == 'ple': + model = PLE(dnn_features_columns, shared_expert_num=1, specific_expert_num=1, + task_types=['regression', 'binary'], + num_levels=2, task_names=['duration', 'click']) + else: + model = None + print("Model type should be one of 'mmoe' and 'ple'.") + return model + + +def model_creator(config): + model = build_model(model_type=config['model_type'], sparse_features=config['column_info']['cat_cols'], + dense_features=config['column_info']['continuous_cols'], + feature_max_idx=config['column_info']['feature_max_idx']) + model.compile(optimizer='adam', + loss=["mean_squared_error", "binary_crossentropy"], + metrics=[['mae'], ["AUC", 'Precision', 'Recall']]) + return model + + +def label_cols(column_info): + return column_info["label"] + + +def feature_cols(column_info): + return column_info["cat_cols"] + column_info["embed_cols"] + column_info["continuous_cols"] + + +def train_multi_task(train_tbl_data, valid_tbl_data, save_path, model, cat_cols, continuous_cols, feature_max_idx): + column_info = { + "cat_cols": cat_cols, + "continuous_cols": continuous_cols, + "feature_max_idx": feature_max_idx, + "embed_cols": [], + "embed_in_dims": [], + "embed_out_dims": [], + "label": ['duration', 'click']} + + config = { + "column_info": column_info, + "inter_op_parallelism": 4, + "intra_op_parallelism": 8, + "model_type": model # mmoe or ple + } + + batch_size = 256 + estimator = Estimator.from_keras( + model_creator=model_creator, + verbose=False, + config=config) + + train_count = train_tbl_data.size() + print("Total number of train records: {}".format(train_count)) + total_steps = math.ceil(train_count / batch_size) + steps_per_epoch = 50 + # To train the full dataset for an entire epoch + epochs = math.ceil(total_steps / steps_per_epoch) + val_count = valid_tbl_data.size() + print("Total number of val records: {}".format(val_count)) + val_steps = math.ceil(val_count / batch_size) + callbacks = [EarlyStopping(monitor='val_duration_mae', mode='min', verbose=1, patience=3), + EarlyStopping(monitor='val_click_auc', mode='max', verbose=1, patience=3)] + + start = time() + estimator.fit(data=train_tbl_data.df, + epochs=epochs, + batch_size=batch_size, + steps_per_epoch=steps_per_epoch, + validation_data=valid_tbl_data.df, + validation_steps=val_steps, + callbacks=callbacks, + feature_cols=feature_cols(column_info), + label_cols=label_cols(column_info)) + end = time() + print("Training time is: ", end - start) + estimator.save(save_path) + print('Save model to path: ', save_path) + + +def test_multi_task(valid_tbl_data, save_path, model, cat_cols, continuous_cols, feature_max_idx): + 
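    # Rebuild the same column_info and config that were used during training so
    # that the estimator recreated by model_creator matches the saved weights.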
column_info = { + "cat_cols": cat_cols, + "continuous_cols": continuous_cols, + "feature_max_idx": feature_max_idx, + "embed_cols": [], + "embed_in_dims": [], + "embed_out_dims": [], + "label": ['duration', 'click']} + config = { + "column_info": column_info, + "inter_op_parallelism": 4, + "intra_op_parallelism": 8, + "model_type": model # mmoe or ple + } + estimator = Estimator.from_keras( + model_creator=model_creator, + verbose=False, + config=config) + estimator.load(save_path) + + batch_size = 256 + val_steps = math.ceil(valid_tbl_data.size() / batch_size) + eval_results = estimator.evaluate(data=valid_tbl_data.df, + num_steps=val_steps, + batch_size=batch_size, + feature_cols=feature_cols(column_info), + label_cols=label_cols(column_info)) + for k, v in eval_results[0].items(): + print(k, v) + + +def _parse_args(): + parser = ArgumentParser(description="Set parameters for multi task demo") + parser.add_argument('--do_train', action='store_true', + help='Do training.') + parser.add_argument('--do_test', action='store_true', + help='Do training.') + parser.add_argument('--model_type', type=str, default="mmoe", + help='The multi task model, mmoe or ple.') + parser.add_argument('--train_data_path', type=str, + default='../dataset/news-dataset-parquet/train_processed_processed/', + help='The path for training dataset.') + parser.add_argument('--test_data_path', type=str, + default='../dataset/news-dataset-parquet/test_processed_processed/', + help='The path for testing dataset.') + parser.add_argument('--model_save_path', type=str, default='./recsys_multi_task/model_ple_10.bin', + help='The path for saving the trained model.') + parser.add_argument('--cluster_mode', type=str, default="local", + help='The cluster mode, such as local, yarn, standalone or spark-submit.') + parser.add_argument('--master', type=str, default=None, + help='The master url, only used when cluster mode is standalone.') + parser.add_argument('--executor_cores', type=int, default=8, + help='The executor core number.') + parser.add_argument('--executor_memory', type=str, default="24g", + help='The executor memory.') + parser.add_argument('--num_executors', type=int, default=4, + help='The number of executors.') + parser.add_argument('--driver_cores', type=int, default=2, + help='The driver core number.') + parser.add_argument('--driver_memory', type=str, default="24g", + help='The driver memory.') + args = parser.parse_args() + return args + + +if __name__ == "__main__": + args = _parse_args() + if args.cluster_mode == "local": # For local machine + sc = init_orca_context(cluster_mode="local", + cores=args.executor_cores, memory=args.executor_memory) + elif args.cluster_mode == "standalone": + sc = init_orca_context("standalone", master=args.master, + cores=args.executor_cores, num_nodes=args.num_executors, + memory=args.executor_memory, + driver_cores=args.driver_cores, + driver_memory=args.driver_memory) + elif args.cluster_mode == "yarn": # For Hadoop/YARN cluster + conf = {"spark.executor.memoryOverhead": "130g", + "spark.network.timeout": "10000000", + "spark.sql.broadcastTimeout": "7200", + "spark.sql.shuffle.partitions": "2000", + "spark.locality.wait": "0s", + "spark.sql.crossJoin.enabled": "true", + "spark.task.cpus": "1", + "spark.executor.heartbeatInterval": "200s", + "spark.driver.maxResultSize": "40G", + "spark.eventLog.enabled": "true", + "spark.app.name": "recsys-demo-train"} + sc = init_orca_context(cluster_mode="yarn", cores=args.executor_cores, + num_nodes=args.num_executors, 
memory=args.executor_memory, + driver_cores=args.driver_cores, driver_memory=args.driver_memory, + conf=conf, object_store_memory="80g", + env={"KMP_BLOCKTIME": "1", + "KMP_AFFINITY": "granularity=fine,compact,1,0", + "OMP_NUM_THREADS": "28"}) + elif args.cluster_mode == "spark-submit": + sc = init_orca_context("spark-submit") + else: + ArgumentError(False, + "cluster_mode should be one of 'local', 'yarn', 'standalone' and" + " 'spark-submit', but got " + args.cluster_mode) + cat_cols_ = [ + 'user_id', + 'article_id', + 'net_status', + 'exop_position', + 'device', + 'city', + 'age', + 'gender', + 'cat_1', + ] + continuous_cols_ = ['img_num'] + feature_max_idx_ = {'user_id': 40000, 'article_id': 200000, 'net_status': 1004, + 'exop_position': 2000, 'device': 2000, + 'city': 1379, 'age': 1005, 'gender': 1003, 'cat_1': 1038} + + if args.do_train: + train_tbl = FeatureTable.read_parquet(args.train_data_path) + valid_tbl = FeatureTable.read_parquet(args.test_data_path) + train_multi_task(train_tbl, valid_tbl, args.model_save_path, + args.model_type, cat_cols_, continuous_cols_, + feature_max_idx_) + elif args.do_test: + valid_tbl = FeatureTable.read_parquet(args.test_data_path) + test_multi_task(valid_tbl, args.model_save_path, args.model_type, + cat_cols_, continuous_cols_, feature_max_idx_) + else: + print("Need to choose whether to do_train or do_test.") + stop_orca_context() From 1d3bbca3eb13ba0ee95d0f3c556aa7d1eebf350e Mon Sep 17 00:00:00 2001 From: WangBin <32730386+devWangBin@users.noreply.github.com> Date: Thu, 18 Aug 2022 17:19:32 +0800 Subject: [PATCH 2/8] fix for Friesian code style --- python/friesian/example/multi_task/README.md | 18 +++++++++--------- .../example/multi_task/data_processing.py | 6 +++--- .../example/multi_task/run_multi_task.py | 16 ++++++++++------ 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/python/friesian/example/multi_task/README.md b/python/friesian/example/multi_task/README.md index 3cb5fc547ed..862ce67b6c5 100644 --- a/python/friesian/example/multi_task/README.md +++ b/python/friesian/example/multi_task/README.md @@ -20,15 +20,15 @@ In this example, a news dataset is used to demonstrate the training and testing Each row contains several feature values, timestamps and two labels. Using the timestamp to divide the training and testing sets. The click prediction (classification) and duration time prediction (regression) are two output targets. 
Original data examples are as follows: ```angular2html -+----------+----------+-------------------+----------+----------+-------------+-----+--------+------+-------+--------+----+------+------+-------------------+-------+-----+-------------+ -| user_id|article_id| expo_time|net_status|flush_nums|exop_position|click|duration|device| os|province|city| age|gender| ctime|img_num|cat_1| cat_2| -+----------+----------+-------------------+----------+----------+-------------+-----+--------+------+-------+--------+----+------+------+-------------------+-------+-----+-------------+ -|1000541010| 464467760|2021-06-30 09:57:14| 2| 0| 13| 1| 28|V2054A|Android| 上海|上海|A_0_24|female|2021-06-29 14:46:43| 3| 娱乐|娱乐/港台明星| -|1000541010| 463850913|2021-06-30 09:57:14| 2| 0| 15| 0| 0|V2054A|Android| 上海|上海|A_0_24|female|2021-06-27 22:29:13| 11| 时尚|时尚/女性时尚| -|1000541010| 464022440|2021-06-30 09:57:14| 2| 0| 17| 0| 0|V2054A|Android| 上海|上海|A_0_24|female|2021-06-28 12:22:54| 7| 农村|农村/农业资讯| -|1000541010| 464586545|2021-06-30 09:58:31| 2| 1| 20| 0| 0|V2054A|Android| 上海|上海|A_0_24|female|2021-06-29 13:25:06| 5| 娱乐|娱乐/港台明星| -|1000541010| 465352885|2021-07-03 18:13:03| 5| 0| 18| 0| 0|V2054A|Android| 上海|上海|A_0_24|female|2021-07-02 10:43:51| 18| 娱乐|娱乐/港台明星| -+----------+----------+-------------------+----------+----------+-------------+-----+--------+------+-------+--------+----+------+------+-------------------+-------+-----+-------------+ ++----------+----------+-------------------+----------+----------+-------------+-----+--------+------+-------+--------+--------+------+------+-------------------+-------+-------------+--------------------+ +| user_id|article_id| expo_time|net_status|flush_nums|exop_position|click|duration|device| os|province| city| age|gender| ctime|img_num| cat_1| cat_2| ++----------+----------+-------------------+----------+----------+-------------+-----+--------+------+-------+--------+--------+------+------+-------------------+-------+-------------+--------------------+ +|1000541010| 464467760|2021-06-30 09:57:14| 2| 0| 13| 1| 28|V2054A|Android|Shanghai|Shanghai|A_0_24|female|2021-06-29 14:46:43| 3|Entertainment| Entertainment/Stars| +|1000541010| 463850913|2021-06-30 09:57:14| 2| 0| 15| 0| 0|V2054A|Android|Shanghai|Shanghai|A_0_24|female|2021-06-27 22:29:13| 11| Fashions|Fashions/Female F...| +|1000541010| 464022440|2021-06-30 09:57:14| 2| 0| 17| 0| 0|V2054A|Android|Shanghai|Shanghai|A_0_24|female|2021-06-28 12:22:54| 7| Rural|Rural/Agriculture...| +|1000541010| 464586545|2021-06-30 09:58:31| 2| 1| 20| 0| 0|V2054A|Android|Shanghai|Shanghai|A_0_24|female|2021-06-29 13:25:06| 5|Entertainment| Entertainment/Stars| +|1000541010| 465352885|2021-07-03 18:13:03| 5| 0| 18| 0| 0|V2054A|Android|Shanghai|Shanghai|A_0_24|female|2021-07-02 10:43:51| 18|Entertainment| Entertainment/Stars| ++----------+----------+-------------------+----------+----------+-------------+-----+--------+------+-------+--------+--------+------+------+-------------------+-------+-------------+--------------------+ ``` With the built-in high-level preprocessing operations in FeatureTable, we can easily perform distributed pre-processing for large-scale data. 
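For a sense of what these operations look like in code, below is a small, self-contained sketch built from the FeatureTable calls this example's scripts use (the input path is a placeholder, and the single column shown stands in for the full feature lists):
```python
from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.friesian.feature import FeatureTable

sc = init_orca_context("local", cores=2, memory="4g")

tbl = FeatureTable.read_parquet("news_sample.parquet")  # placeholder path
tbl = tbl.cast(["img_num"], "string")                   # normalize the type first
tbl = tbl.fillna("0.0", "img_num")                      # fill missing values
tbl = tbl.apply("img_num", "img_num", float, "float")   # row-wise transform/cast
tbl, min_max = tbl.min_max_scale(["img_num"])           # scale dense feature to [0, 1]
tbl, gender_idx = tbl.category_encode("gender")         # string categories -> integer ids
print(tbl.size())

stop_orca_context()
```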
diff --git a/python/friesian/example/multi_task/data_processing.py b/python/friesian/example/multi_task/data_processing.py index 6d7540d78ee..dedf3b3db03 100644 --- a/python/friesian/example/multi_task/data_processing.py +++ b/python/friesian/example/multi_task/data_processing.py @@ -148,9 +148,9 @@ def _parse_args(): df_train, df_test = read_and_split(args.input_path, sparse_int_features_, sparse_string_features_, dense_features_) train_tbl_, valid_tbl_ = feature_engineering(df_train, df_test, - model_path_, model_path_json_, - sparse_int_features_, - sparse_string_features_, dense_features_) + model_path_, model_path_json_, + sparse_int_features_, + sparse_string_features_, dense_features_) print(train_tbl_.size()) print(valid_tbl_.size()) train_tbl_.write_parquet(os.path.join(args.output_path, 'train_processed')) diff --git a/python/friesian/example/multi_task/run_multi_task.py b/python/friesian/example/multi_task/run_multi_task.py index 3a6c8022986..89d6d74d889 100644 --- a/python/friesian/example/multi_task/run_multi_task.py +++ b/python/friesian/example/multi_task/run_multi_task.py @@ -32,7 +32,8 @@ def build_model(model_type, sparse_features, dense_features, feature_max_idx): dense_feature_columns = [DenseFeat(feat, 1) for feat in dense_features] dnn_features_columns = sparse_feature_columns + dense_feature_columns if model_type == 'mmoe': - model = MMOE(dnn_features_columns, tower_dnn_hidden_units=[], task_types=['regression', 'binary'], + model = MMOE(dnn_features_columns, tower_dnn_hidden_units=[], + task_types=['regression', 'binary'], task_names=['duration', 'click']) elif model_type == 'ple': model = PLE(dnn_features_columns, shared_expert_num=1, specific_expert_num=1, @@ -45,7 +46,8 @@ def build_model(model_type, sparse_features, dense_features, feature_max_idx): def model_creator(config): - model = build_model(model_type=config['model_type'], sparse_features=config['column_info']['cat_cols'], + model = build_model(model_type=config['model_type'], + sparse_features=config['column_info']['cat_cols'], dense_features=config['column_info']['continuous_cols'], feature_max_idx=config['column_info']['feature_max_idx']) model.compile(optimizer='adam', @@ -62,7 +64,8 @@ def feature_cols(column_info): return column_info["cat_cols"] + column_info["embed_cols"] + column_info["continuous_cols"] -def train_multi_task(train_tbl_data, valid_tbl_data, save_path, model, cat_cols, continuous_cols, feature_max_idx): +def train_multi_task(train_tbl_data, valid_tbl_data, save_path, model, + cat_cols, continuous_cols, feature_max_idx): column_info = { "cat_cols": cat_cols, "continuous_cols": continuous_cols, @@ -154,12 +157,13 @@ def _parse_args(): parser.add_argument('--model_type', type=str, default="mmoe", help='The multi task model, mmoe or ple.') parser.add_argument('--train_data_path', type=str, - default='../dataset/news-dataset-parquet/train_processed_processed/', + default='path/to/training/dataset', help='The path for training dataset.') parser.add_argument('--test_data_path', type=str, - default='../dataset/news-dataset-parquet/test_processed_processed/', + default='path/to/testing/dataset', help='The path for testing dataset.') - parser.add_argument('--model_save_path', type=str, default='./recsys_multi_task/model_ple_10.bin', + parser.add_argument('--model_save_path', type=str, + default='path/to/save/the/trained/model', help='The path for saving the trained model.') parser.add_argument('--cluster_mode', type=str, default="local", help='The cluster mode, such as local, yarn, standalone 
or spark-submit.') From 49e53937c07b05cdfa5b92e72d251762b36e14b2 Mon Sep 17 00:00:00 2001 From: WangBin <32730386+devWangBin@users.noreply.github.com> Date: Tue, 30 Aug 2022 11:19:50 +0800 Subject: [PATCH 3/8] part 1: fix comments in pr --- python/friesian/example/multi_task/README.md | 56 +++++++++---------- .../example/multi_task/data_processing.py | 5 +- .../example/multi_task/run_multi_task.py | 4 +- 3 files changed, 33 insertions(+), 32 deletions(-) diff --git a/python/friesian/example/multi_task/README.md b/python/friesian/example/multi_task/README.md index 862ce67b6c5..c3ab6ca23c1 100644 --- a/python/friesian/example/multi_task/README.md +++ b/python/friesian/example/multi_task/README.md @@ -1,23 +1,23 @@ # Multi-task Recommendation with BigDL In addition to providing a personalized recommendation, recommendation systems need to output diverse predictions to meet the needs of real-world applications, such as user click-through rates and browsing (or watching) time predictions for products. -This example demonstrates how to use the [MMoE](https://dl.acm.org/doi/pdf/10.1145/3219819.3220007) or [PLE](https://dl.acm.org/doi/pdf/10.1145/3383313.3412236?casa_token=8fchWD8CHc0AAAAA:2cyP8EwkhIUlSFPRpfCGHahTddki0OEjDxfbUFMkXY5fU0FNtkvRzmYloJtLowFmL1en88FRFY4Q) model to implement multi-task recommendations with large-scale data. +This example demonstrates how to use BigDL Friesian to train [MMoE](https://dl.acm.org/doi/pdf/10.1145/3219819.3220007) or [PLE](https://dl.acm.org/doi/pdf/10.1145/3383313.3412236?casa_token=8fchWD8CHc0AAAAA:2cyP8EwkhIUlSFPRpfCGHahTddki0OEjDxfbUFMkXY5fU0FNtkvRzmYloJtLowFmL1en88FRFY4Q) for multi-task recommendation with large-scale data. -## Prepare environments +## Prepare the environment We highly recommend you use [Anaconda](https://www.anaconda.com/distribution/#linux) to prepare the environment, especially if you want to run on a yarn cluster. ``` conda create -n bigdl python=3.7 #bigdl is conda environment name, you can set another name you like. conda activate bigdl -pip install bigdl-orca[ray] -pip install bigdl-friesian +pip install bigdl-friesian[train] pip install tensorflow==2.9.1 pip install deepctr[cpu] ``` Refer to [this document](https://bigdl.readthedocs.io/en/latest/doc/UserGuide/python.html#install) for more installation guides. ## Data Preparation -In this example, a news dataset is used to demonstrate the training and testing process. -Each row contains several feature values, timestamps and two labels. Using the timestamp to divide the training and testing sets. +In this example, a [news dataset](https://github.com/zhongqiangwu960812/AI-RecommenderSystem/tree/master/Dataset) is used to demonstrate the training and testing process. +The original data has more than 1 million users, as well as more than 60 million clicks, and the processed training and test data have 2,977,923 and 962,066 records respectively. +Each row contains several feature values, timestamps and two labels. The timestamp is used to divide the training and testing sets. The click prediction (classification) and duration time prediction (regression) are two output targets. 
Original data examples are as follows: ```angular2html +----------+----------+-------------------+----------+----------+-------------+-----+--------+------+-------+--------+--------+------+------+-------------------+-------+-------------+--------------------+ @@ -31,8 +31,8 @@ The click prediction (classification) and duration time prediction (regression) +----------+----------+-------------------+----------+----------+-------------+-----+--------+------+-------+--------+--------+------+------+-------------------+-------+-------------+--------------------+ ``` -With the built-in high-level preprocessing operations in FeatureTable, we can easily perform distributed pre-processing for large-scale data. -The details of pre-processing can be found [here](https://github.com/intel-analytics/BigDL/blob/main/apps/wide-deep-recommendation/feature_engineering.ipynb). Examples of processed data are as follows: +With the built-in high-level preprocessing operations in Friesian FeatureTable, we can easily perform distributed preprocessing for large-scale data. +The details of preprocessing can be found [here](https://github.com/intel-analytics/BigDL/blob/main/apps/wide-deep-recommendation/feature_engineering.ipynb). Examples of processed data are as follows: ```angular2html +-------------------+-----+--------+-------------------+-----------+-----+-------+----------+----------+----------+-------------+------+---+--------+----+---+------+-----+ @@ -45,17 +45,17 @@ The details of pre-processing can be found [here](https://github.com/intel-analy |2021-07-03 18:13:03| 0| 0|2021-07-02 10:43:51| 0.09944751| 60| 14089| 81707| 2| 73| 313| 36| 2| 38| 308| 5| 1| 5| +-------------------+-----+--------+-------------------+-----------+-----+-------+----------+----------+----------+-------------+------+---+--------+----+---+------+-----+ ``` -Data pre-processing command: +Data preprocessing command: ```bash python data_processing.py \ - --input_path path/to/input/dataset \ - --output_path path/to/save/processed/dataset \ + --input_path /path/to/input/dataset \ + --output_path /path/to/save/processed/dataset \ --cluster_mode local \ --executor_cores 8 \ - --executor_memory 24g \ + --executor_memory 12g \ --num_executors 4 \ --driver_cores 2 \ - --driver_memory 24g + --driver_memory 8g ``` __Options for data_processing:__ @@ -64,29 +64,29 @@ __Options for data_processing:__ * `cluster_mode`: The cluster mode, such as local, yarn, standalone or spark-submit. Default to be local. * `master`: The master url, only used when cluster mode is standalone. Default to be None. * `executor_cores`: The executor core number. Default to be 8. -* `executor_memory`: The executor memory. Default to be 24g. +* `executor_memory`: The executor memory. Default to be 12g. * `num_executors`: The number of executors. Default to be 4. * `driver_cores`: The driver core number. Default to be 2. -* `driver_memory`: The driver memory. Default to be 24g. +* `driver_memory`: The driver memory. Default to be 8g. __NOTE:__ When the *cluster_mode* is yarn, *input_path* and *output_path* can be HDFS paths. 
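One detail worth noting in `feature_engineering`: scalers and category indices are fit on the training table only and then re-applied to the validation table. A condensed sketch of that pattern, assuming `train_tbl` and `valid_tbl` are the FeatureTables returned by `read_and_split`:
```python
# Fit min-max statistics and the category index on the training split only,
# then reuse both on validation so the two splits are encoded consistently.
train_tbl, min_max_dict = train_tbl.min_max_scale(["img_num"])
valid_tbl = valid_tbl.transform_min_max_scale(["img_num"], min_max_dict)

train_tbl, city_idx = train_tbl.category_encode("city")
valid_tbl = valid_tbl.encode_string("city", city_idx)
valid_tbl = valid_tbl.fillna(0, "city")  # default id for categories unseen in training
```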
## Train and test Multi-task models
-After data preprocessing, training MMoE or PlE model as follows:
+After data preprocessing, the training command for MMoE or PLE model is as follows:
 ```bash
 python run_multi_task.py \
     --do_train \
     --model_type mmoe\
-    --train_data_path path/to/training/dataset \
-    --test_data_path path/to/testing/dataset \
-    --model_save_path path/to/save/the/trained/model \
+    --train_data_path /path/to/training/dataset \
+    --test_data_path /path/to/testing/dataset \
+    --model_save_path /path/to/save/the/trained/model \
     --cluster_mode local \
     --executor_cores 8 \
-    --executor_memory 24g \
+    --executor_memory 12g \
     --num_executors 4 \
     --driver_cores 2 \
-    --driver_memory 24g
+    --driver_memory 8g
 ```
 
 Evaluate Results as follows:
@@ -94,17 +94,17 @@ Evaluate Results as follows:
 python run_multi_task.py \
     --do_test \
     --model_type mmoe\
-    --test_data_path path/to/testing/dataset \
-    --model_save_path path/to/save/the/trained/model \
+    --test_data_path /path/to/testing/dataset \
+    --model_save_path /path/to/save/the/trained/model \
     --cluster_mode local \
     --executor_cores 8 \
-    --executor_memory 24g \
+    --executor_memory 12g \
     --num_executors 4 \
     --driver_cores 2 \
-    --driver_memory 24g
+    --driver_memory 8g
 ```
 
-__Options for data_processing:__
+__Options for training and test:__
 * `do_train`: To start training model.
 * `do_test`: To start test model.
 * `model_type`: The multi task model, mmoe or ple. Default to be mmoe.
@@ -114,10 +114,10 @@ __Options for data_processing:__
 * `cluster_mode`: The cluster mode, such as local, yarn, standalone or spark-submit. Default to be local.
 * `master`: The master url, only used when cluster mode is standalone. Default to be None.
 * `executor_cores`: The executor core number. Default to be 8.
-* `executor_memory`: The executor memory. Default to be 24g.
+* `executor_memory`: The executor memory. Default to be 12g.
 * `num_executors`: The number of executors. Default to be 4.
 * `driver_cores`: The driver core number. Default to be 2.
-* `driver_memory`: The driver memory. Default to be 24g.
+* `driver_memory`: The driver memory. Default to be 8g.
 
 __NOTE:__
-When the *cluster_mode* is yarn, *train_data_path*, *test_data_path* ans *model_save_path* can be HDFS paths.
+When the *cluster_mode* is yarn, *train_data_path*, *test_data_path* and *model_save_path* should be HDFS paths.
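Under the hood, the training script assembles a two-headed Keras model. The following condensed sketch of what `build_model` and `model_creator` do in `run_multi_task.py` shows the MMoE case, using vocabulary sizes taken from the script's `feature_max_idx` for two of the features:
```python
from deepctr.feature_column import SparseFeat, DenseFeat
from deepctr.models import MMOE

# Two sparse features plus the single dense feature used in this example.
feature_columns = [SparseFeat("user_id", 40000, embedding_dim="auto"),
                   SparseFeat("gender", 1003, embedding_dim="auto"),
                   DenseFeat("img_num", 1)]

# Shared experts feed two task towers: duration (regression) and click (binary).
model = MMOE(feature_columns, tower_dnn_hidden_units=[],
             task_types=["regression", "binary"],
             task_names=["duration", "click"])
model.compile(optimizer="adam",
              loss=["mean_squared_error", "binary_crossentropy"],
              metrics=[["mae"], ["AUC", "Precision", "Recall"]])
model.summary()
```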
diff --git a/python/friesian/example/multi_task/data_processing.py b/python/friesian/example/multi_task/data_processing.py index dedf3b3db03..d06279386de 100644 --- a/python/friesian/example/multi_task/data_processing.py +++ b/python/friesian/example/multi_task/data_processing.py @@ -16,6 +16,7 @@ import argparse import os from argparse import ArgumentParser + from bigdl.friesian.feature import FeatureTable from bigdl.orca import init_orca_context, stop_orca_context @@ -95,13 +96,13 @@ def _parse_args(): help='The master url, only used when cluster mode is standalone.') parser.add_argument('--executor_cores', type=int, default=8, help='The executor core number.') - parser.add_argument('--executor_memory', type=str, default="24g", + parser.add_argument('--executor_memory', type=str, default="12g", help='The executor memory.') parser.add_argument('--num_executors', type=int, default=4, help='The number of executors.') parser.add_argument('--driver_cores', type=int, default=2, help='The driver core number.') - parser.add_argument('--driver_memory', type=str, default="24g", + parser.add_argument('--driver_memory', type=str, default="8g", help='The driver memory.') args_ = parser.parse_args() return args_ diff --git a/python/friesian/example/multi_task/run_multi_task.py b/python/friesian/example/multi_task/run_multi_task.py index 89d6d74d889..4750e7ca79d 100644 --- a/python/friesian/example/multi_task/run_multi_task.py +++ b/python/friesian/example/multi_task/run_multi_task.py @@ -171,13 +171,13 @@ def _parse_args(): help='The master url, only used when cluster mode is standalone.') parser.add_argument('--executor_cores', type=int, default=8, help='The executor core number.') - parser.add_argument('--executor_memory', type=str, default="24g", + parser.add_argument('--executor_memory', type=str, default="12g", help='The executor memory.') parser.add_argument('--num_executors', type=int, default=4, help='The number of executors.') parser.add_argument('--driver_cores', type=int, default=2, help='The driver core number.') - parser.add_argument('--driver_memory', type=str, default="24g", + parser.add_argument('--driver_memory', type=str, default="8g", help='The driver memory.') args = parser.parse_args() return args From 80614071d7ff58fe7a5ae0828bb261a6c460f791 Mon Sep 17 00:00:00 2001 From: WangBin <32730386+devWangBin@users.noreply.github.com> Date: Tue, 13 Sep 2022 18:31:55 +0800 Subject: [PATCH 4/8] update for comments part 2 --- python/friesian/example/multi_task/README.md | 9 ++- .../example/multi_task/data_processing.py | 66 +++++++++---------- .../example/multi_task/run_multi_task.py | 62 +++++++---------- 3 files changed, 61 insertions(+), 76 deletions(-) diff --git a/python/friesian/example/multi_task/README.md b/python/friesian/example/multi_task/README.md index c3ab6ca23c1..94d2a1fce00 100644 --- a/python/friesian/example/multi_task/README.md +++ b/python/friesian/example/multi_task/README.md @@ -53,6 +53,14 @@ python data_processing.py \ --cluster_mode local \ --executor_cores 8 \ --executor_memory 12g \ +``` +```bash +python data_processing.py \ + --input_path /path/to/input/dataset \ + --output_path /path/to/save/processed/dataset \ + --cluster_mode yarn \ + --executor_cores 8 \ + --executor_memory 12g \ --num_executors 4 \ --driver_cores 2 \ --driver_memory 8g @@ -76,7 +84,6 @@ When the *cluster_mode* is yarn, *input_path* and *output_path* can be HDFS path After data preprocessing, the training command for MMoE or PLE model is as follows: ```bash python run_multi_task.py \ - 
--do_train \ --model_type mmoe\ --train_data_path /path/to/training/dataset \ --test_data_path /path/to/testing/dataset \ diff --git a/python/friesian/example/multi_task/data_processing.py b/python/friesian/example/multi_task/data_processing.py index d06279386de..4a2a8794849 100644 --- a/python/friesian/example/multi_task/data_processing.py +++ b/python/friesian/example/multi_task/data_processing.py @@ -39,33 +39,33 @@ def read_and_split(data_input_path, sparse_int_features, sparse_string_features, 'exop_position', 'click', 'duration', 'device', 'os', 'province', 'city', 'age', 'gender', 'ctime', 'img_num', 'cat_1', 'cat_2' ] - if data_input_path.split('.')[-1] == 'csv': - data_pd = FeatureTable.read_csv(data_input_path, header=False, names=header_names) + if data_input_path.endswith("csv"): + tbl = FeatureTable.read_csv(data_input_path, header=False, names=header_names) else: - data_pd = FeatureTable.read_parquet(data_input_path) - data_pd = data_pd.cast(sparse_int_features, 'string') - data_pd = data_pd.cast(dense_features, 'string') + tbl = FeatureTable.read_parquet(data_input_path) + tbl = tbl.cast(sparse_int_features, 'string') + tbl = tbl.cast(dense_features, 'string') # fill absence data for feature in (sparse_int_features + sparse_string_features): - data_pd = data_pd.fillna("", feature) + tbl = tbl.fillna("", feature) for dense_feature in dense_features: - data_pd = data_pd.fillna('0.0', dense_feature) - print(data_pd.df.dtypes) + tbl = tbl.fillna('0.0', dense_feature) + print(tbl.df.dtypes) process_img_num = lambda x: transform(x) process_cat_2 = lambda x: transform_cat_2(x) - data_pd = data_pd.apply("img_num", "img_num", process_img_num, "float") - data_pd = data_pd.apply("cat_2", "cat_2", process_cat_2, "string") + tbl = tbl.apply("img_num", "img_num", process_img_num, "float") + tbl = tbl.apply("cat_2", "cat_2", process_cat_2, "string") - train_tbl = FeatureTable(data_pd.df[data_pd.df['expo_time'] < '2021-07-06']) - valid_tbl = FeatureTable(data_pd.df[data_pd.df['expo_time'] >= '2021-07-06']) + train_tbl = FeatureTable(tbl.df[tbl.df['expo_time'] < '2021-07-06']) + valid_tbl = FeatureTable(tbl.df[tbl.df['expo_time'] >= '2021-07-06']) print('train_data.shape: ', train_tbl.size()) print('test_data.shape: ', valid_tbl.size()) return train_tbl, valid_tbl -def feature_engineering(train_tbl, valid_tbl, model_path, model_path_json, sparse_int_features, +def feature_engineering(train_tbl, valid_tbl, output_path, sparse_int_features, sparse_string_features, dense_features): import json train_tbl, min_max_dict = train_tbl.min_max_scale(dense_features) @@ -76,14 +76,11 @@ def feature_engineering(train_tbl, valid_tbl, model_path, model_path_json, spars valid_tbl = valid_tbl.encode_string(feature, feature_idx) valid_tbl = valid_tbl.fillna(0, feature) print("The class number of feature: {}/{}".format(feature, feature_idx.size())) - feature_idx.write_parquet(model_path) - fea_dict = feature_idx.to_dict() - with open(model_path_json + "/" + feature + '.json', 'w', encoding='utf-8') as ff: - ff.write(json.dumps(fea_dict, ensure_ascii=False, indent=2)) + feature_idx.write_parquet(os.path.join(output_path, 'feature_maps')) return train_tbl, valid_tbl -def _parse_args(): +def parse_args(): parser = ArgumentParser(description="Transform dataset for multi task demo") parser.add_argument('--input_path', type=str, default='/path/to/input/dataset', @@ -109,7 +106,7 @@ def _parse_args(): if __name__ == '__main__': - args = _parse_args() + args = parse_args() if args.cluster_mode == "local": sc = 
init_orca_context("local", cores=args.executor_cores, memory=args.executor_memory) @@ -130,30 +127,27 @@ def _parse_args(): "cluster_mode should be one of 'local', 'yarn', 'standalone' and" " 'spark-submit', but got " + args.cluster_mode) - sparse_int_features_ = [ + sparse_int_features = [ 'user_id', 'article_id', 'net_status', 'flush_nums', 'exop_position', ] - sparse_string_features_ = [ + sparse_string_features = [ 'device', 'os', 'province', 'city', 'age', 'gender', 'cat_1', 'cat_2' ] - dense_features_ = ['img_num'] - model_path_ = os.path.join(args.output_path, 'feature_maps') - model_path_json_ = os.path.join(args.output_path, 'feature_maps_json') - os.makedirs(model_path_, exist_ok=True) - os.makedirs(model_path_json_, exist_ok=True) + dense_features = ['img_num'] + # read, reformat and split data - df_train, df_test = read_and_split(args.input_path, sparse_int_features_, - sparse_string_features_, dense_features_) - train_tbl_, valid_tbl_ = feature_engineering(df_train, df_test, - model_path_, model_path_json_, - sparse_int_features_, - sparse_string_features_, dense_features_) - print(train_tbl_.size()) - print(valid_tbl_.size()) - train_tbl_.write_parquet(os.path.join(args.output_path, 'train_processed')) - valid_tbl_.write_parquet(os.path.join(args.output_path, 'test_processed')) + df_train, df_test = read_and_split(args.input_path, sparse_int_features, + sparse_string_features, dense_features) + train_tbl, valid_tbl = feature_engineering(df_train, df_test, + args.output_path, + sparse_int_features, + sparse_string_features, dense_features) + print(train_tbl.size()) + print(valid_tbl.size()) + train_tbl.write_parquet(os.path.join(args.output_path, 'train_processed')) + valid_tbl.write_parquet(os.path.join(args.output_path, 'test_processed')) stop_orca_context() diff --git a/python/friesian/example/multi_task/run_multi_task.py b/python/friesian/example/multi_task/run_multi_task.py index 4750e7ca79d..7544330dc10 100644 --- a/python/friesian/example/multi_task/run_multi_task.py +++ b/python/friesian/example/multi_task/run_multi_task.py @@ -25,6 +25,9 @@ from deepctr.feature_column import SparseFeat, DenseFeat from deepctr.models import MMOE, PLE +# from python.serving.src.bigdl.serving.log4Error import invalidInputError +from python.dllib.src.bigdl.dllib.utils.log4Error import invalidInputError + def build_model(model_type, sparse_features, dense_features, feature_max_idx): sparse_feature_columns = [SparseFeat(feat, feature_max_idx[feat], @@ -40,8 +43,7 @@ def build_model(model_type, sparse_features, dense_features, feature_max_idx): task_types=['regression', 'binary'], num_levels=2, task_names=['duration', 'click']) else: - model = None - print("Model type should be one of 'mmoe' and 'ple'.") + raise invalidInputError return model @@ -150,10 +152,7 @@ def test_multi_task(valid_tbl_data, save_path, model, cat_cols, continuous_cols, def _parse_args(): parser = ArgumentParser(description="Set parameters for multi task demo") - parser.add_argument('--do_train', action='store_true', - help='Do training.') - parser.add_argument('--do_test', action='store_true', - help='Do training.') + parser.add_argument('--model_type', type=str, default="mmoe", help='The multi task model, mmoe or ple.') parser.add_argument('--train_data_path', type=str, @@ -195,31 +194,17 @@ def _parse_args(): driver_cores=args.driver_cores, driver_memory=args.driver_memory) elif args.cluster_mode == "yarn": # For Hadoop/YARN cluster - conf = {"spark.executor.memoryOverhead": "130g", - "spark.network.timeout": 
"10000000", - "spark.sql.broadcastTimeout": "7200", - "spark.sql.shuffle.partitions": "2000", - "spark.locality.wait": "0s", - "spark.sql.crossJoin.enabled": "true", - "spark.task.cpus": "1", - "spark.executor.heartbeatInterval": "200s", - "spark.driver.maxResultSize": "40G", - "spark.eventLog.enabled": "true", - "spark.app.name": "recsys-demo-train"} sc = init_orca_context(cluster_mode="yarn", cores=args.executor_cores, num_nodes=args.num_executors, memory=args.executor_memory, driver_cores=args.driver_cores, driver_memory=args.driver_memory, - conf=conf, object_store_memory="80g", - env={"KMP_BLOCKTIME": "1", - "KMP_AFFINITY": "granularity=fine,compact,1,0", - "OMP_NUM_THREADS": "28"}) + object_store_memory="80g") elif args.cluster_mode == "spark-submit": sc = init_orca_context("spark-submit") else: ArgumentError(False, "cluster_mode should be one of 'local', 'yarn', 'standalone' and" " 'spark-submit', but got " + args.cluster_mode) - cat_cols_ = [ + cat_cols = [ 'user_id', 'article_id', 'net_status', @@ -230,21 +215,20 @@ def _parse_args(): 'gender', 'cat_1', ] - continuous_cols_ = ['img_num'] - feature_max_idx_ = {'user_id': 40000, 'article_id': 200000, 'net_status': 1004, - 'exop_position': 2000, 'device': 2000, - 'city': 1379, 'age': 1005, 'gender': 1003, 'cat_1': 1038} - - if args.do_train: - train_tbl = FeatureTable.read_parquet(args.train_data_path) - valid_tbl = FeatureTable.read_parquet(args.test_data_path) - train_multi_task(train_tbl, valid_tbl, args.model_save_path, - args.model_type, cat_cols_, continuous_cols_, - feature_max_idx_) - elif args.do_test: - valid_tbl = FeatureTable.read_parquet(args.test_data_path) - test_multi_task(valid_tbl, args.model_save_path, args.model_type, - cat_cols_, continuous_cols_, feature_max_idx_) - else: - print("Need to choose whether to do_train or do_test.") + continuous_cols = ['img_num'] + feature_max_idx = {'user_id': 40000, 'article_id': 200000, 'net_status': 1004, + 'exop_position': 2000, 'device': 2000, + 'city': 1379, 'age': 1005, 'gender': 1003, 'cat_1': 1038} + + # do train + train_tbl = FeatureTable.read_parquet(args.train_data_path) + valid_tbl = FeatureTable.read_parquet(args.test_data_path) + train_multi_task(train_tbl, valid_tbl, args.model_save_path, + args.model_type, cat_cols, continuous_cols, + feature_max_idx) + # do test + # valid_tbl = FeatureTable.read_parquet(args.test_data_path) + test_multi_task(valid_tbl, args.model_save_path, args.model_type, + cat_cols, continuous_cols, feature_max_idx) + stop_orca_context() From c25f292e98bc073da4c33243c3800caa32843dea Mon Sep 17 00:00:00 2001 From: WangBin <32730386+devWangBin@users.noreply.github.com> Date: Thu, 15 Sep 2022 16:33:34 +0800 Subject: [PATCH 5/8] update for pr comments part 3 --- python/friesian/example/multi_task/README.md | 23 +++++++++++----- .../example/multi_task/data_processing.py | 26 +++++++++---------- .../example/multi_task/run_multi_task.py | 12 ++++----- 3 files changed, 35 insertions(+), 26 deletions(-) diff --git a/python/friesian/example/multi_task/README.md b/python/friesian/example/multi_task/README.md index 94d2a1fce00..68fd1dbd210 100644 --- a/python/friesian/example/multi_task/README.md +++ b/python/friesian/example/multi_task/README.md @@ -78,7 +78,7 @@ __Options for data_processing:__ * `driver_memory`: The driver memory. Default to be 8g. __NOTE:__ -When the *cluster_mode* is yarn, *input_path* and *output_path* can be HDFS paths. +When the *cluster_mode* is yarn, *input_path* and *output_path* should be HDFS paths. 
## Train and test Multi-task models After data preprocessing, the training command for MMoE or PLE model is as follows: @@ -91,15 +91,23 @@ python run_multi_task.py \ --cluster_mode local \ --executor_cores 8 \ --executor_memory 12g \ +``` +```bash +python run_multi_task.py \ + --model_type mmoe\ + --train_data_path /path/to/training/dataset \ + --test_data_path /path/to/testing/dataset \ + --model_save_path /path/to/save/the/trained/model \ + --cluster_mode yarn \ + --executor_cores 8 \ + --executor_memory 12g \ --num_executors 4 \ --driver_cores 2 \ --driver_memory 8g ``` - Evaluate Results as follows: ```bash python run_multi_task.py \ - --do_test \ --model_type mmoe\ --test_data_path /path/to/testing/dataset \ --model_save_path /path/to/save/the/trained/model \ @@ -109,11 +117,14 @@ python run_multi_task.py \ --num_executors 4 \ --driver_cores 2 \ --driver_memory 8g +``` +Results: +```angular2html + + ``` __Options for training and test:__ -* `do_train`: To start training model. -* `do_test`: To start test model. * `model_type`: The multi task model, mmoe or ple. Default to be mmoe. * `train_data_path`: The path to training dataset. * `test_data_path`: The path to testing dataset. @@ -127,4 +138,4 @@ __Options for training and test:__ * `driver_memory`: The driver memory. Default to be 8g. __NOTE:__ -When the *cluster_mode* is yarn, *train_data_path*, *test_data_path* ans *model_save_path* can be HDFS paths. +When the *cluster_mode* is yarn, *train_data_path*, *test_data_path* ans *model_save_path* should be HDFS paths. diff --git a/python/friesian/example/multi_task/data_processing.py b/python/friesian/example/multi_task/data_processing.py index 4a2a8794849..50a15249488 100644 --- a/python/friesian/example/multi_task/data_processing.py +++ b/python/friesian/example/multi_task/data_processing.py @@ -19,9 +19,11 @@ from bigdl.friesian.feature import FeatureTable from bigdl.orca import init_orca_context, stop_orca_context +from bigdl.dllib.utils.log4Error import invalidInputError def transform(x): + # dealing with some abnormal data if x == '上海': return 0.0 elif isinstance(x, float): @@ -60,8 +62,8 @@ def read_and_split(data_input_path, sparse_int_features, sparse_string_features, train_tbl = FeatureTable(tbl.df[tbl.df['expo_time'] < '2021-07-06']) valid_tbl = FeatureTable(tbl.df[tbl.df['expo_time'] >= '2021-07-06']) - print('train_data.shape: ', train_tbl.size()) - print('test_data.shape: ', valid_tbl.size()) + print('The number of train data: ', train_tbl.size()) + print('The number of test data: ', valid_tbl.size()) return train_tbl, valid_tbl @@ -70,7 +72,7 @@ def feature_engineering(train_tbl, valid_tbl, output_path, sparse_int_features, import json train_tbl, min_max_dict = train_tbl.min_max_scale(dense_features) valid_tbl = valid_tbl.transform_min_max_scale(dense_features, min_max_dict) - cat_cols = sparse_string_features[-1:] + sparse_int_features + sparse_string_features[:-1] + cat_cols = sparse_int_features + sparse_string_features for feature in cat_cols: train_tbl, feature_idx = train_tbl.category_encode(feature) valid_tbl = valid_tbl.encode_string(feature, feature_idx) @@ -123,9 +125,9 @@ def parse_args(): elif args.cluster_mode == "spark-submit": sc = init_orca_context("spark-submit") else: - argparse.ArgumentError(False, - "cluster_mode should be one of 'local', 'yarn', 'standalone' and" - " 'spark-submit', but got " + args.cluster_mode) + invalidInputError(False, + "cluster_mode should be one of 'local', 'yarn', 'standalone' and" + " 'spark-submit', but got " + 
args.cluster_mode) sparse_int_features = [ 'user_id', 'article_id', @@ -133,9 +135,9 @@ def parse_args(): 'exop_position', ] sparse_string_features = [ - 'device', 'os', 'province', + 'cat_2', 'device', 'os', 'province', 'city', 'age', - 'gender', 'cat_1', 'cat_2' + 'gender', 'cat_1' ] dense_features = ['img_num'] @@ -143,11 +145,9 @@ def parse_args(): df_train, df_test = read_and_split(args.input_path, sparse_int_features, sparse_string_features, dense_features) train_tbl, valid_tbl = feature_engineering(df_train, df_test, - args.output_path, - sparse_int_features, - sparse_string_features, dense_features) - print(train_tbl.size()) - print(valid_tbl.size()) + args.output_path, + sparse_int_features, + sparse_string_features, dense_features) train_tbl.write_parquet(os.path.join(args.output_path, 'train_processed')) valid_tbl.write_parquet(os.path.join(args.output_path, 'test_processed')) stop_orca_context() diff --git a/python/friesian/example/multi_task/run_multi_task.py b/python/friesian/example/multi_task/run_multi_task.py index 7544330dc10..e9d1619ea92 100644 --- a/python/friesian/example/multi_task/run_multi_task.py +++ b/python/friesian/example/multi_task/run_multi_task.py @@ -25,8 +25,7 @@ from deepctr.feature_column import SparseFeat, DenseFeat from deepctr.models import MMOE, PLE -# from python.serving.src.bigdl.serving.log4Error import invalidInputError -from python.dllib.src.bigdl.dllib.utils.log4Error import invalidInputError +from bigdl.dllib.utils.log4Error import invalidInputError def build_model(model_type, sparse_features, dense_features, feature_max_idx): @@ -43,7 +42,7 @@ def build_model(model_type, sparse_features, dense_features, feature_max_idx): task_types=['regression', 'binary'], num_levels=2, task_names=['duration', 'click']) else: - raise invalidInputError + invalidInputError(False, 'model_type should be one of "mmoe" and "ple", but got ' + model_type) return model @@ -201,9 +200,9 @@ def _parse_args(): elif args.cluster_mode == "spark-submit": sc = init_orca_context("spark-submit") else: - ArgumentError(False, - "cluster_mode should be one of 'local', 'yarn', 'standalone' and" - " 'spark-submit', but got " + args.cluster_mode) + invalidInputError(False, + "cluster_mode should be one of 'local', 'yarn', 'standalone' and" + " 'spark-submit', but got " + args.cluster_mode) cat_cols = [ 'user_id', 'article_id', @@ -227,7 +226,6 @@ def _parse_args(): args.model_type, cat_cols, continuous_cols, feature_max_idx) # do test - # valid_tbl = FeatureTable.read_parquet(args.test_data_path) test_multi_task(valid_tbl, args.model_save_path, args.model_type, cat_cols, continuous_cols, feature_max_idx) From b9f706da553362e53389627ee7c8117503e02493 Mon Sep 17 00:00:00 2001 From: WangBin <32730386+devWangBin@users.noreply.github.com> Date: Thu, 15 Sep 2022 16:38:00 +0800 Subject: [PATCH 6/8] update for code style --- .gitignore | 1 + python/friesian/example/multi_task/run_multi_task.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 8be49ffa06d..ef42fd34a68 100644 --- a/.gitignore +++ b/.gitignore @@ -48,3 +48,4 @@ __pycache__ target build dist +apps/wide-deep-recommendation/model_training.ipynb diff --git a/python/friesian/example/multi_task/run_multi_task.py b/python/friesian/example/multi_task/run_multi_task.py index e9d1619ea92..c0577c7abcd 100644 --- a/python/friesian/example/multi_task/run_multi_task.py +++ b/python/friesian/example/multi_task/run_multi_task.py @@ -42,7 +42,8 @@ def build_model(model_type, sparse_features, 
dense_features, feature_max_idx):
                     task_types=['regression', 'binary'],
                     num_levels=2, task_names=['duration', 'click'])
     else:
-        invalidInputError(False, 'model_type should be one of "mmoe" and "ple", but got ' + model_type)
+        invalidInputError(False, 'model_type should be one of "mmoe" and "ple", '
+                          'but got ' + model_type)
     return model
 
 
From a1de4359111a28deb6b09f86e51f78c98314aa1f Mon Sep 17 00:00:00 2001
From: WangBin <32730386+devWangBin@users.noreply.github.com>
Date: Thu, 15 Sep 2022 19:31:20 +0800
Subject: [PATCH 7/8] update codes and add test results

---
 python/friesian/example/multi_task/README.md  | 13 ++++++++++++-
 .../example/multi_task/data_processing.py     | 18 ++++++++++++------
 2 files changed, 24 insertions(+), 7 deletions(-)

diff --git a/python/friesian/example/multi_task/README.md b/python/friesian/example/multi_task/README.md
index 68fd1dbd210..09c5c34d731 100644
--- a/python/friesian/example/multi_task/README.md
+++ b/python/friesian/example/multi_task/README.md
@@ -120,8 +120,19 @@ python run_multi_task.py \
   --driver_memory 8g
 ```
 Results:
 ```angular2html
+1. For mmoe: (Epoch 7: early stopping)
+validation_loss 6546.69970703125
+validation_duration_loss 6546.2734375
+validation_click_loss 0.42016342282295227
+validation_duration_mae 39.18841552734375
+validation_click_auc 0.648556113243103
 
-
+2. For ple: (Epoch 4: early stopping)
+validation_loss 6610.6552734375
+validation_duration_loss 6610.244140625
+validation_click_loss 0.4236340820789337
+validation_duration_mae 42.66642379760742
+validation_click_auc 0.6481693387031555
 ```
 
diff --git a/python/friesian/example/multi_task/data_processing.py b/python/friesian/example/multi_task/data_processing.py
index 50a15249488..c8f27f5b1e5 100644
--- a/python/friesian/example/multi_task/data_processing.py
+++ b/python/friesian/example/multi_task/data_processing.py
@@ -42,18 +42,22 @@ def read_and_split(data_input_path, sparse_int_features, sparse_string_features,
         'age', 'gender', 'ctime', 'img_num', 'cat_1', 'cat_2'
     ]
     if data_input_path.endswith("csv"):
+        # data_pd = pd.read_csv(os.path.join(data_input_path, 'train_data.csv'), index_col=0,
+        #                       parse_dates=['expo_time'], low_memory=False)
+        # data_pd.to_csv('../train_data_new.csv', index=False, header=None)
         tbl = FeatureTable.read_csv(data_input_path, header=False, names=header_names)
     else:
         tbl = FeatureTable.read_parquet(data_input_path)
+
+    print('The total number of samples: ', tbl.size())
+
     tbl = tbl.cast(sparse_int_features, 'string')
     tbl = tbl.cast(dense_features, 'string')
 
     # fill absence data
     for feature in (sparse_int_features + sparse_string_features):
         tbl = tbl.fillna("", feature)
-    for dense_feature in dense_features:
-        tbl = tbl.fillna('0.0', dense_feature)
-    print(tbl.df.dtypes)
+    tbl = tbl.fillna('0.0', 'img_num')
 
     process_img_num = lambda x: transform(x)
     process_cat_2 = lambda x: transform_cat_2(x)
@@ -72,7 +76,7 @@ def feature_engineering(train_tbl, valid_tbl, output_path, sparse_int_features,
     import json
     train_tbl, min_max_dict = train_tbl.min_max_scale(dense_features)
     valid_tbl = valid_tbl.transform_min_max_scale(dense_features, min_max_dict)
-    cat_cols = sparse_int_features + sparse_string_features
+    cat_cols = sparse_string_features[-1:] + sparse_int_features + sparse_string_features[:-1]
     for feature in cat_cols:
         train_tbl, feature_idx = train_tbl.category_encode(feature)
         valid_tbl = valid_tbl.encode_string(feature, feature_idx)
@@ -134,10 +138,12 @@ def parse_args():
         'net_status', 'flush_nums',
         'exop_position',
     ]
+    # Note: placing cat_2 (or both cat_1 and cat_2) at the front of this list
+    # triggers a bug, so both are kept at the end.
     sparse_string_features = [
-        'cat_2', 'device', 'os', 'province',
+        'device', 'os', 'province',
         'city', 'age',
-        'gender', 'cat_1'
+        'gender', 'cat_1', 'cat_2'
     ]
     dense_features = ['img_num']
 
From 8dc1a1adf2de84e58c91afa8eafd2e499bd620f0 Mon Sep 17 00:00:00 2001
From: WangBin <32730386+devWangBin@users.noreply.github.com>
Date: Thu, 15 Sep 2022 19:40:44 +0800
Subject: [PATCH 8/8] Update README.md

---
 python/friesian/example/multi_task/README.md | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/python/friesian/example/multi_task/README.md b/python/friesian/example/multi_task/README.md
index 09c5c34d731..3f65b13d5e4 100644
--- a/python/friesian/example/multi_task/README.md
+++ b/python/friesian/example/multi_task/README.md
@@ -119,15 +119,23 @@ python run_multi_task.py \
   --driver_memory 8g
 ```
 Results:
-```angular2html
-1. For mmoe: (Epoch 7: early stopping)
+```angular2html
+1. For MMoE:
+50/50 [==============================] - 85s 2s/step - loss: 5505.2607 - duration_loss: 5504.8799 - click_loss: 0.3727 - duration_mae: 30.[...] - click_auc: 0.6574 - click_precision: 0.0000e+00 - click_recall: 0.0000e+00 - val_loss: 6546.5293 - val_duration_loss: 6546.0991 - val_click_loss: 0.4202 - val_duration_mae: 39.1881 - val_click_auc: 0.6486 - val_click_precision: 0.4036 - val_click_recall: 0.0012
+(Worker pid=22945) Epoch 7: early stopping
+Save model to path: ./save_model/mmoe_model.bin
+3759/3759 [==============================] - 78s 20ms/step - loss: 6546.6997 - duration_loss: 6546.2734 - click_loss: 0.4202 - duration_mae: 39.1884 - click_auc: 0.6486 - click_precision: 0.4036 - click_recall: 0.0012
 validation_loss 6546.69970703125
 validation_duration_loss 6546.2734375
 validation_click_loss 0.42016342282295227
 validation_duration_mae 39.18841552734375
 validation_click_auc 0.648556113243103
 
-2. For ple: (Epoch 4: early stopping)
+2. For PLE:
+50/50 [==============================] - 87s 2s/step - loss: 6788.6426 - duration_loss: 6788.2168 - click_loss: 0.4217 - duration_mae: 38.3158 - click_auc: 0.6523 - click_precision: 0.3333 - click_recall: 9.7752e-04 - val_loss: 6610.4990 - val_duration_loss: 6610.0732 - val_click_loss: 0.4236 - val_duration_mae: 42.6656 - val_click_auc: 0.6482 - val_click_precision: 0.6667 - val_click_recall: 9.7058e-05
+(Worker pid=13791) Epoch 4: early stopping
+Save model to path: ./save_model/ple_model.bin
+3753/3759 [============================>.] - ETA: 0s - loss: 6612.4531 - duration_loss: 6612.0410 - click_loss: 0.4236 - duration_mae: 42.6693 - click_auc: 0.6482 - click_precision: 0.6667 - click_recall: 9.7249e-05
 validation_loss 6610.6552734375
 validation_duration_loss 6610.244140625
 validation_click_loss 0.4236340820789337