Feature/text aug mapper (#17)
* + add simple_aug_en_mapper and its unit test, docs
+ reorder the OP list in alphabetical order

* + add simple_aug_zh_mapper and its unit test, docs
* update tracer for batch mappers

* + add missing dependencies

* + add an argument to control whether to export result datasets to a single file in parallel

* * add new arguments to config_all.yaml
- remove some useless code

* * update for config: copy the config file into work_dir; display the final configs

* * resolve some comments from PR

* - remove BatchMapper and decide whether to turn on the batched mode automatically in map
HYLcool authored Sep 13, 2023
1 parent 3b8da75 commit e221d06
Showing 17 changed files with 872 additions and 123 deletions.
29 changes: 25 additions & 4 deletions configs/config_all.yaml
@@ -5,11 +5,12 @@
# global parameters
project_name: 'all' # project name to distinguish your configs
dataset_path: '/path/to/your/dataset' # path to your dataset directory or file, with optional weights (0.0-1.0); 1.0 by default.
# Accepted format: 'weight1(optional) dataset1-path weight2(optional) dataset2-path '
# Accepted format: 'weight1(optional) dataset1-path weight2(optional) dataset2-path'
export_path: '/path/to/result/dataset.jsonl' # path to processed result dataset. Supported suffixes include ['jsonl', 'json', 'parquet']
export_shard_size: 0 # Shard size of the exported dataset in bytes. By default it's 0, which means the whole dataset is exported into a single file. If it's set to a positive number, the exported dataset will be split into several dataset shards, and the max size of each shard won't be larger than the export_shard_size
export_in_parallel: false # Whether to export the result dataset in parallel to a single file, which usually takes less time. It only works when export_shard_size is 0, and its default number of processes is the same as the argument np. **Notice**: If it's True, sometimes exporting in parallel might require much more time due to the IO blocking, especially for very large datasets. When this happens, False is a better choice, although it takes more time.
np: 4 # number of subprocesses to process your dataset
text_keys: 'content' # the key name of field where the sample texts to be processed, e.g., `text`, `instruction`, `output`, ...'
text_keys: 'content' # the key name of field where the sample texts to be processed, e.g., `text`, `instruction`, `output`, ...
# Note: currently, we support specifying only ONE key for each op; for cases requiring multiple keys, users can specify the op multiple times. Only the first key of `text_keys` will be used when multiple keys are set.
suffixes: [] # the suffixes of files that will be read. For example: '.txt', 'txt' or ['txt', '.pdf', 'docx']
use_cache: true # whether to use the cache management of hugging face datasets. It might take up lots of disk space when using cache
@@ -34,6 +35,26 @@ process:
- clean_copyright_mapper: # remove copyright comments.
- expand_macro_mapper: # expand macro definitions in Latex text.
- fix_unicode_mapper: # fix unicode errors in text.
- nlpaug_en_mapper: # simply augment texts in English based on the nlpaug library
sequential: false # whether to combine all augmentation methods into a sequence. If it's True, a sample will be augmented by all enabled augmentation methods sequentially. If it's False, each enabled augmentation method generates its augmented samples independently.
aug_num: 1 # number of augmented samples to be generated. If `sequential` is True, aug_num augmented samples will be generated in total. If it's False, (aug_num * number of enabled augmentation methods) augmented samples will be generated.
delete_random_word: false # whether to enable the augmentation method that deletes random words from the original texts. e.g. "I love LLM" --> "I LLM"
swap_random_word: false # whether to enable the augmentation method that swaps random contiguous words in the original texts. e.g. "I love LLM" --> "Love I LLM"
spelling_error_word: false # whether to enable the augmentation method that simulates spelling errors for words in the original texts. e.g. "I love LLM" --> "Ai love LLM"
split_random_word: false # whether to enable the augmentation method that randomly splits words with whitespaces in the original texts. e.g. "I love LLM" --> "I love LL M"
keyboard_error_char: false # whether to enable the augmentation method that simulates keyboard errors for characters in the original texts. e.g. "I love LLM" --> "I ;ov4 LLM"
ocr_error_char: false # whether to enable the augmentation method that simulates OCR errors for characters in the original texts. e.g. "I love LLM" --> "I 10ve LLM"
delete_random_char: false # whether to enable the augmentation method that deletes random characters from the original texts. e.g. "I love LLM" --> "I oe LLM"
swap_random_char: false # whether to enable the augmentation method that swaps random contiguous characters in the original texts. e.g. "I love LLM" --> "I ovle LLM"
insert_random_char: false # whether to enable the augmentation method that inserts random characters into the original texts. e.g. "I love LLM" --> "I ^lKove LLM"
- nlpcda_zh_mapper: # simply augment texts in Chinese based on the nlpcda library
sequential: false # whether to combine all augmentation methods into a sequence. If it's True, a sample will be augmented by all enabled augmentation methods sequentially. If it's False, each enabled augmentation method generates its augmented samples independently.
aug_num: 1 # number of augmented samples to be generated. If `sequential` is True, aug_num augmented samples will be generated in total. If it's False, (aug_num * number of enabled augmentation methods) augmented samples will be generated.
replace_similar_word: false # whether to enable the augmentation method that replaces random words with their similar words in the original texts. e.g. "这里一共有5种不同的数据增强方法" --> "这边一共有5种不同的数据增强方法"
replace_homophone_char: false # whether to enable the augmentation method that replaces random characters with their homophones in the original texts. e.g. "这里一共有5种不同的数据增强方法" --> "这里一共有5种不同的濖据增强方法"
delete_random_char: false # whether to enable the augmentation method that deletes random characters from the original texts. e.g. "这里一共有5种不同的数据增强方法" --> "这里一共有5种不同的数据增强"
swap_random_char: false # whether to enable the augmentation method that swaps random contiguous characters in the original texts. e.g. "这里一共有5种不同的数据增强方法" --> "这里一共有5种不同的数据强增方法"
replace_equivalent_num: false # whether to enable the augmentation method that replaces random numbers with their equivalent representations in the original texts. **Notice**: only applies to numbers for now. e.g. "这里一共有5种不同的数据增强方法" --> "这里一共有伍种不同的数据增强方法"
- punctuation_normalization_mapper: # normalize unicode punctuations to English punctuations.
- remove_bibliography_mapper: # remove bibliography from Latex text.
- remove_comments_mapper: # remove comments from Latex text, code, etc.
@@ -54,8 +75,8 @@ process:
lang: en # sample in which language
tokenization: false # whether to use model to tokenize documents
substrings: ['http', 'www', '.com', 'href', '//'] # incorrect substrings to remove
- sentence_split_mapper: # split text to sentences and join them with '\n'
lang: 'en' # split text in what language
- sentence_split_mapper: # split text to multiple sentences and join them with '\n'
lang: 'en' # split text in what language
- whitespace_normalization_mapper: # normalize different kinds of whitespaces to English whitespace.

# Filter ops
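For reference, the following is a minimal Python sketch of how the nlpaug_en_mapper options above plausibly map onto nlpaug's word- and character-level augmenters. It illustrates the underlying library only, not data-juicer's actual mapper implementation, and the return type of augment() (string vs. list of strings) varies across nlpaug versions.

# Illustrative only -- not the mapper's real code. One augmenter per option of
# nlpaug_en_mapper; with sequential=false and aug_num=1, each enabled
# augmenter would contribute its own augmented sample.
import nlpaug.augmenter.char as nac
import nlpaug.augmenter.word as naw

text = "I love LLM"

augmenters = {
    'delete_random_word': naw.RandomWordAug(action='delete'),
    'swap_random_word': naw.RandomWordAug(action='swap'),
    'spelling_error_word': naw.SpellingAug(),
    'split_random_word': naw.SplitAug(),
    'keyboard_error_char': nac.KeyboardAug(),
    'ocr_error_char': nac.OcrAug(),
    'delete_random_char': nac.RandomCharAug(action='delete'),
    'swap_random_char': nac.RandomCharAug(action='swap'),
    'insert_random_char': nac.RandomCharAug(action='insert'),
}

for name, aug in augmenters.items():
    print(name, aug.augment(text, n=1))

# The Chinese counterpart (nlpcda_zh_mapper) would rely on nlpcda classes such
# as Similarword, Homophone, RandomDeleteChar, CharPositionExchange and
# EquivalentChar; these names come from nlpcda's documentation and are listed
# here as assumptions, not taken from this commit.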
53 changes: 50 additions & 3 deletions data_juicer/config/config.py
@@ -1,7 +1,8 @@
import os
import shutil
import time
from argparse import ArgumentError
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, List, Tuple, Union

from jsonargparse import (ActionConfigFile, ArgumentParser, dict_to_namespace,
namespace_to_dict)
@@ -65,6 +66,18 @@ def init_configs(args=None):
'it\'s set to a positive number, the exported dataset will be split '
'into several sub-dataset shards, and the max size of each shard '
'won\'t be larger than the export_shard_size')
parser.add_argument(
'--export_in_parallel',
type=bool,
default=False,
help='Whether to export the result dataset in parallel to a single '
'file, which usually takes less time. It only works when '
'export_shard_size is 0, and its default number of processes is '
'the same as the argument np. **Notice**: If it\'s True, '
'sometimes exporting in parallel might require much more time '
'due to the IO blocking, especially for very large datasets. '
'When this happens, False is a better choice, although it takes '
'more time.')
parser.add_argument(
'--np',
type=PositiveInt,
@@ -219,6 +232,13 @@ def init_configs(args=None):
}

cfg = init_setup_from_cfg(cfg)

# copy the config file into the work directory
config_backup(cfg)

# show the final config tables before the process started
display_config(cfg)

return cfg
except ArgumentError:
logger.error('Config initialization failed')
@@ -241,8 +261,9 @@ def init_setup_from_cfg(cfg):
log_dir = os.path.join(cfg.work_dir, 'log')
if not os.path.exists(log_dir):
os.makedirs(log_dir, exist_ok=True)
logfile_name = time.strftime('%Y%m%d%H%M%S', time.localtime(
time.time())) + '.txt'
timestamp = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
cfg.timestamp = timestamp
logfile_name = timestamp + '.txt'
setup_logger(save_dir=log_dir, filename=logfile_name)

# whether or not to use cache management
@@ -335,3 +356,29 @@ def sort_op_by_types_and_names(op_name_classes):
ops_sorted_by_types = sorted(mapper_ops) + sorted(filter_ops) + sorted(
deduplicator_ops) + sorted(selector_ops)
return ops_sorted_by_types

def config_backup(cfg):
cfg_path = cfg.config[0].absolute
work_dir = cfg.work_dir
target_path = os.path.join(work_dir, os.path.basename(cfg_path))
logger.info(f'Back up the input config file [{cfg_path}] into the '
f'work_dir [{work_dir}]')
shutil.copyfile(cfg_path, target_path)

def display_config(cfg):
from tabulate import tabulate
import pprint
table_header = ['key', 'values']

# remove ops outside the process list for better displaying
shown_cfg = cfg.clone()
for op in OPERATORS.modules.keys():
_ = shown_cfg.pop(op)

# construct the table as 2 columns
config_table = [(k, pprint.pformat(v, compact=True))
for k, v in shown_cfg.items()]
table = tabulate(config_table, headers=table_header, tablefmt='fancy_grid')

logger.info('Configuration table: ')
print(table)
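A hedged usage sketch of the updated config flow: after this change, init_configs() also backs the input config file up into work_dir (config_backup) and prints the final settings as a two-column tabulate table (display_config). The import path and argument values below are illustrative assumptions, not part of the commit.

# Hypothetical invocation; adjust the import path to the actual package layout.
from data_juicer.config.config import init_configs

cfg = init_configs(args=[
    '--config', 'configs/config_all.yaml',  # copied into cfg.work_dir by config_backup()
    '--export_in_parallel', 'true',         # new flag introduced in this commit
    '--np', '4',
])
# display_config(cfg) has already printed the final configuration table
# (tabulate 'fancy_grid' format) before this point.
print(cfg.export_in_parallel)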
1 change: 1 addition & 0 deletions data_juicer/core/analyser.py
@@ -46,6 +46,7 @@ def __init__(self, cfg=None):
logger.info('Preparing exporter...')
self.exporter = Exporter(self.cfg.export_path,
self.cfg.export_shard_size,
self.cfg.export_in_parallel,
self.cfg.np,
export_ds=False,
export_stats=True)
14 changes: 14 additions & 0 deletions data_juicer/core/data.py
@@ -1,4 +1,5 @@
import copy
import inspect
from functools import wraps
from typing import Union

@@ -140,12 +141,25 @@ def map(self, *args, **kargs):
args[0] = lambda x: nested_obj_factory(x)
else:
args[0] = wrap_func_with_nested_access(args[0])
called_func = args[0]
else:
if 'function' not in kargs or kargs['function'] is None:
kargs['function'] = lambda x: nested_obj_factory(x)
else:
kargs['function'] = wrap_func_with_nested_access(
kargs['function'])
called_func = kargs['function']

# For wrapped function, try to get its original unwrapped method
while hasattr(called_func, '__wrapped__'):
called_func = called_func.__wrapped__
# Does the called function belong to a batched OP?
if inspect.ismethod(called_func) \
and 'is_batched_op' in dir(called_func.__self__) \
and callable(getattr(called_func.__self__, 'is_batched_op')) \
and called_func.__self__.is_batched_op():
kargs['batched'] = True
kargs['batch_size'] = 1

return NestedDataset(super().map(*args, **kargs))

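The map() wrapper above unwraps the called function and, when it is a bound method of an OP whose is_batched_op() returns True, switches Hugging Face datasets' map into batched mode with batch_size=1. Below is a minimal sketch of the convention this relies on; ToyAugMapper is illustrative, not a real data-juicer OP.

# An OP that reports is_batched_op() -> True receives a dict of column lists
# (one batch) in process() and may return more rows than it was given.
class ToyAugMapper:
    text_key = 'text'

    def is_batched_op(self):
        return True

    def process(self, samples):
        augmented = []
        for text in samples[self.text_key]:
            augmented.append(text)              # keep the original sample
            augmented.append(text + ' [aug]')   # emit one augmented copy
        return {self.text_key: augmented}

# dataset.map(ToyAugMapper().process, num_proc=4) would be dispatched by the
# wrapper with batched=True and batch_size=1, so one input row can become two.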
21 changes: 17 additions & 4 deletions data_juicer/core/executor.py
@@ -60,7 +60,9 @@ def __init__(self, cfg=None):
# prepare exporter and check export path suffix
logger.info('Preparing exporter...')
self.exporter = Exporter(self.cfg.export_path,
self.cfg.export_shard_size, self.cfg.np)
self.cfg.export_shard_size,
self.cfg.export_in_parallel,
self.cfg.np)

# setup tracer
self.open_tracer = self.cfg.open_tracer
@@ -105,11 +107,22 @@ def run(self, load_data_np=None):
prev = dataset # record last dataset
try:
if isinstance(op, Mapper):
tmp = dataset.map(op.process,
tmp = dataset.map(function=op.process,
num_proc=self.cfg.np,
desc=op_name + '_process')
if self.open_tracer and op_name in self.op_list_to_trace:
self.tracer.trace_mapper(op_name, dataset, tmp)
if self.open_tracer and \
op_name in self.op_list_to_trace:
if op.is_batched_op():
self.tracer.trace_batch_mapper(
op_name,
dataset,
tmp,
op.text_key)
else:
self.tracer.trace_mapper(op_name,
dataset,
tmp,
op.text_key)
elif isinstance(op, Filter):
if Fields.stats not in dataset.features:
# TODO:
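The tracer branch above exists because a batched OP, such as the new augmentation mappers, can change the number of rows, so the one-to-one row pairing used for ordinary mappers no longer applies. A plausible illustration of the difference (not the Tracer implementation):

# After a batched augmentation OP the row counts differ, so a batch-aware
# trace can only report which texts under text_key are new, not per-row edits.
prev_texts = ['I love LLM']                  # samples before nlpaug_en_mapper
curr_texts = ['I love LLM', 'I l0ve LLM']    # samples after, one augmented copy added

assert len(prev_texts) != len(curr_texts)
added = [t for t in curr_texts if t not in set(prev_texts)]
print(added)  # ['I l0ve LLM']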
20 changes: 15 additions & 5 deletions data_juicer/core/exporter.py
@@ -18,6 +18,7 @@ class Exporter:
def __init__(self,
export_path,
export_shard_size=0,
export_in_parallel=True,
num_proc=1,
export_ds=True,
export_stats=True):
@@ -34,6 +35,7 @@ def __init__(self,
"""
self.export_path = export_path
self.export_shard_size = export_shard_size
self.export_in_parallel = export_in_parallel
self.export_ds = export_ds
self.export_stats = export_stats
self.suffix = self._get_suffix(export_path)
@@ -101,8 +103,11 @@ def _export_impl(self, dataset, export_path, suffix, export_stats=True):
export_method = Exporter._router()[suffix]
if self.export_shard_size <= 0:
# export the whole dataset into one single file.
logger.info('Export dataset into 1 file...')
export_method(dataset, export_path)
logger.info('Export dataset into a single file...')
export_method(
dataset,
export_path,
num_proc=self.num_proc if self.export_in_parallel else 1)
else:
# compute the dataset size and number of shards to split
if dataset._indices is not None:
@@ -153,7 +158,11 @@ def _export_impl(self, dataset, export_path, suffix, export_stats=True):
# export stats of datasets into a single file.
ds_stats = dataset.select_columns(Fields.stats)
stats_file = export_path.replace('.' + suffix, '_stats.jsonl')
Exporter.to_jsonl(ds_stats, stats_file)
Exporter.to_jsonl(
ds_stats,
stats_file,
num_proc=self.num_proc if self.export_in_parallel else 1
)

def export(self, dataset):
"""
@@ -166,16 +175,17 @@ def export(self, dataset):
self.export_stats)

@staticmethod
def to_jsonl(dataset, export_path, **kwargs):
def to_jsonl(dataset, export_path, num_proc=1, **kwargs):
"""
Export method for json/jsonl target files.
:param dataset: the dataset to export.
:param export_path: the path to store the exported dataset.
:param num_proc: the number of processes used to export the dataset.
:param kwargs: extra arguments.
:return:
"""
dataset.to_json(export_path, force_ascii=False)
dataset.to_json(export_path, force_ascii=False, num_proc=num_proc)

@staticmethod
def to_parquet(dataset, export_path, **kwargs):
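The change above threads num_proc through to Hugging Face datasets' to_json, which is what enables parallel export of a single JSONL file. A small hedged sketch of the underlying call; the file path and data are illustrative, and num_proc support requires a reasonably recent datasets release.

# Minimal demonstration of the underlying call; not the Exporter itself.
from datasets import Dataset

ds = Dataset.from_dict({'text': ['I love LLM', 'I l0ve LLM', 'I ;ov4 LLM', 'I oe LLM']})
# num_proc > 1 writes shards of the dataset into one JSONL file in parallel;
# under heavy IO contention this can be slower than num_proc=1, which is why
# the export_in_parallel switch was added in this commit.
ds.to_json('demo_export.jsonl', force_ascii=False, num_proc=2)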