Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Pile as a SeqIO task #8

Draft
wants to merge 37 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
54810ce
First draft of how I think seqio works
thomasw21 Nov 5, 2021
9a3146c
Improve pipeline
thomasw21 Nov 5, 2021
bccaa5f
Added script to download pile to gcp bucket
thomasw21 Nov 6, 2021
ff7a387
I need to send to remove file progressively as I don't have access to…
thomasw21 Nov 6, 2021
adbc5a9
Woops
thomasw21 Nov 6, 2021
4cb7ae8
Woops2
thomasw21 Nov 6, 2021
1096de7
convert o map instead
thomasw21 Nov 6, 2021
f53f21b
Woops 3
thomasw21 Nov 6, 2021
a1f6e0a
Make it sequential
thomasw21 Nov 6, 2021
a9d69f7
Remove recursive option
thomasw21 Nov 6, 2021
68255a1
Update script to remove uncompressed file as well
thomasw21 Nov 6, 2021
744cc35
Test out the rest of the script
thomasw21 Nov 6, 2021
e4502a6
Woops
thomasw21 Nov 6, 2021
340ed01
Add back download step
thomasw21 Nov 6, 2021
4a58865
Cache seqio task
thomasw21 Nov 8, 2021
8995fbf
Add script in order to run caching
thomasw21 Nov 8, 2021
844c426
Test something out
thomasw21 Nov 8, 2021
3e294d9
test something else:
thomasw21 Nov 8, 2021
774e3e7
Woops
thomasw21 Nov 8, 2021
964340d
Setup file needs to be names setup.py
thomasw21 Nov 8, 2021
e17d454
Make a another package
thomasw21 Nov 8, 2021
cc74fb4
Somehow task is not part of pile package
thomasw21 Nov 8, 2021
7da3b2a
Revert "Make a another package"
thomasw21 Nov 8, 2021
a9edd8a
Remove __init__
thomasw21 Nov 8, 2021
385f262
Rename to pile
thomasw21 Nov 8, 2021
0282f7b
Fix
thomasw21 Nov 8, 2021
c4cf146
We need to install t5
thomasw21 Nov 8, 2021
f3ea974
No need to commit egg-info
thomasw21 Nov 8, 2021
a91207a
Some more fixing
thomasw21 Nov 8, 2021
3ced63b
Use 32 workers, ie the number of files
thomasw21 Nov 8, 2021
4cd94ea
update script
thomasw21 Nov 8, 2021
630143e
woops
thomasw21 Nov 8, 2021
7591d12
Woops read wrong doc
thomasw21 Nov 8, 2021
8131647
Choose machine type
thomasw21 Nov 8, 2021
c85fbb3
Preprocess only the two first files
thomasw21 Nov 8, 2021
01f9c6d
Revert "Preprocess only the two first files"
thomasw21 Nov 9, 2021
ec3989b
Update path of cache
thomasw21 Nov 9, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions t5x/configs/dataset/pile/download_all_pile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import argparse
import functools
import subprocess
from multiprocessing import Pool
import wget

def get_args():
    """Parse and return this script's command-line arguments."""
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--procs", type=int, required=True, help="Number of processes.")
    arg_parser.add_argument(
        "--local-base-dir",
        type=str,
        required=True,
        help="Folder to download the document to",
    )
    return arg_parser.parse_args()


