mC4 preprocess #8

Closed
wants to merge 4 commits
24 changes: 24 additions & 0 deletions examples/c4_mc4_processing/cache_c4_mc4.sh
@@ -0,0 +1,24 @@
# Note: Running this script may require bandwidth of ~30 MB/s per language.
# You may run this script multiple times to speed up caching.
# The script `tools/c4_mc4/c4_mc4_cache.py` only performs caching
# if a cache folder for a language doesn't exist yet, so running
# the script multiple times with the same cache folder is safe.

CACHE_DIR="dumped/c4_mc4_raw_data"
mkdir -p "$CACHE_DIR"

# Excluding "en" since it has already been processed. Please add your languages here.
# for LANG in "ar" "sw" "zh" "zh-Latn" "ca" "fr" "hi" "ur" "bn" "id" "pt" "es" "ru" "ru-Latn" "ja" "am"; do
for LANG in "am"; do
    DATASET_NAME="mc4"
    if [[ $LANG == "en" ]]; then
        DATASET_NAME="c4"
    fi
    echo "Caching $LANG"
    # Sleep 1-2 seconds between languages.
    sleep $((1 + RANDOM % 2))
    python3 -u tools/c4_mc4/c4_mc4_cache.py \
        --dataset-name "$DATASET_NAME" \
        --lang "$LANG" \
        --cache-dir "$CACHE_DIR"
done
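Once a language is cached, later runs can load it straight from disk. A minimal sketch, assuming the standard Hugging Face `datasets` cache layout and the "am" config enabled in the loop above:

import datasets

# Read back from the cache populated by cache_c4_mc4.sh; with a complete
# cache, no re-download should be needed.
mc4_am = datasets.load_dataset("mc4", "am", cache_dir="dumped/c4_mc4_raw_data")
print(mc4_am["train"][0]["text"])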
20 changes: 20 additions & 0 deletions examples/c4_mc4_processing/data_resize.sh
@@ -0,0 +1,20 @@
CACHE_DIR="../c4/mc4_splits/"
DATASET_NAME="mc4"
NEW_EXPECTED_SIZE=576
OUTPUT_DIR="dumped/c4_mc4_raw_data_resized-"$NEW_EXPECTED_SIZE

mkdir -p $OUTPUT_DIR
ALPHA=.01
MIN_HIGH_RESOURCE_SIZE=12
MAX_HIGH_RESOURCE_SIZE=100

python3 -u tools/c4_mc4/data_resize.py \
    --dataset-name "$DATASET_NAME" \
    --size-format "GB" \
    --languages "ar" "sw" "zh" "zh-Latn" "ca" "fr" "hi" "ur" "bn" "id" "pt" "es" "ru" "ru-Latn" "ja" "am" \
    --cache-dir "$CACHE_DIR" \
    --new-expected-size "$NEW_EXPECTED_SIZE" \
    --output-dir "$OUTPUT_DIR" \
    --min_high_resource_size "$MIN_HIGH_RESOURCE_SIZE" \
    --max_high_resource_size "$MAX_HIGH_RESOURCE_SIZE" \
    --alpha "$ALPHA"
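With ALPHA=.01 the multinomial penalty flattens the size distribution nearly to uniform. A small worked example of the formula implemented in tools/c4_mc4/data_resize.py (illustrative sizes, not real mC4 numbers):

# Two languages, one 100x larger than the other.
sizes = {"big": 100.0, "small": 1.0}
alpha = 0.01
total = sum(sizes.values())
probs = {k: v / total for k, v in sizes.items()}         # ~0.990 / ~0.010
norm = sum(p ** alpha for p in probs.values())
pen = {k: (p ** alpha) / norm for k, p in probs.items()}
print(pen)  # {'big': ~0.51, 'small': ~0.49}: near-uniform sampling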
38 changes: 38 additions & 0 deletions tools/c4_mc4/c4_mc4_cache.py
@@ -0,0 +1,38 @@
import os
import datasets
import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--dataset-name', type=str, required=True,
                        help='Name of the dataset.',
                        choices=['c4', 'mc4'])
    parser.add_argument('--lang', type=str, required=True,
                        help='Name of the language.')
    parser.add_argument('--cache-dir', type=str, required=True,
                        help='Path to the cache dir. (The directory may require very large space.)')
    args = parser.parse_args()

    os.makedirs(args.cache_dir, exist_ok=True)
    # If a log file already exists, caching for this language was started (or
    # finished) by an earlier run and the language is skipped -- including
    # after a failed run, until the log file is removed.
    lang_cache_log = os.path.join(args.cache_dir, args.lang + ".log")
    if not os.path.exists(lang_cache_log):
        with open(lang_cache_log, 'w') as f:
            f.write("Data downloading and processing.\n")
        try:
            print("downloading {}".format(args.lang))
            print('Running "datasets.load_dataset({}, {}, cache_dir={})"'.format(
                args.dataset_name, args.lang, args.cache_dir
            ))
            datasets.load_dataset(args.dataset_name, args.lang, cache_dir=args.cache_dir)
        except Exception as e:
            # Record the failure in the log before surfacing the error.
            with open(lang_cache_log, 'a') as f:
                f.write("Data caching failed.\n")
            raise Exception("Download failed for {} lang".format(args.lang)) from e
        with open(lang_cache_log, 'a') as f:
            f.write("Data caching for {} language completed.\n".format(args.lang))
    else:
        print("Data processing for {} language already started or completed.".format(args.lang))


if __name__ == '__main__':
    main()
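The tool can also be invoked directly for a single language, mirroring the call in examples/c4_mc4_processing/cache_c4_mc4.sh:

python3 -u tools/c4_mc4/c4_mc4_cache.py --dataset-name mc4 --lang am --cache-dir dumped/c4_mc4_raw_data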
187 changes: 187 additions & 0 deletions tools/c4_mc4/data_resize.py
@@ -0,0 +1,187 @@
import os
import math
import copy
import json
import datasets
import argparse
import subprocess
from collections import OrderedDict

def get_size_stats(args):
    lang_size_dict = {}
    for lang in args.languages:
        lang_folder_path = os.path.join(args.cache_dir, args.dataset_name, lang)
        # GNU `du -s` reports usage in 1 KiB blocks, so the conversions below
        # are relative to KiB ('B' therefore yields KiB, not bytes).
        lang_size = subprocess.check_output("du -s {}".format(lang_folder_path), shell=True)
        lang_size = int(lang_size.decode("utf-8").split("\t")[0])
        if args.size_format == 'B':
            _conv = 1
        elif args.size_format == 'MB':
            _conv = 1024
        elif args.size_format == 'GB':
            _conv = 1024 * 1024
        elif args.size_format == 'TB':
            _conv = 1024 * 1024 * 1024
        lang_size_dict[lang] = round(lang_size / float(_conv), 2)
    return lang_size_dict

def print_stat(args, lang_size_dict):
    lang_list = sorted(lang_size_dict.items(), key=lambda tup: tup[1])
    total_size = 0
    print("Language : Size")
    print("-" * 20)
    for lang, size in lang_list:
        print("{} : {}".format(lang, size))
        total_size += size
    print("-" * 20)
    print("Total size : {}".format(total_size))
    print("Expected size after resizing : {}".format(args.new_expected_size))
    print("Per language allocated size : {}".format(args.new_expected_size / len(args.languages)))

def find_and_distribute_low_resource_language(args, lang_size_dict, sampling_weight):
    # Languages below the per-language mean budget keep all of their data
    # (sampling probability 1.0).
    mean_size_for_each_lang = args.new_expected_size / len(args.languages)
    tot_low_resource_lang_size = 0
    print("Low resource languages :", end="")
    for lang, size in lang_size_dict.items():
        if size < mean_size_for_each_lang:
            sampling_weight[lang] = 1.0
            tot_low_resource_lang_size += size
            print(" {}({})".format(lang, size), end="")
    print("")
    print("Total size consumed by low resource languages : {}".format(tot_low_resource_lang_size))
    return tot_low_resource_lang_size

def calc_multinomial_sampling_prob_with_penalty(dataset_size, alpha=.5):
    tot_size = 0
    probs = OrderedDict()
    for lang, size in dataset_size.items():
        tot_size += size
    for lang, size in dataset_size.items():
        probs[lang] = size / tot_size

    pen_prob = OrderedDict()
    tot_pen_prob = 0.0
    for lang, prob in probs.items():
        tot_pen_prob += prob ** alpha
    sum_ = 0.0
    for lang, prob in probs.items():
        pen_prob[lang] = (prob ** alpha) / tot_pen_prob
        sum_ += pen_prob[lang]
    assert math.fabs(1 - sum_) < 1e-6
    return pen_prob
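# Illustrative example (numbers are not from the repo): for dataset_size
# {"a": 90, "b": 10} and alpha=0.5, raw probabilities 0.9 / 0.1 become
# sqrt(0.9) / sqrt(0.1) after the penalty, normalizing to ~0.75 / ~0.25,
# so the larger language's advantage shrinks as alpha decreases.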

def distribute_high_resource_language(args, lang_dict, sampling_probability, total_size_capacity):
    lang_size_dict = copy.deepcopy(lang_dict)
    total_high_resource_capacity = total_size_capacity
    # Low resource languages (prob == 1.0) were already handled; drop them.
    for lang, prob in sampling_probability.items():
        if prob == 1.0:
            del lang_size_dict[lang]
    high_resource_sampling_prob = calc_multinomial_sampling_prob_with_penalty(lang_size_dict, alpha=args.alpha)
    print("Sampling high resource languages from a multinomial distribution with alpha {}".format(args.alpha))
    print("-" * 80)
    total_high_resource_lang_size = 0
    lang_fixed, high_resource_size = {}, {}
    for lang, prob in high_resource_sampling_prob.items():
        new_prob = prob
        new_prob_str = ""
        new_size = lang_size_dict[lang] * new_prob
        # Clamp each language's share to [min_high_resource_size, max_high_resource_size].
        if new_size < args.min_high_resource_size:
            lang_fixed[lang] = True
            new_size = min(lang_size_dict[lang], args.min_high_resource_size)
            new_prob = new_size / lang_size_dict[lang]
            new_prob_str = "-> {}".format(round(new_prob, 2))
        if new_size > args.max_high_resource_size:
            new_size = args.max_high_resource_size
            new_prob = new_size / lang_size_dict[lang]
            new_prob_str = "-> {}".format(round(new_prob, 2))
        high_resource_sampling_prob[lang] = new_prob
        high_resource_size[lang] = new_size
        sampling_probability[lang] = prob
        print("Language : {}, Sampling prob : {} {}, ({} -> {} GB)".format(
            lang, round(prob, 2), new_prob_str, lang_size_dict[lang], round(new_size)))
        total_size_capacity -= new_size
        total_high_resource_lang_size += new_size
    print("Expected high resource size : {}, Total size : {}".format(total_high_resource_capacity, total_high_resource_lang_size))
    adjustment = total_size_capacity
    if adjustment > 0:
        # Leftover budget: hand it out 1 GB at a time to languages that still
        # have unsampled data and are not pinned at a size bound.
        print("Performing adjustment ...")
        for lang, size in high_resource_size.items():
            if size == args.max_high_resource_size:
                lang_fixed[lang] = True
        _flag = True
        while adjustment > 0 and _flag:
            _flag = False
            for lang, size in high_resource_size.items():
                if lang not in lang_fixed and adjustment > 0:
                    if size < lang_size_dict[lang]:
                        _dist_val = min(1, lang_size_dict[lang] - size)
                        _dist_val = min(_dist_val, adjustment)
                        high_resource_size[lang] += _dist_val
                        adjustment -= _dist_val
                        _flag = True
    # Recompute final probabilities from the (possibly clamped/adjusted) sizes.
    for lang, size in high_resource_size.items():
        sampling_probability[lang] = high_resource_size[lang] / lang_size_dict[lang]
    return sampling_probability


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--dataset-name', type=str, required=True,
                        help='Name of the dataset.',
                        choices=['c4', 'mc4'])
    parser.add_argument('--languages', nargs='+', required=True,
                        help='Names of the languages.')
    parser.add_argument('--cache-dir', type=str, required=True,
                        help='Path to the cache dir. (The directory may require very large space.)')
    parser.add_argument('--size-format', type=str, required=True,
                        help='Unit for reported sizes (derived from `du -s` KiB blocks).',
                        choices=['B', 'MB', 'GB', 'TB'])
    parser.add_argument('--new-expected-size', type=int, required=True,
                        help='Total amount of data to be selected.')
    parser.add_argument('--output-dir', type=str, required=True,
                        help='Output directory where data will be saved.')
    parser.add_argument('--alpha', type=float, required=True,
                        help='Multinomial sampling penalty exponent.')
    parser.add_argument('--min_high_resource_size', type=int, required=True,
                        help='Minimum size allocated to each high resource language.')
    parser.add_argument('--max_high_resource_size', type=int, required=True,
                        help='Maximum size allocated to each high resource language.')
    args = parser.parse_args()

    total_size_capacity = args.new_expected_size

    lang_size_dict = get_size_stats(args)
    print_stat(args, lang_size_dict)

    sampling_probability = {lang: -1 for lang in args.languages}
    low_resource_size_consumed = find_and_distribute_low_resource_language(args, lang_size_dict, sampling_probability)
    total_size_capacity = total_size_capacity - low_resource_size_consumed
    distribute_high_resource_language(args, lang_size_dict, sampling_probability, total_size_capacity)

    total_size = 0
    print("\nFinal Breakdown")
    print("-" * 15)
    for lang, prob in sampling_probability.items():
        _size = lang_size_dict[lang] * prob
        print("Language : {}, Sampling prob : {}, ({} -> {} GB)".format(
            lang, round(prob, 2), lang_size_dict[lang], round(_size, 2)))
        total_size += _size
    print("Expected resource size : {}, Total size : {}".format(args.new_expected_size, round(total_size, 1)))
    with open(os.path.join(args.output_dir, 'lang_dict.json'), "w") as f:
        f.write(json.dumps(sampling_probability, indent=4))


if __name__ == '__main__':
    main()

# Example usage (see also examples/c4_mc4_processing/data_resize.sh):
# python3 tools/c4_mc4/data_resize.py --dataset-name mc4 \
#     --languages "ar" "sw" "zh" "zh-Latn" "ca" "fr" "hi" "ur" "bn" "id" "pt" "es" "ru" "ru-Latn" "ja" "am" \
#     --cache-dir ../c4/mc4_splits/ --size-format GB --new-expected-size 576 \
#     --output-dir dumped/c4_mc4_raw_data_resized-576 --alpha .01 \
#     --min_high_resource_size 12 --max_high_resource_size 100
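For context, a sketch of how the emitted lang_dict.json could drive downstream subsampling. The file path and the per-example Bernoulli draw are assumptions; this PR does not include the consumer of the file:

import json
import random

with open("dumped/c4_mc4_raw_data_resized-576/lang_dict.json") as f:
    sampling_prob = json.load(f)

def keep_example(lang, rng=random):
    # Keep each example with its language's sampling probability.
    return rng.random() < sampling_prob[lang]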