diff --git a/federatedscope/core/auxiliaries/data_builder.py b/federatedscope/core/auxiliaries/data_builder.py index a6e3c13d4..d14c7f238 100644 --- a/federatedscope/core/auxiliaries/data_builder.py +++ b/federatedscope/core/auxiliaries/data_builder.py @@ -567,7 +567,8 @@ def get_data(config): elif config.data.type.lower() == 'vertical_fl_data': from federatedscope.vertical_fl.dataloader import load_vertical_data data, modified_config = load_vertical_data(config, generate=True) - elif 'movielens' in config.data.type.lower(): + elif 'movielens' in config.data.type.lower( + ) or 'netflix' in config.data.type.lower(): from federatedscope.mf.dataloader import load_mf_dataset data, modified_config = load_mf_dataset(config) elif '@' in config.data.type.lower(): diff --git a/federatedscope/mf/baseline/hfl_fedavg_standalone_on_netflix.yaml b/federatedscope/mf/baseline/hfl_fedavg_standalone_on_netflix.yaml new file mode 100644 index 000000000..37d645380 --- /dev/null +++ b/federatedscope/mf/baseline/hfl_fedavg_standalone_on_netflix.yaml @@ -0,0 +1,29 @@ +use_gpu: False +early_stop: + patience: 100 +federate: + mode: standalone + total_round_num: 100 + client_num: 480189 + online_aggr: True + share_local_model: True + sample_client_rate: 0.0001 +data: + root: data/ + type: HFLNetflix + batch_size: 32 + num_workers: 0 +model: + type: HMFNet + hidden: 10 +train: + local_update_steps: 50 + optimizer: + lr: 1. +criterion: + type: MSELoss +trainer: + type: mftrainer +eval: + freq: 100 + metrics: [] \ No newline at end of file diff --git a/federatedscope/mf/dataloader/dataloader.py b/federatedscope/mf/dataloader/dataloader.py index b8b2a47c4..c65f18603 100644 --- a/federatedscope/mf/dataloader/dataloader.py +++ b/federatedscope/mf/dataloader/dataloader.py @@ -11,7 +11,9 @@ "vflmovielens1m": "VFLMovieLens1M", "vflmovielens10m": "VFLMovieLens10M", "hflmovielens1m": "HFLMovieLens1M", - "hflmovielens10m": "HFLMovieLens10M" + "hflmovielens10m": "HFLMovieLens10M", + 'vflnetflix': "VFLNetflix", + 'hflnetflix': "HFLNetflix" } @@ -30,13 +32,16 @@ def load_mf_dataset(config=None): """ if config.data.type.lower() in MFDATA_CLASS_DICT: # Dataset - dataset = getattr( - importlib.import_module("federatedscope.mf.dataset.movielens"), - MFDATA_CLASS_DICT[config.data.type.lower()])( - root=config.data.root, - num_client=config.federate.client_num, - train_portion=config.data.splits[0], - download=True) + if config.data.type.lower() in ['vflnetflix', 'hflnetflix']: + mpath = "federatedscope.mf.dataset.netflix" + else: + mpath = "federatedscope.mf.dataset.movielens" + dataset = getattr(importlib.import_module(mpath), + MFDATA_CLASS_DICT[config.data.type.lower()])( + root=config.data.root, + num_client=config.federate.client_num, + train_portion=config.data.splits[0], + download=True) else: raise NotImplementedError("Dataset {} is not implemented.".format( config.data.type)) diff --git a/federatedscope/mf/dataset/movielens.py b/federatedscope/mf/dataset/movielens.py index 23e3f347d..b5dba10cc 100644 --- a/federatedscope/mf/dataset/movielens.py +++ b/federatedscope/mf/dataset/movielens.py @@ -77,19 +77,6 @@ def __init__(self, root, num_client, train_portion=0.9, download=True): ratings = self._load_meta() self._split_n_clients_rating(ratings, num_client, 1 - train_portion) - def _split_n_clients_rating(self, ratings: csc_matrix, num_client: int, - test_portion: float): - id_item = np.arange(self.n_item) - shuffle(id_item) - items_per_client = np.array_split(id_item, num_client) - data = dict() - for clientId, items in enumerate(items_per_client): - client_ratings = ratings[:, items] - train_ratings, test_ratings = self._split_train_test_ratings( - client_ratings, test_portion) - data[clientId + 1] = {"train": train_ratings, "test": test_ratings} - self.data = data - def _split_train_test_ratings(self, ratings: csc_matrix, test_portion: float): n_ratings = ratings.count_nonzero() @@ -109,22 +96,26 @@ def _split_train_test_ratings(self, ratings: csc_matrix, train_ratings, test_ratings = train.tocsc(), test.tocsc() return train_ratings, test_ratings + def _read_raw(self): + fpath = os.path.join(self.root, self.base_folder, self.filename, + self.raw_file) + data = pd.read_csv(fpath, + sep="::", + engine="python", + usecols=[0, 1, 2], + names=["userId", "movieId", "rating"], + dtype={ + "userId": np.int32, + "movieId": np.int32, + "rating": np.float32 + }) + return data + def _load_meta(self): meta_path = os.path.join(self.root, self.base_folder, "ratings.pkl") if not os.path.exists(meta_path): logger.info("Processing data into {} parties.") - fpath = os.path.join(self.root, self.base_folder, self.filename, - self.raw_file) - data = pd.read_csv(fpath, - sep="::", - engine="python", - usecols=[0, 1, 2], - names=["userId", "movieId", "rating"], - dtype={ - "userId": np.int32, - "movieId": np.int32, - "rating": np.float32 - }) + data = self._read_raw() # Map idx unique_id_item, unique_id_user = np.sort( data["movieId"].unique()), np.sort(data["userId"].unique()) diff --git a/federatedscope/mf/dataset/netflix.py b/federatedscope/mf/dataset/netflix.py new file mode 100644 index 000000000..d44249892 --- /dev/null +++ b/federatedscope/mf/dataset/netflix.py @@ -0,0 +1,85 @@ +import os +import tarfile +import logging + +import pandas as pd +import numpy as np + +from federatedscope.mf.dataset import MovieLensData, HMFDataset, VMFDataset + +logger = logging.getLogger(__name__) + + +class Netflix(MovieLensData): + """Netflix Prize Dataset + (https://archive.org/download/nf_prize_dataset.tar/nf_prize_dataset.tar.gz) + + Netflix Prize consists of approximately 100,000,000 ratings from + 480,189 users for 17,770 movies. Each rating in the training dataset + consists of four entries: user, movie, rating date, and rating. + Users and movies are represented by integer IDs, while ratings range + from 1 to 5. + """ + base_folder = 'Netflix' + url = 'https://archive.org/download/nf_prize_dataset.tar' \ + '/nf_prize_dataset.tar.gz' + filename = 'download' + zip_md5 = 'a8f23d2d76461211c6b4c0ca6df2547d' + raw_file = 'training_set.tar' + raw_file_md5 = '0098ee8997ffda361a59bc0dd1bdad8b' + mv_names = [f'mv_{str(x).rjust(7, "0")}.txt' for x in range(1, 17771)] + + def _extract_raw_file(self, dir_path): + # Extract flag + flag = False + if not os.path.exists(dir_path): + flag = True + else: + for name in self.mv_names: + if not os.path.exists(os.path.join(dir_path, name)): + flag = True + break + if flag: + tar = tarfile.open( + os.path.join(self.root, self.base_folder, self.filename, + self.raw_file)) + tar.extractall( + os.path.join(self.root, self.base_folder, self.filename)) + tar.close() + return + + def _read_raw(self): + dir_path = os.path.join(self.root, self.base_folder, self.filename, + 'training_set') + self._extract_raw_file(dir_path) + frames = [] + for idx, name in enumerate(self.mv_names): + mv_id = np.int32(idx + 1) + df = pd.read_csv(os.path.join(dir_path, name), + usecols=[0, 1, 2], + names=["userId", "rating", "date"], + dtype={ + "userId": np.int32, + "movieId": np.int32, + "rating": np.float32, + "date": str + }, + skiprows=1) + df["movieId"] = [mv_id] * len(df) + frames.append(df) + data = pd.concat(frames) + return data + + +class VFLNetflix(Netflix, VMFDataset): + """Netflix dataset in HFL setting + + """ + pass + + +class HFLNetflix(Netflix, HMFDataset): + """Netflix dataset in HFL setting + + """ + pass diff --git a/federatedscope/mf/model/model.py b/federatedscope/mf/model/model.py index d617791df..7246cfe98 100644 --- a/federatedscope/mf/model/model.py +++ b/federatedscope/mf/model/model.py @@ -32,6 +32,7 @@ def __init__(self, num_user, num_item, num_hidden): self.register_parameter('embed_item', self.embed_item) def forward(self, indices, ratings): + # TODO: do not use all embedding pred = torch.matmul(self.embed_user, self.embed_item.T) label = torch.sparse_coo_tensor(indices, ratings,