def download_unztd_and_send_to_gcloud(relative_path, local_base_dir, gcp_base):
    """Download one Pile shard, decompress it, upload it to GCS, and clean up.

    Args:
        relative_path: shard path relative to the Pile mirror root; must end
            in ".zst" (e.g. "train/00.jsonl.zst").
        local_base_dir: local folder the shard is downloaded under.
        gcp_base: GCS prefix (gs://...) the uncompressed shard is copied to.

    Raises:
        subprocess.CalledProcessError: if mkdir/zstd/gsutil/rm fails. The
            original version ignored all exit codes, so a failed `zstd -d`
            silently led to uploading a nonexistent file.
    """
    BASE_PILE_URL = "https://the-eye.eu/public/AI/pile"
    local_path = f"{local_base_dir}/{relative_path}"

    # Validate the paths up front, before spending time on the download.
    assert local_path.endswith(".zst")
    assert relative_path.endswith(".zst")
    local_uncompressed_path = local_path[:-4]
    gcp_uncompressed_path = f"{gcp_base}/{relative_path[:-4]}"

    # Create the parent folder of the download target.
    subprocess.run(["mkdir", "-p", local_path.rsplit("/", 1)[0]], check=True)

    # Download the compressed shard. wget.download is synchronous, so the
    # stray `process.wait()` that used to follow it (waiting on the already
    # finished mkdir process) has been removed.
    wget.download(f"{BASE_PILE_URL}/{relative_path}", local_path)

    # Decompress; `zstd -d` writes `local_path` minus its ".zst" suffix.
    subprocess.run(
        ["zstd", "-d", local_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=True,
    )

    # Upload the uncompressed shard to GCS.
    subprocess.run(
        ["gsutil", "cp", local_uncompressed_path, gcp_uncompressed_path],
        check=True,
    )

    # Delete both local copies so disk usage stays bounded while iterating
    # over all shards.
    subprocess.run(["rm", local_path], check=True)
    subprocess.run(["rm", local_uncompressed_path], check=True)

def main():
    """Sequentially mirror every Pile shard (uncompressed) to the GCS bucket."""
    args = get_args()

    # Relative paths of all Pile shards on the mirror, keyed by split.
    pile_urls = {
        "train": [f"train/{i:02d}.jsonl.zst" for i in range(30)],
        "test": ["test.jsonl.zst"],
        "val": ["val.jsonl.zst"],
    }
    local_base_dir = args.local_base_dir
    gcp_base = "gs://bigscience/pile/raw"

    subprocess.run(["mkdir", "-p", local_base_dir], check=True)

    # Deliberately sequential: each shard is deleted locally after upload, so
    # running shards one at a time bounds local disk usage.
    # NOTE(review): args.procs is currently unused (a Pool-based parallel
    # variant was removed); either wire it back in or drop the flag.
    for relative_path in (path for paths in pile_urls.values() for path in paths):
        download_unztd_and_send_to_gcloud(
            relative_path, local_base_dir=local_base_dir, gcp_base=gcp_base)


if __name__ == "__main__":
    main()
Empty file.
74 changes: 74 additions & 0 deletions t5x/configs/dataset/pile/pile/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import functools
import seqio
from t5.data import preprocessors, utils
import tensorflow as tf

# Shared SentencePiece vocabulary: the standard T5 32k vocab plus 100
# extra (sentinel) ids.
vocabulary = seqio.SentencePieceVocabulary(
    'gs://t5-data/vocabs/cc_all.32000/sentencepiece.model', extra_ids=100)
# NOTE(review): `output_features` is not used by the tasks registered below
# (they use DEFAULT_OUTPUT_FEATURES); remove it if nothing else imports it.
output_features = {
    'inputs': seqio.Feature(vocabulary=vocabulary),
    'targets': seqio.Feature(vocabulary=vocabulary)
}

# Output features for the registered tasks. `inputs` is marked
# required=False because the source only yields `targets` text; presumably
# the span-corruption / prefix-LM preprocessors derive `inputs` from it —
# TODO confirm against the t5.data.preprocessors implementation.
DEFAULT_OUTPUT_FEATURES = {
    "inputs": seqio.Feature(
        vocabulary=vocabulary, add_eos=True,
        required=False),
    "targets": seqio.Feature(
        vocabulary=vocabulary, add_eos=True)
}

# GCS location of the raw, uncompressed Pile shards (as uploaded by
# download_all_pile.py) and the per-split file patterns.
DATASET_FOLDER="gs://bigscience/pile/raw"
DATASET_SPLITS_TO_FILEPATTERN={
    "train": f"{DATASET_FOLDER}/train/*.jsonl",
    "val": f"{DATASET_FOLDER}/val.jsonl",
    "test": f"{DATASET_FOLDER}/test.jsonl"
}

@utils.map_over_dataset
def extract_text_from_json_tf(json: str):
    """Extract the "text" field from one raw Pile JSONL line via string splits.

    Implemented with tf.strings ops (not a real JSON parser) so it can run
    inside a tf.data pipeline. NOTE(review): this assumes every line starts
    with '{"text": "' followed later by '", "meta": {'; a document whose
    text itself contains the literal '", "meta": {' would be truncated, and
    JSON escape sequences (\\n, \\", ...) are left unescaped — confirm this
    is acceptable for the Pile dump before relying on it.
    """
    output = tf.strings.split(json, '{"text": "', maxsplit=1)[1]
    output = tf.strings.split(output, '", "meta": {', maxsplit=1)[0]
    return {"text": output}

# T5-style span-corruption pretraining task over the Pile.
# Pipeline: raw JSONL line -> {"text": ...} -> rekey to {"targets": text}
# (inputs dropped) -> tokenize -> optional cache point -> span corruption
# -> EOS appended after trimming.
seqio.TaskRegistry.add(
    'pile_t2t_span_corruption',
    source=seqio.TextLineDataSource(
        split_to_filepattern=DATASET_SPLITS_TO_FILEPATTERN,
    ),
    preprocessors=[
        extract_text_from_json_tf,
        functools.partial(
            preprocessors.rekey, key_map={
                # "inputs": None drops the key; span corruption works on
                # "targets" only.
                "inputs": None,
                "targets": "text"
            }),
        seqio.preprocessors.tokenize,
        seqio.CacheDatasetPlaceholder(),
        preprocessors.span_corruption,
        seqio.preprocessors.append_eos_after_trim,
    ],
    output_features=DEFAULT_OUTPUT_FEATURES,
    metric_fns=[]
)

# Prefix-LM variant of the Pile task: identical pipeline to
# pile_t2t_span_corruption except the objective preprocessor is
# preprocessors.prefix_lm instead of span corruption.
seqio.TaskRegistry.add(
    "pile_t2t_prefix_lm",
    source=seqio.TextLineDataSource(
        split_to_filepattern=DATASET_SPLITS_TO_FILEPATTERN,
    ),
    preprocessors=[
        extract_text_from_json_tf,
        functools.partial(
            preprocessors.rekey, key_map={
                # "inputs": None drops the key; prefix_lm splits "targets"
                # into an inputs/targets pair itself.
                "inputs": None,
                "targets": "text"
            }),
        seqio.preprocessors.tokenize,
        seqio.CacheDatasetPlaceholder(),
        preprocessors.prefix_lm,
        seqio.preprocessors.append_eos_after_trim,
    ],
    output_features=DEFAULT_OUTPUT_FEATURES,
    metric_fns=[]
)
16 changes: 16 additions & 0 deletions t5x/configs/dataset/pile/run_cache_tasks_main.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Launch a Dataflow job that pre-caches the pile_t2t_span_corruption SeqIO
# task (writes tokenized examples to GCS).
# Prerequisites:
#   - seqio installed locally (provides the `seqio_cache_tasks` entry point)
#   - gcloud auth application-default login


MODULE_IMPORT=pile.task
TASK_NAME=pile_t2t_span_corruption
JOB_NAME=pilet2tspancorruption # the name must consist of only the characters [-a-z0-9], starting with a letter and ending with a letter or number
BUCKET=gs://bigscience/seqio_cached_tasks/$TASK_NAME # TODO(review): confirm whether the cache dir really needs to be task-specific
PROJECT=bigscience
REGION=europe-west1

# --setup_file points Dataflow at the sibling setup.py so workers install
# seqio/t5 before running; run this script from the directory containing it.
seqio_cache_tasks \
  --module_import=$MODULE_IMPORT \
  --tasks=${TASK_NAME} \
  --output_cache_dir=${BUCKET}/cache \
  --pipeline_options="--runner=DataflowRunner,--project=$PROJECT,--region=$REGION,--job_name=$JOB_NAME,--staging_location=$BUCKET/binaries,--temp_location=$BUCKET/tmp,--setup_file=$PWD/setup.py,--num_workers=32,--autoscaling_algorithm=NONE,--machine_type=n1-highmem-2"
125 changes: 125 additions & 0 deletions t5x/configs/dataset/pile/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Setup.py module for the workflow's worker utilities.
All the workflow related code is gathered in a package that will be built as a
source distribution, staged in the staging area for the workflow being run and
then installed in the workers when they start running.
This behavior is triggered by specifying the --setup_file command line option
when running the workflow for remote execution.
"""

# pytype: skip-file

import subprocess
from distutils.command.build import build as _build # type: ignore

import setuptools


# This class handles the pip install mechanism.
class build(_build):  # pylint: disable=invalid-name
  """A build command class that will be invoked during package install.
  The package built using the current setup.py will be staged and later
  installed in the worker using `pip install package'. This class will be
  instantiated during install for this specific scenario and will trigger
  running the custom commands specified.
  """
  # Append CustomCommands so it runs as part of the standard build step.
  sub_commands = _build.sub_commands + [('CustomCommands', None)]


# Some custom command to run during setup. The command is not essential for this
# workflow. It is used here as an example. Each command will spawn a child
# process. Typically, these commands will include steps to install non-Python
# packages. For instance, to install a C++-based library libjpeg62 the following
# two commands will have to be added:
#
# ['apt-get', 'update'],
# ['apt-get', '--assume-yes', 'install', 'libjpeg62'],
#
# First, note that there is no need to use the sudo command because the setup
# script runs with appropriate access.
# Second, if apt-get tool is used then the first command needs to be 'apt-get
# update' so the tool refreshes itself and initializes links to download
# repositories. Without this initial step the other apt-get install commands
# will fail with package not found errors. Note also --assume-yes option which
# shortcuts the interactive confirmation.
#
# Note that in this example custom commands will run after installing required
# packages. If you have a PyPI package that depends on one of the custom
# commands, move installation of the dependent package to the list of custom
# commands, e.g.:
#
# ['pip', 'install', 'my_package'],
#
# TODO(BEAM-3237): Output from the custom commands are missing from the logs.
# The output of custom commands (including failures) will be logged in the
# worker-startup log.
# Commands executed on each Dataflow worker at package-install time. The
# `echo` is a smoke test that custom commands run at all; seqio and
# t5[cache-tasks] provide the task definitions and the cache_tasks pipeline
# dependencies on the worker.
CUSTOM_COMMANDS = [
  ['echo', 'Custom command worked!'],
  ['pip', 'install', 'seqio'],
  ['pip', 'install', 't5[cache-tasks]']
]


class CustomCommands(setuptools.Command):
  """A setuptools Command class able to run arbitrary commands."""

  def initialize_options(self):
    pass

  def finalize_options(self):
    pass

  def RunCustomCommand(self, command_list):
    # Run a single command, echo its merged stdout/stderr, and fail loudly
    # on a non-zero exit code.
    print('Running command: %s' % command_list)
    proc = subprocess.Popen(
        command_list,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    # Can use communicate(input='y\n'.encode()) if the command run requires
    # some confirmation.
    captured_output, _ = proc.communicate()
    print('Command output: %s' % captured_output)
    if proc.returncode != 0:
      raise RuntimeError(
          'Command %s failed: exit code: %s' % (command_list, proc.returncode))

  def run(self):
    # Execute every configured command in order; the first failure aborts.
    for cmd in CUSTOM_COMMANDS:
      self.RunCustomCommand(cmd)


# Configure the required packages and scripts to install.
# Note that the Python Dataflow containers come with numpy already installed
# so this dependency will not trigger anything to be installed unless a version
# restriction is specified.
REQUIRED_PACKAGES = [
    'numpy',
]

# Metadata for the source distribution that Dataflow builds and stages on the
# workers (triggered by passing --setup_file in the pipeline options).
setuptools.setup(
    name='pile',
    version='0.0.1',
    description='Cache pile set workflow package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        # Command class instantiated and run during pip install scenarios.
        'build': build,
        'CustomCommands': CustomCommands,
    })