
distributed merge of per-rank Megatron data files #55

Merged · 78 commits · Aug 26, 2021

Conversation

@adammoody (Contributor) commented Aug 9, 2021

This can speed up the merge step, but it requires that the user write the final dataset to a POSIX-compliant parallel file system, like Lustre or GPFS. Each rank identifies the file offsets for its own data using collective operations, seeks to those sections, and writes its data.
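
To make the offset calculation concrete, here is a minimal sketch of the approach, assuming an mpi4py communicator; the function and variable names are illustrative, not the exact code in this PR:

    # Minimal sketch of the parallel merge, assuming mpi4py.
    from mpi4py import MPI
    import os

    def parallel_merge(rank_file, merged_file, comm=MPI.COMM_WORLD):
        nbytes = os.stat(rank_file).st_size

        # Exclusive prefix sum of byte counts gives each rank its starting
        # offset within the shared output file (exscan returns None on rank 0).
        offset = comm.exscan(nbytes, op=MPI.SUM) or 0

        # Rank 0 creates the shared output file, then everyone writes its slice.
        if comm.rank == 0:
            open(merged_file, 'wb').close()
        comm.barrier()

        with open(merged_file, 'r+b') as fout, open(rank_file, 'rb') as fin:
            fout.seek(offset)           # jump to this rank's section
            fout.write(fin.read())      # copy this rank's data into place

        comm.barrier()                  # all ranks finish before the file is used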

This adds a --merge option to preprocess_data_dist.py, which can be set to any of {parallel, serial, both}. It defaults to parallel, but one can fall back to the algorithm where rank 0 merges all files sequentially with --merge serial. A serial merge might be helpful for users where the parallel merge does not work due to the lack of a POSIX-compliant parallel file system. The both option is useful for testing purposes: it merges the rank files with both the parallel and serial methods so that the resulting files can be compared with something like cmp.

An optional --scratch option can be used to store intermediate per-rank files in storage local to the compute node, like /dev/shm, which avoids creating those files on the shared file system and offers faster write/read performance, e.g.,

--scratch /dev/shm

TODO:

  • add support for torch.distributed
  • avoid deadlock in case some process throws an exception
  • test corner cases, e.g., rank 0 contributes 0 items
  • double check why version "byte" seems to use 2 bytes -- resolved, <B is encoded as a single byte as expected

Scaling tests:
In running tests to check encoding rates at different node counts, I also get the merge time at the end. The script actually does both a parallel merge and a serial merge so that I can compare their contents. That also provides an easy way to gather times for both. The parallel merge can optionally write the per-rank files to a scratch directory with --scratch, like /dev/shm, which removes load from the parallel file system.

Each rank writes its own file, and I'm running 40 ranks per node. Times here are in seconds. Test results can vary based on how busy the (shared) file system is at the time. I've only taken one sample here.

nodes:       8     16    32    64
serial:    617    499   718     -
parallel:   16.1   15.6  17.0  26.8
/dev/shm:    -     14.8   -    22.4

The final merged file is the same size in all cases (529GB), but as the number of ranks increases, the script generates more per-rank files with each one being smaller. The total data being processed is the same, but the file counts can vary. Ideally, if things are bandwidth bound, you'd expect a constant time across each row.

Anyway, the main takeaway is that there is a nice boost using the parallel merge.

The scratch times aren't showing much improvement over writing the per-rank files to the parallel file system. My guess is that the OS has cached the per-rank file in page cache, so it's reading back from memory even when the per-rank file is written to the parallel file system. There might still be some impact on the cost to create and delete those files, but I'm not recording that.

@adammoody (Contributor Author)

@thomasw21 , I've refreshed this now that the other PR has been merged. This should be usable via mpi4py (at least it works for me). The torch equivalent still needs to be written.

This adds some MPI code in megatron/data/indexed_dataset.py, since that defines the file format(s) and implements the sequential merge. To do this cleanly, we'll probably want to define some sort of dist module that can be imported from both tools/preprocess_dataset_mpi.py and megatron/data/indexed_dataset.py.

I'll add an option to let one toggle between the parallel and sequential merge implementations. When using MPI, it currently does both. Having both in one run was handy so I could run cmp on the resulting files to verify file integrity.

@thomasw21 (Member) left a comment

Sorry I haven't finished yet, but since I'm going away on holidays today, you can have at least a partial review. Overall looks very promising, especially since we noticed that merging can take a long time and we were only running on a single cpu. What I would recommend:

  • Can you make an MPI-less implementation? I think MPI can help, but essentially this would apply not only in a multi-node setting but also in a multiprocess setting. This would allow the two other scripts to leverage that parallel merge.
  • If you plan to do it with mpi4py, I think you should implement this with torch.distributed also.
  • I never really asked before, but maybe we can benchmark torch.distributed vs mpi4py. The reason is that we're essentially doing the same thing in both, and we already install torch. So if mpi4py doesn't bring a substantial improvement, let's remove it.
  • Can you add comments, especially where you move the position on the file object? I think it's much clearer if I can easily see where we are, i.e., "at the end of the size section" or something.

Otherwise great work like always! I'll be back on monday to review the rest of the PR!

@@ -417,21 +417,21 @@ def __init__(self, path, skip_warmup=False):
offset = stream.tell()

if not skip_warmup:
print_rank_0(" warming up index mmap file...")
# print_rank_0(" warming up index mmap file...")
Member

Keep them?

Contributor Author

Yes, I agree that it'd be useful to have these. When running with lots of procs, these status messages clutter up the output quite a bit, and they print in a random order since each process prints them independently.

It'd be good to have a way to silence them. For now, I commented them out for testing.

Member

Doesn't print_rank_0 force only one process to print? After reading the code, this seems to work only with torch.distributed. I'd be in favor of dropping mpi4py if it doesn't bring a substantial performance improvement.


# To create the binary files given a set of per-rank binary
# files, one simply concatenates the data from the per-rank
# binary files in rank order. We stat each rank file to determine
Member

I remember you saying something about stat taking some time before being updated correctly? Am I hallucinating, or do we potentially have a race condition?

Contributor Author

It depends on the backing file system being used and also how the procs access the files. On file systems like Lustre and GPFS, the stat info will be up-to-date and consistent to all procs on all nodes.

On a file system like NFS, procs on one node may see a different file size than procs on another node due to client-side caching of file metadata. However, even on NFS, the stat info should be up-to-date and correct from the process where the file was written, and the algorithm used here satisfies that.

Having said all of that, writing any type of shared file in NFS is risky (not well-supported). I think the stance we should take is to say this is perfectly safe for Lustre/GPFS. It may or may not work on NFS -- users can try, but they do so at their own risk.

Member

Hmm, I'm not familiar with these notions; I'm guessing JZ is Lustre or GPFS. After reading the docs, it seems to be the case:

Spectrum Scale system of parallel files (ex-GPFS)
Parallel storage device with SSD disks (GridScaler GS18K SSD) with a capacity of 1 PB

If so, can you add something at the top of the file in order to document that? And maybe someone with more expertise on this can take a look. I guess I'll run the code a few times and check that it matches the vanilla version.

index = MMapIndexedDataset.Index(infile)
sizes = index.sizes
if rank == 0:
docs = index.doc_idx
Member

Careful: this has a strong assumption that rank 0 has a bin file offset of 0. Though it's guaranteed currently, make sure to document it somewhere.

Comment on lines 708 to 712
f.write(MMapIndexedDataset.Index._HDR_MAGIC)
f.write(struct.pack('<Q', 1))
f.write(struct.pack('<B', code(dtype)))
f.write(struct.pack('<Q', size_count))
f.write(struct.pack('<Q', docs_count))
Member

I think here we want to abstract this to be generic to the other dataset index. Essentially we have to create a new class, something along the lines of a partial writer (see the sketch after this list). It needs to support the following:

  • initialize() -> run only on rank 0, where it writes all of the header
  • seek(offset) -> essentially you know where you need to add the list
  • write() -> the current write function
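
For illustration, a rough sketch of what such a partial-writer class could look like, assuming the 34-byte mmap index header discussed later in this thread; the class and method names are hypothetical, not the code that was actually added:

    import struct
    import numpy as np

    class PartialIndexWriter:
        """Hypothetical sketch: rank 0 writes the header, every rank writes its own slice."""
        HEADER_LEN = 34  # magic (9) + version (8) + dtype code (1) + two counts (16)

        def __init__(self, path, rank):
            self.path = path
            self.rank = rank

        def initialize(self, magic, dtype_code, size_count, docs_count):
            # Called on rank 0 only, before any rank calls write_sizes().
            with open(self.path, 'wb') as f:
                f.write(magic)                          # header magic bytes
                f.write(struct.pack('<Q', 1))           # format version
                f.write(struct.pack('<B', dtype_code))  # element dtype code
                f.write(struct.pack('<Q', size_count))  # total number of size entries
                f.write(struct.pack('<Q', docs_count))  # total number of doc index entries

        def write_sizes(self, sizes, elem_offset):
            # Seek past the header plus the int32 entries owned by lower ranks,
            # then write this rank's contribution in place.
            with open(self.path, 'r+b') as f:
                f.seek(self.HEADER_LEN + elem_offset * np.int32().itemsize)
                f.write(np.asarray(sizes, dtype=np.int32).tobytes())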


# The list of size values from each rank are
# concatenated and stored as int32.
f.seek(pos + size_offset * np.int32().itemsize)
Member

Can we abstract this into write? Essentially write needs 4 things:

  • sizes
  • docs
  • offset_sizes
  • offset_counts

@adammoody adammoody changed the title MPI-based parallel merge of per-rank Megatron data files WIP: MPI-based parallel merge of per-rank Megatron data files Aug 12, 2021
@adammoody (Contributor Author)

> Otherwise great work like always! I'll be back on monday to review the rest of the PR!
>
> Sorry I haven't finished yet, but since I'm going away on holidays today, you can have at least a partial review. Overall looks very promising, especially since we noticed that merging can take a long time and we were only running on a single cpu. What I would recommend:
>
> • Can you make an MPI-less implementation? I think MPI can help, but essentially this would apply not only in a multi-node setting but also in a multiprocess setting. This would allow the two other scripts to leverage that parallel merge.
>
> • If you plan to do it with mpi4py, I think you should implement this with torch.distributed also.

I'll check on that. I think a torch.distributed version should be straightforward based on what I have now. That's already on the TODO list. I suspect we could create something for Python's multiprocessing Pool, too. That will take more work, though, since there are assumptions in the existing algorithm that will need to change, namely that each process writes and reads exactly one "part" file. We'd need to come up with equivalent collective ops, too. My guess is that it's likely possible, but I have not considered it yet.

> • I never really asked before, but maybe we can benchmark torch.distributed vs mpi4py. The reason is that we're essentially doing the same thing in both, and we already install torch. So if mpi4py doesn't bring a substantial improvement, let's remove it.

Right, I suspect performance should be similar, especially because there is not a lot of communication in these implementations. In my case, the MPI version is handy because it's easier and less error-prone to launch MPI jobs on my system. I also know that interface a bit better, and it offers a wider set of operations, so it's quicker to develop and test. I'm then circling back to add torch.distributed once I have a working algorithm. So far, I think we can do everything in torch.distributed that we need, i.e., the right primitives are there or they can be emulated (like scan).
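
For example, a minimal sketch of emulating an exclusive scan (MPI_Exscan) with torch.distributed, assuming the process group is already initialized; this is illustrative, not the exact code in the PR:

    import torch
    import torch.distributed as dist

    def exscan_int64(value):
        """Sum of `value` over all lower-ranked processes (0 on rank 0)."""
        world_size = dist.get_world_size()
        rank = dist.get_rank()

        # Gather every rank's contribution, then add up the ones below us.
        mine = torch.tensor([value], dtype=torch.int64)
        everyone = [torch.zeros(1, dtype=torch.int64) for _ in range(world_size)]
        dist.all_gather(everyone, mine)
        return int(sum(t.item() for t in everyone[:rank]))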

> • Can you add comments, especially where you move the position on the file object? I think it's much clearer if I can easily see where we are, i.e., "at the end of the size section" or something.

> Otherwise great work like always! I'll be back on monday to review the rest of the PR!

Thanks for taking a look already @thomasw21 . Enjoy your holidays!

@adammoody (Contributor Author)

adammoody commented Aug 14, 2021

The functions I have in there right now are somewhat specific to the needs of the preprocess_dataset_mpi.py script, which assumes that each rank has one file. That was my first step, because it was the most natural step based on what I needed.

It should be straightforward to extend this to provide a new "merge files" function that takes a list of part files and an object defining the distributed environment. Because the final merged file has to be created specially when multiple ranks will be writing to it, some of this would likely need to go into the class constructor. I'm thinking we could add that function as a method to the dataset class. So, for example, the main logic in merge_preprocessed_data.py might then change from:

    builder = indexed_dataset.make_builder(output_bin_file,
                                           impl=dataset_impl,
                                           dtype=dtype)
    for dataset in args.datasets:
        builder.merge_file_(dataset)
    builder.finalize(output_idx_file)

to:

    distctx = DataDist()
    builder = indexed_dataset.make_builder(output_bin_file,
                                           impl=dataset_impl,
                                           dtype=dtype,
                                           distctx=distctx)
    builder.merge_files_(args.datasets)
    builder.finalize(output_idx_file)

The DataDist object is a class that defines the distributed environment (mpi4py or torch.distributed), and it abstracts the necessary collective implementations. This is passed to the indexed_dataset.make_builder function, which then enables output_bin_file to be created in a collective manner.

Then we drop the for loop and just pass the full list of files to the merge function. The ranks in distctx can then divide up that list and write the file using algorithms very close to what we already have.
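
For context, a rough sketch of the kind of distributed-context wrapper being described, assuming torch.distributed; the real DistData class in this PR has more methods (e.g. scatterv_), and the bodies here are illustrative:

    import torch
    import torch.distributed as dist

    class DistData(object):
        """Sketch of a thin wrapper over torch.distributed used by the preprocessing tools."""

        def __init__(self, backend='gloo'):
            dist.init_process_group(backend, init_method='env://')
            self.rank = dist.get_rank()
            self.numranks = dist.get_world_size()

        def barrier(self):
            dist.barrier()

        def all_sum(self, val):
            # Global sum of a scalar; also acts as a barrier.
            t = torch.tensor([val], dtype=torch.int64)
            dist.all_reduce(t, op=dist.ReduceOp.SUM)
            return int(t.item())

        def allraise_if(self, err):
            # If any rank hit an exception, make every rank raise so no one deadlocks.
            if self.all_sum(1 if err is not None else 0) > 0:
                if err is not None:
                    raise err
                raise RuntimeError("another rank hit an error")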

@thomasw21 (Member) left a comment

Finally finished the review, I know it's still WIP, but since I hadn't looked at the entire PR, I re-reviewed it. This looks incredible, in particular the performance gain we seem to obtain from this:

  • I'd advocate for the removal of mpi4py.
  • Please factorize some code; I'm sure there's a good abstraction allowing us to re-use some of the existing code.
  • If the multiprocessing implementation becomes tricky, feel free to put it away for now. We can always come back to it.
  • Can you share the scripts you're running to compare them? I'll try to run them this week in order to obtain a benchmark on our setup.

from mpi4py import MPI
self.MPI = MPI
except:
#print(f"ERROR: mpi4py requested, but failed to import, falling back to torch.distributed.", flush=True)
Member

I'd be in favor of raising an exception and stopping the script, but really this is a personal opinion.


class DistData(object):
def __init__(self, backend='gloo', use_mpi4py=False):
self.MPI = None
Member

This is confusing as torch.distributed can use MPI as its backend.

Contributor Author

I renamed that variable to be clear

import torch
import torch.distributed as dist

class DistData(object):
Member

So that's an awesome abstraction for multinode, especially in the case where we might want to support both torch.distributed and mpi4py. Though as I'm thinking about it, supporting mpi4py will become costly very soon and might not bring much to the table, as we might need to implement all improvements in two frameworks that can essentially use the same MPI backend. As you mentioned in a comment, we should be able to do everything using torch.distributed, so let's remove mpi4py and come back to it the day torch.distributed isn't enough.

Sorry for my mistake, I thought it would be interesting to support both cases, but I feel it ends up being a burden here, where we need to create a higher-level abstraction for not much gain.

Contributor Author

Yeah, it would require some maintenance. I can yank out the MPI just before we merge the PR.

import torch.distributed as dist

class DistData(object):
def __init__(self, backend='gloo', use_mpi4py=False):
Member

Do you know what the difference is in terms of performance between gloo and mpi?

Contributor Author

I haven't measured this. There is not a ton of communication going on in this script, so it likely doesn't make much difference, especially since this will be bottlenecked by I/O performance.

Under the "Which backend to use?" section, they generally recommends that people stick with gloo, unless they really want mpi.
https://pytorch.org/docs/stable/distributed.html

Member

Ah I did not know we had to build pytorch from source in order to get mpi working ...

Comment on lines 476 to 477
args.dist_merge = True
if args.dist_merge:
Member

That's odd ^^

Contributor Author

That was a placeholder for a new --merge option. Just added that.

for key in args.columns:
filemain = get_filename(args, key)
filerank = get_filename(args, key, args.rank)
gather_files_dist(filemain, [filerank], args.distctx, dtype=best_fitting_dtype(args.vocab_size))
Member

Use merge_files_dist in order to have all the checks.

Contributor Author

Good eye that I'm not using merge_files_dist here. It's used by the merge_preprocessed_data.py script, which I've updated to use the distributed merge. I can add that in a future PR. If it helps, I can move that function to the other PR.

There is a subtle difference between merge_files_dist and gather_files_dist that I haven't explained well in the comments.

In merge_files_dist, all ranks must provide the full list of files to be merged, and it is assumed that any rank can read any of those files.

In the gather case, each process provides a subset of files that it will contribute. It also may be that only the calling process can access that subset of files. The gather call allows one to optionally store the per-rank file on node-local storage like /dev/shm or a node-local SSD, which might be only accessible to the calling process.

Comment on lines 491 to 496
# rename files for now and also do regular merge so we can time both and "cmp" them
if args.rank == 0:
binfile = data_file_path(filemain)
idxfile = index_file_path(filemain)
os.rename(binfile, binfile + ".par")
os.rename(idxfile, idxfile + ".par")
Member

Okay maybe you can add a TODO so I won't miss it before merge.

Contributor Author

Actually, this is useful now with the --merge=both option, so I've updated it for that.

os.rename(binfile, binfile + ".par")
os.rename(idxfile, idxfile + ".par")

args.distctx.barrier()
Member

all_sum already acts as a barrier, no?

Contributor Author

Yeah, that's true. I'll update the comment to note that it's also acting as a barrier (in case someone later decides to drop the timing/report bit).

startup_end = time.time()
if args.rank == 0:
print("Seconds to startup:", startup_end - startup_start)
print(f"Seconds to startup: {startup_end - startup_start}")
Member

🥳

@adammoody (Contributor Author)

adammoody commented Aug 16, 2021

Thanks for the re-review, @thomasw21 . I'll try to get back to make these changes in the next day or so.

In the meantime, I think it should work for you if you want to try it on multiple nodes. I did also see that JZ has GPFS for several of its directories, so I think it should be fine. At least it seems that HOME, WORK, and SCRATCH are GPFS:

http://www.idris.fr/eng/jean-zay/cpu/jean-zay-cpu-calculateurs-disques-eng.html

This PR currently writes out two sets of files. It runs the parallel merge, renames those files to a .par extension, and then it runs the serial merge where rank 0 does all of the work using merge_file_. For an output file name "output", you'll get something like:

output.bin.par
output.idx.par
output.bin
output.idx

The cost for each merge is measured and reported separately so you can get the timing for each from one run. I then just use cmp to check that the corresponding files are identical:

cmp output.bin.par output.bin
cmp output.idx.par output.idx

This is all for testing. Longer term, we could perhaps just settle on the parallel merge. However, there might be a use case for keeping the serial merge in there. For example, that might be a decent workaround for someone who does not have a file system like Lustre/GPFS. If so, we could add a --merge option so that one could choose between distributed or serial (or even allow --merge=both for testing).

BTW, the cmp does currently note a difference for the .idx files when using --dataset-impl cached, which is related to #66

@adammoody (Contributor Author)

Oh, and to run, I've been doing something like:

python -m torch.distributed.launch --nproc_per_node 2 ${topdir}/tools/preprocess_dataset_mpi.py \
       --input openwebtext \
       --count 10 \
       --output-prefix openwebtext-bert \
       --vocab bert-large-uncased-vocab.txt \
       --dataset-impl mmap \
       --tokenizer-type BertWordPieceLowerCase \
       --split-sentences

That --count 10 limits the number of samples to 10. That's helpful for testing. You can increase the number or just drop the --count option to do the full dataset.

I know you'd like to drop the mpi4py, but just in case you want to try that anyway, something like the following should work:

srun -n 80 -N 2 python3 ${topdir}/tools/preprocess_dataset_mpi.py \
       --input openwebtext \
       --count 10 \
       --mpi4py \
       --output-prefix openwebtext-bert \
       --vocab bert-large-uncased-vocab.txt \
       --dataset-impl mmap \
       --tokenizer-type BertWordPieceLowerCase \
       --split-sentences

The main difference here is the addition of --mpi4py and using srun to launch.

You can toggle between --dataset-impl mmap and --dataset-impl cached to try each of the two file formats.

@adammoody (Contributor Author)

adammoody commented Aug 16, 2021

@thomasw21 , heads up that I just pushed a commit that changes the usage to add a --merge={parallel,serial,both} option. It defaults to parallel.

And I merged in main to pick up the cached dataset .idx fix.

@thomasw21 (Member)

Let me know when it's up for review, I'll try spending some time on it later in the week.

@thomasw21 (Member)

thomasw21 commented Aug 23, 2021

Quick update: I'm currently trying to run your multi node version, and I seem to have some issues:

Code

In test_preprocess_openwebtext_script.sh:

export MASTER_ADDR=$SLURM_SUBMIT_HOST # "$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n1)"
export MASTER_PORT=12345
export DISTRIBUTED_ARGS=" \
        --nproc_per_node 40 \
        --nnodes $SLURM_JOB_NUM_NODES \
        --node_rank $SLURM_NODEID \
        --master_addr $MASTER_ADDR \
        --master_port $MASTER_PORT \
"

PREPROCESSED_DATASET_PREFIX=$SCRATCH/data/processed/openwebtext-gpt2
export PROCESSING_ARGS=" \
       --input stas/openwebtext-10k \
       --count 10000 \
       --output-prefix $PREPROCESSED_DATASET_PREFIX \
       --merge both \
       --dataset-impl mmap \
       --tokenizer-type PretrainedFromHF \
       --tokenizer-name-or-path gpt2 \
       --seed 101
"

time python -m torch.distributed.launch $DISTRIBUTED_ARGS  Megatron-DeepSpeed-Adam/tools/preprocess_data_dist.py $PROCESSING_ARGS

and then call this via srun sh ./test_preprocess_openwebtext_script.sh.

Issues

I'm getting the following when running on 4/8 nodes (works correctly for 1/2 nodes):

ValueError: ProcessGroupGloo::scatter: invalid tensor size at index 80 (expected (32), got (31))

I'm not sure why this happens. My guess is that, for some reason, a process is expecting something of size 32 and got 31. This sounds a lot like the remainder of indices. The reason things would work for 1/2 nodes is that 10k / (2 * 40) = 125 (with 2 nodes of 40 cpus), whereas 10k / (4 * 40) = 62.5 and 10k / (8 * 40) = 31.25 (which corresponds to the error we get up there). Now the question is whether gloo actually handles tensors of different sizes in scatter. If not, we might need to pad values and ignore them... it's a shame everything worked correctly using mpi4py and not torch.distributed...

Maybe I'm missing something? I'll look more in depth tomorrow. I might have gotten the script very wrong, as I'm not used to using torch.distributed.

@adammoody (Contributor Author)

Oh, shoot. Yes, I can reproduce that. Apparently, I hadn't tested that configuration. I guess it's a true scatter rather than scatterv in torch.distributed. I'll work on a fix.

@adammoody (Contributor Author)

@thomasw21 , I just pushed a commit that seems to fix it for me. Would you please try again when you get a chance?

# Receive a tensor of the max count size from the root,
# then copy values into output numpy array, which may be smaller.
recvtensor = torch.zeros(max(counts), dtype=torch.int64)
dist.scatter(recvtensor, scatterlist, src=root)
@thomasw21 (Member) Aug 23, 2021

What about scatter_object_list? I tried it and it seems to work nicely, though I don't know what the cost of using that method vs padding is.

Contributor Author

Oh, interesting. So I seem to be using an old enough torch.distributed that I don't have scatter_object_list:

AttributeError: module 'torch.distributed' has no attribute 'scatter_object_list'

It looks like it was added about 9 months ago in this commit:

pytorch/pytorch@02d89f9

I can also see that it internally is just calling broadcast and scatter a couple of times.

After seeing that, I think our pad method is probably the best way to go after all.

Member

yeah let's go with padding.

Comment on lines 83 to 86
for num, s in zip(counts, slices):
padtensor = torch.zeros(max(counts), dtype=torch.int64)
padtensor[:num] = s
scatterlist.append(padtensor)
Member

If scatter_object_list doesn't work for you, maybe we can improve slightly.

import torch.nn.functional as F

slices = torch.split(torch.from_numpy(invals), counts)
max_size = max(counts)
scatterlist = [F.pad(slice, (0, max_size - len(slice))) for slice in slices]

Contributor Author

Yep, that's cleaner. Thanks for the tip.

I did that just to try it and pushed a commit in case you want to use it. I can also try scatter_object_list. I suspect that should also work.

I suppose the tensor-based method could be more efficient communication-wise (no pickle step), though this scatter step will not take much time in either case compared to the total time of the script.

The bigger concern might be the memory required on rank 0 to put together the arguments for the scatter. With the mpi4py Scatterv, I know mpi4py sends data from the original numpy array. With torch, it looks like we'll at least be doubling the memory by effectively slicing up the original numpy list into these per-rank tensors/sublists. I don't know if it would be worse than doubling the memory -- it depends on the implementation under the covers. However, even that likely won't be an issue until the input index list is really big, at which point we can always fall back to the file-based scatter.

Whichever you prefer is good with me.

Member

I think the day we can't fit everything on a single process, we'll think of a better way IMO (perhaps bring back mpi4py). Let's stick to the padding strategy.

# then copy values into output numpy array, which may be smaller.
recvtensor = torch.zeros(max(counts), dtype=torch.int64)
dist.scatter(recvtensor, scatterlist, src=root)
outval[:] = recvtensor[:counts[self.rank]]
@thomasw21 (Member) Aug 24, 2021

If we don't benefit from the in-place operator, let's return the tensor instead of doing an in-place operation? Typically you could return only recvtensor[:counts[self.rank]], which would remove the need to initialize outval.

Contributor Author

Good suggestion. Made that change.

@thomasw21 (Member) left a comment

Overall that looks good to me. However when running:

  • idx sizes seem to differ, probably an offset that's wrong for some reason.
> cmp -b openwebtext-gpt2_text_document.idx openwebtext-gpt2_text_document.idx.par 
openwebtext-gpt2_text_document.idx openwebtext-gpt2_text_document.idx.par differ: byte 30115, line 118 is  12 ^J   0 ^@

You seem to have already integrated your fix in this branch; I suspect something about a wrong offset.

  • I see no performance improvement using 4 nodes; in fact, I see a significant loss in performance.

config                 nodes  cpus_per_node  serial                     parallel
stas/openwebtext-10k   1      40             0.0687 s (314.938 MB/s)    0.1867 s (115.864 MB/s)
stas/openwebtext-10k   4      40             0.5817 s (37.204 MB/s)     0.6287 s (34.424 MB/s)

I see potentially two issues:

  • stas/openwebtext-10k might just be too small. Going to try on openwebtext.
  • That might be linked to our setup, where we're not using SSDs. Though that wouldn't really explain the massive slowdown when increasing the number of nodes. Will investigate.

def scatterv_(self, invals, counts, outval, root=0):
"""Scatter int64 values from invals according to counts array, receive values in outval"""

self.allassert(len(counts) == self.numranks,
Member

Let's put that in a future PR. For me, this PR introduces the feature; we can always improve on it later.

@adammoody (Contributor Author)

@thomasw21 , thanks for running those tests.

Regarding the index cmp check, are you writing files in mmap or cached format?

How many items are you selecting from openwebtext? I'm trying to get a handle on what section of the file that might be.

Are the output files being written to one of the GPFS directories?

@thomasw21 (Member)

mmap, all items, and directory should be safe.

@adammoody (Contributor Author)

I'm a bit stumped on the cmp mismatch. In the mmap index file format, there is a 34 byte header. Using all samples from stas/openwebtext-10k, byte offset 30115 should then fall within the list of 32-bit size values. I was hoping that might line up close to a rank boundary, but I'm not seeing anything yet in the various run configurations that I've tried.

In that case, do you know how many total ranks you were using?

@thomasw21 (Member)

Sorry for the short reply earlier, I was away from my computer. I re-ran it, and it seems to be fine... so I don't know what was wrong. Let's ignore it for now, as it was likely an error on my end.

However, I am seeing a drastic slowdown currently (I re-ran stas/openwebtext and it reproduced the numbers I mentioned earlier). Still waiting to test on openwebtext (I'm having issues with a version, currently getting datasets.utils.info_utils.NonMatchingSplitsSizesError).

FYI, I'll go on holidays starting Friday, so I might merge this, seeing as I'm confident it works (just not confident it runs any faster on our setup). I'll benchmark everything upon coming back or let @stas00 try to run the scripts.

@adammoody (Contributor Author)

Ok, good. I was focused on finding the cause of the correctness problem first, but it sounds like that might have been a false alarm. Please let me know if that shows up again.

Regarding performance, yes, the numbers you are seeing are quite bad and scaling poorly.

If I try the same configuration with stas/openwebtext-10k, with 40 procs per node using 1 node and 4 nodes, I'm getting something close, though my 1-node results seem to be flipped so that parallel is faster there.

nodes/ppn Serial Merge Parallel Merge
1/40 106.266 MB/s 237.300 MB/s
4/40 18.279 MB/s 14.374 MB/s

On file systems that allow parallel writes to a file, the cost can be high if processes write to sections that are too close to each other. The file system servers often implement some form of byte-range locking to ensure that only one client is writing to a given region at a time, and I've seen cases where it's easy to thrash those lock servers, especially in false-sharing scenarios. Since this dataset is relatively small, the processes will be slicing up the file pretty finely.
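
(As a rough illustration: the 1-node times and bandwidths above imply the merged stas/openwebtext-10k output is only about 20 MB, so with 4 nodes x 40 ranks each process writes on the order of 100 KB, and neighboring ranks can easily fall within the same lock ranges.)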

How do things fare for a larger dataset?

As a next step up, maybe try openwebtext with a --count 100000?

@adammoody (Contributor Author)

adammoody commented Aug 26, 2021

Also, heads up that things can seem to get stuck after this message for a while:

Waiting for ranks to finalize files ...

Rank 0 prints this message when it finishes its portion of the work, and then it waits for everyone in a barrier. Rank 0 can finish much earlier than the full set of procs due to load imbalance across the processes. With that barrier, everyone has to wait on the slowest process, and that's probably the one that ended up with the most text to process. I have seen a factor-of-2 spread in much of my testing. Things are better if the data is shuffled.

I have some ideas to improve load balance in future work if that becomes an issue.

@thomasw21 (Member)

thomasw21 commented Aug 26, 2021

Can't seem to benchmark on openwebtext. I'll create an issue directly on datasets (huggingface/datasets#2839).

datasets.utils.info_utils.NonMatchingSplitsSizesError: [{'expected': SplitInfo(name='train', num_bytes=39769494896, num_examples=8013769, dataset_name='openwebtext'), 'recorded': SplitInfo(name='train', num_bytes=39611023912, num_examples=7982430, dataset_name='openwebtext')}]

@thomasw21 (Member)

thomasw21 commented Aug 26, 2021

Okay let's merge this, it's safe to say that it works, and doesn't break other scripts. This will allow us to look at the jsonl version of that script. A few things I think we might want to do:

  • use torch instead of numpy (I'm pretty sure everything we want to do can be done via torch).
  • update the README to describe the option of using this distributed method, probably for very large datasets.
  • improve logging to have live logs. Typically, on large files, having no ongoing logging might suggest deadlocks and such.
  • though it was a great thing to have the "both" option, I think it makes little sense since we never want to compute "both" except in testing, and both can be computed by running the script twice with the two different options.
  • improve logging. When running, it sometimes crashed (due to an incorrect setup on my end), but reading the stack trace to understand what happened was nightmare-ish because all processes were logging to the same place. I don't know if there's a way for stack traces to be written in blocks. Maybe a better setup would be for each process to write to separate files...
  • We should expose subsets for datasets. Typically you can't really run load_dataset("oscar"); you have to choose a subset, e.g. load_dataset("oscar", "en").
  • To be explored, but usually users will use datasets to modify the original dataset via map/filter. It might be worth starting to look into creating a preprocess_meg_dist(datasets) method or something like that (let's discuss this first).
  • The notion of a parallel merge could be extended to the other scripts. Essentially the other scripts are single-node versions using multiprocessing. I'm pretty sure we can get a similar parallel merge. Maybe DistData could be a good abstraction for multiprocessing also.

I'll create issues when I come back, but if you're interested in any of them or want to challenge any ideas, feel free to ping me!
I'll still run benchmarks when I come back if openwebtext gets fixed this week.

and also .... AWESOME WORK!!! Thanks again for the massive contribution! If you have any interest in bigscience, feel free to join the slack. cc @stas00

@thomasw21 thomasw21 merged commit 9722111 into bigscience-workshop:main Aug 26, 2021
@stas00 (Contributor)

stas00 commented Aug 26, 2021

The test suite can't handle this addition. Why was offline mode enabled? How can it get the required files then?

and the dataset argument is incorrect; it should be "--input stas/openwebtext-10k"

tests/test_preprocessing.py::MegDSTestPreprocessing.test_preprocess_data ✓                                                                        25% ██▌       
Running:  python -m torch.distributed.launch --nproc_per_node 2 /home/ubuntu/code/Megatron-DeepSpeed/tools/preprocess_data_dist.py --input openwebtext-10k --count 1000 --output-prefix /tmp/tmpv0p2agsk/test-ds-meg-gpt2-openwebtext_1k --dataset-impl mmap --tokenizer-type GPT2BPETokenizer --merge-file /home/ubuntu/code/Megatron-DeepSpeed/tests/data/gpt2/gpt2-tiny-merges.txt --vocab /home/ubuntu/code/Megatron-DeepSpeed/tests/data/gpt2/gpt2-tiny-vocab.json --append-eod
stderr: /home/ubuntu/.local/lib/python3.8/site-packages/torch/distributed/launch.py:163: DeprecationWarning: The 'warn' method is deprecated, use 'warning' instead
stderr:   logger.warn(
stderr: The module torch.distributed.launch is deprecated and going to be removed in future.Migrate to torch.distributed.run
stderr: WARNING:torch.distributed.run:--use_env is deprecated and will be removed in future releases.
stderr:  Please read local_rank from `os.environ('LOCAL_RANK')` instead.
stderr: INFO:torch.distributed.launcher.api:Starting elastic_operator with launch configs:
stderr:   entrypoint       : /home/ubuntu/code/Megatron-DeepSpeed/tools/preprocess_data_dist.py
stderr:   min_nodes        : 1
stderr:   max_nodes        : 1
stderr:   nproc_per_node   : 2
stderr:   run_id           : none
stderr:   rdzv_backend     : static
stderr:   rdzv_endpoint    : 127.0.0.1:29500
stderr:   rdzv_configs     : {'rank': 0, 'timeout': 900}
stderr:   max_restarts     : 3
stderr:   monitor_interval : 5
stderr:   log_dir          : None
stderr:   metrics_cfg      : {}
stderr: 
stderr: INFO:torch.distributed.elastic.agent.server.local_elastic_agent:log directory set to: /tmp/torchelastic_t_gc0h3b/none_bfjtb6a_
stderr: INFO:torch.distributed.elastic.agent.server.api:[default] starting workers for entrypoint: python
stderr: INFO:torch.distributed.elastic.agent.server.api:[default] Rendezvous'ing worker group
stderr: /home/ubuntu/.local/lib/python3.8/site-packages/torch/distributed/elastic/utils/store.py:52: FutureWarning: This is an experimental API and will be changed in future.
stderr:   warnings.warn(
stderr: INFO:torch.distributed.elastic.agent.server.api:[default] Rendezvous complete for workers. Result:
stderr:   restart_count=0
stderr:   master_addr=127.0.0.1
stderr:   master_port=29500
stderr:   group_rank=0
stderr:   group_world_size=1
stderr:   local_ranks=[0, 1]
stderr:   role_ranks=[0, 1]
stderr:   global_ranks=[0, 1]
stderr:   role_world_sizes=[2, 2]
stderr:   global_world_sizes=[2, 2]
stderr: 
stderr: INFO:torch.distributed.elastic.agent.server.api:[default] Starting worker group
stderr: INFO:torch.distributed.elastic.multiprocessing:Setting worker0 reply file to: /tmp/torchelastic_t_gc0h3b/none_bfjtb6a_/attempt_0/0/error.json
stderr: INFO:torch.distributed.elastic.multiprocessing:Setting worker1 reply file to: /tmp/torchelastic_t_gc0h3b/none_bfjtb6a_/attempt_0/1/error.json
stdout: 2021-08-26T21:20:50: Opening dataset openwebtext-10k
stdout: ERROR: Cannot download 'openwebtext-10k' since running in offline mode.
stdout: ERROR: If the dataset is large, it may be more efficient to download with a single process:
stdout: ERROR:     from datasets import load_dataset
stdout: ERROR:     dset = load_dataset('openwebtext-10k')
stdout: ERROR: Alternatively, one can force this script to download by setting $HF_DATASETS_OFFLINE=0
stderr: Traceback (most recent call last):
stderr:   File "/home/ubuntu/code/Megatron-DeepSpeed/tools/preprocess_data_dist.py", line 639, in <module>
stderr:     main()
stderr:   File "/home/ubuntu/code/Megatron-DeepSpeed/tools/preprocess_data_dist.py", line 589, in main
stderr:     dset = load_dset(args)
stderr:   File "/home/ubuntu/code/Megatron-DeepSpeed/tools/preprocess_data_dist.py", line 266, in load_dset
stderr:     args.distctx.allraise_if(err)
stderr:   File "/home/ubuntu/code/Megatron-DeepSpeed/megatron/data/distdata.py", line 53, in allraise_if
stderr:     raise DistDataError
stderr: megatron.data.distdata.DistDataError
stderr: Traceback (most recent call last):
stderr:   File "/home/ubuntu/code/Megatron-DeepSpeed/tools/preprocess_data_dist.py", line 639, in <module>
stderr:     main()
stderr:   File "/home/ubuntu/code/Megatron-DeepSpeed/tools/preprocess_data_dist.py", line 589, in main
stderr:     dset = load_dset(args)
stderr:   File "/home/ubuntu/code/Megatron-DeepSpeed/tools/preprocess_data_dist.py", line 266, in load_dset
stderr:     args.distctx.allraise_if(err)
stderr:   File "/home/ubuntu/code/Megatron-DeepSpeed/megatron/data/distdata.py", line 48, in allraise_if
stderr:     raise err
stderr:   File "/home/ubuntu/code/Megatron-DeepSpeed/tools/preprocess_data_dist.py", line 253, in load_dset
stderr:     dset = load_dataset(dsetname, split=args.split, keep_in_memory=None)
stderr:   File "/home/ubuntu/.local/lib/python3.8/site-packages/datasets/load.py", line 819, in load_dataset
stderr:     builder_instance = load_dataset_builder(
stderr:   File "/home/ubuntu/.local/lib/python3.8/site-packages/datasets/load.py", line 681, in load_dataset_builder
stderr:     module_path, hash, resolved_file_path = prepare_module(
stderr:   File "/home/ubuntu/.local/lib/python3.8/site-packages/datasets/load.py", line 330, in prepare_module
stderr:     local_path = cached_path(file_path, download_config=download_config)
stderr:   File "/home/ubuntu/.local/lib/python3.8/site-packages/datasets/utils/file_utils.py", line 288, in cached_path
stderr:     output_path = get_from_cache(
stderr:   File "/home/ubuntu/.local/lib/python3.8/site-packages/datasets/utils/file_utils.py", line 604, in get_from_cache
stderr:     _raise_if_offline_mode_is_enabled(f"Tried to reach {url}")
stderr:   File "/home/ubuntu/.local/lib/python3.8/site-packages/datasets/utils/file_utils.py", line 362, in _raise_if_offline_mode_is_enabled
stderr:     raise OfflineModeIsEnabled(
stderr: datasets.utils.file_utils.OfflineModeIsEnabled: Offline mode is enabled. Tried to reach https://raw.githubusercontent.com/huggingface/datasets/1.11.0/datasets/openwebtext-10k/openwebtext-10k.py
stderr: ERROR:torch.distributed.elastic.multiprocessing.api:failed (exitcode: 1) local_rank: 0 (pid: 1932) of binary: /usr/bin/python
stderr: ERROR:torch.distributed.elastic.agent.server.local_elastic_agent:[default] Worker group failed
stderr: INFO:torch.distributed.elastic.agent.server.api:[default] Worker group FAILED. 3/3 attempts left; will restart worker group
stderr: INFO:torch.distributed.elastic.agent.server.api:[default] Stopping worker group
stderr: INFO:torch.distributed.elastic.agent.server.api:[default] Rendezvous'ing worker group
stderr: INFO:torch.distributed.elastic.agent.server.api:[default] Rendezvous complete for workers. Result:
stderr:   restart_count=1
stderr:   master_addr=127.0.0.1
stderr:   master_port=29500
stderr:   group_rank=0
stderr:   group_world_size=1
stderr:   local_ranks=[0, 1]
stderr:   role_ranks=[0, 1]
stderr:   global_ranks=[0, 1]
stderr:   role_world_sizes=[2, 2]
stderr:   global_world_sizes=[2, 2]
stderr: 
stderr: INFO:torch.distributed.elastic.agent.server.api:[default] Starting worker group
stderr: INFO:torch.distributed.elastic.multiprocessing:Setting worker0 reply file to: /tmp/torchelastic_t_gc0h3b/none_bfjtb6a_/attempt_1/0/error.json
stderr: INFO:torch.distributed.elastic.multiprocessing:Setting worker1 reply file to: /tmp/torchelastic_t_gc0h3b/none_bfjtb6a_/attempt_1/1/error.json
stderr: Traceback (most recent call last):
stderr:   File "/home/ubuntu/code/Megatron-DeepSpeed/tools/preprocess_data_dist.py", line 639, in <module>
stderr:     main()
stderr:   File "/home/ubuntu/code/Megatron-DeepSpeed/tools/preprocess_data_dist.py", line 585, in main
stderr:     args = get_args()
stderr:   File "/home/ubuntu/code/Megatron-DeepSpeed/tools/preprocess_data_dist.py", line 194, in get_args
stderr:     args.distctx = DistData(backend=args.torch_backend)
stderr:   File "/home/ubuntu/code/Megatron-DeepSpeed/megatron/data/distdata.py", line 16, in __init__
stderr:     dist.init_process_group(backend, init_method="env://")
stderr:   File "/home/ubuntu/.local/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 523, in init_process_group
stderr:     default_pg = _new_process_group_helper(
stderr:   File "/home/ubuntu/.local/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 616, in _new_process_group_helper
stderr:     pg = ProcessGroupGloo(
stderr: RuntimeError: [/pytorch/third_party/gloo/gloo/transport/tcp/pair.cc:799] connect [172.31.30.41]:32170: Connection refused
stderr: ERROR:torch.distributed.elastic.multiprocessing.api:failed (exitcode: 1) local_rank: 1 (pid: 1959) of binary: /usr/bin/python
stderr: ERROR:torch.distributed.elastic.agent.server.local_elastic_agent:[default] Worker group failed
stderr: INFO:torch.distributed.elastic.agent.server.api:[default] Worker group FAILED. 2/3 attempts left; will restart worker group
stderr: INFO:torch.distributed.elastic.agent.server.api:[default] Stopping worker group
stderr: INFO:torch.distributed.elastic.agent.server.api:[default] Rendezvous'ing worker group
stderr: INFO:torch.distributed.elastic.agent.server.api:[default] Rendezvous complete for workers. Result:
stderr:   restart_count=2
stderr:   master_addr=127.0.0.1
stderr:   master_port=29500
stderr:   group_rank=0
stderr:   group_world_size=1
stderr:   local_ranks=[0, 1]
stderr:   role_ranks=[0, 1]
stderr:   global_ranks=[0, 1]
stderr:   role_world_sizes=[2, 2]
stderr:   global_world_sizes=[2, 2]
stderr:
stderr: INFO:torch.distributed.elastic.agent.server.api:[default] Starting worker group
stderr: INFO:torch.distributed.elastic.multiprocessing:Setting worker0 reply file to: /tmp/torchelastic_t_gc0h3b/none_bfjtb6a_/attempt_2/0/error.json
stderr: INFO:torch.distributed.elastic.multiprocessing:Setting worker1 reply file to: /tmp/torchelastic_t_gc0h3b/none_bfjtb6a_/attempt_2/1/error.json
gr ^C

@adammoody (Contributor Author)

Thanks for all of your help getting this into shape, @thomasw21 ! Enjoy your holidays!

@adammoody adammoody deleted the pmerge branch August 27, 2021 00:50