Convert DHT to libp2p backend (#296)
This PR changes the DHT to operate over the p2p daemon (instead of gRPC), using libp2p PeerIDs and multiaddrs (instead of raw IP:port endpoints).

Co-authored-by: Ilya Kobelev <[email protected]>
borzunov and skobellev authored Jul 10, 2021
1 parent 21fda75 commit 0be1512
Showing 35 changed files with 1,056 additions and 796 deletions.
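
To illustrate the new interface at a glance: a peer no longer binds to an explicit `IP:port`, it exposes its libp2p multiaddrs via `get_visible_maddrs()`, and other peers join by passing those multiaddrs as `initial_peers`. The sketch below is assembled from the benchmark changes in this commit and is not itself part of the diff:

```python
import hivemind

# First peer: no listen_on endpoint; the libp2p backend picks its own multiaddrs
dht_root = hivemind.DHT(start=True)
initial_peers = dht_root.get_visible_maddrs()

# Any subsequent peer joins by dialing the multiaddrs of an existing peer
dht = hivemind.DHT(initial_peers=initial_peers, start=True)
```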
8 changes: 4 additions & 4 deletions benchmarks/benchmark_averaging.py
@@ -34,7 +34,9 @@ def sample_tensors(hid_size, num_layers):
def benchmark_averaging(num_peers: int, target_group_size: int, num_rounds: int,
averaging_expiration: float, request_timeout: float, round_timeout: float,
hid_size: int, num_layers: int, spawn_dtime: float):
dht_root = hivemind.DHT(listen_on=f'{LOCALHOST}:*', start=True)
dht_root = hivemind.DHT(start=True)
initial_peers = dht_root.get_visible_maddrs()

num_groups = 2 ** int(round(math.log2(num_peers / target_group_size)))
nbits = int(round(math.log2(num_groups)))
peer_tensors = [sample_tensors(hid_size, num_layers)
@@ -45,9 +45,7 @@ def benchmark_averaging(num_peers: int, target_group_size: int, num_rounds: int,

def run_averager(index):
nonlocal successful_steps, total_steps, lock_stats
dht = hivemind.DHT(listen_on=f'{LOCALHOST}:*',
initial_peers=[f"{LOCALHOST}:{dht_root.port}"],
start=True)
dht = hivemind.DHT(initial_peers=initial_peers, start=True)
initial_bits = bin(index % num_groups)[2:].rjust(nbits, '0')
averager = hivemind.averaging.DecentralizedAverager(
peer_tensors[i], dht, prefix='my_tensor', initial_group_bits=initial_bits, listen_on=f"{LOCALHOST}:*",
6 changes: 3 additions & 3 deletions benchmarks/benchmark_dht.py
@@ -23,9 +23,9 @@ def benchmark_dht(num_peers: int, initial_peers: int, num_experts: int, expert_b
logger.info("Creating peers...")
peers = []
for _ in trange(num_peers):
neighbors = [f'0.0.0.0:{node.port}' for node in random.sample(peers, min(initial_peers, len(peers)))]
peer = hivemind.DHT(initial_peers=neighbors, start=True, wait_timeout=wait_timeout,
listen_on=f'0.0.0.0:*')
neighbors = sum([peer.get_visible_maddrs()
for peer in random.sample(peers, min(initial_peers, len(peers)))], [])
peer = hivemind.DHT(initial_peers=neighbors, start=True, wait_timeout=wait_timeout)
peers.append(peer)

store_peer, get_peer = peers[-2:]
58 changes: 40 additions & 18 deletions examples/albert/README.md
@@ -12,14 +12,12 @@ This tutorial will walk you through the steps to set up collaborative training w
## Running an experiment
- Run the first DHT peer to welcome trainers and record training statistics (e.g. loss, performance):
- In this example, we use [wandb.ai](https://wandb.ai/site) to plot training metrics; If you're unfamiliar with Weights & Biases, here's a [quickstart tutorial](https://docs.wandb.ai/quickstart).
- Run `python run_training_monitor.py --dht_listen_on '[::]:*' --experiment_prefix NAME_YOUR_EXPERIMENT --wandb_project WANDB_PROJECT_HERE`
- Run `python run_training_monitor.py --experiment_prefix NAME_YOUR_EXPERIMENT --wandb_project WANDB_PROJECT_HERE`
- `NAME_YOUR_EXPERIMENT` must be a unique name for this training run, e.g. `my-first-albert`. It cannot contain `.` due to naming conventions.
- `WANDB_PROJECT_HERE` is the name of the wandb project used to track training metrics. Multiple experiments can have the same project name.
- This peer will run a DHT node on a certain IP/port (`Running DHT root at ...`). You will need this address for next steps
```
+ python run_training_monitor.py --dht_listen_on '[::]:*' --experiment_prefix my-albert-v1 --wandb_project Demo-run
[2021/06/17 16:26:35.931][WARN][root.<module>:140] No address specified. Attempting to infer address from DNS.
[2021/06/17 16:26:36.083][INFO][root.<module>:149] Running DHT root at 193.106.95.184:38319
$ python run_training_monitor.py --experiment_prefix my-albert-v1 --wandb_project Demo-run
[2021/06/17 16:26:36.083][INFO][root.log_visible_maddrs:42] Running a DHT peer. To connect other peers to this one, use --initial_peers /ip4/8.8.8.8/tcp/1337/p2p/XXXX /ip4/8.8.8.8/udp/31337/quic/p2p/XXXX
wandb: Currently logged in as: XXX (use `wandb login --relogin` to force relogin)
wandb: Tracking run with wandb version 0.10.32
wandb: Syncing run dry-mountain-2
@@ -30,21 +28,37 @@ wandb: Run `wandb offline` to turn off syncing.
[2021/04/19 02:26:41.064][INFO][optim.collaborative.fetch_collaboration_state:323] Found no active peers: None
[2021/04/19 02:26:44.068][INFO][optim.collaborative.fetch_collaboration_state:323] Found no active peers: None
...
[2021/04/19 02:37:37.246][INFO][root.<module>:74] 11.05164
[2021/04/19 02:39:37.441][INFO][root.<module>:74] 11.03771
[2021/04/19 02:40:37.541][INFO][root.<module>:74] 11.02886
[2021/04/19 02:37:37.246][INFO][__main__.<module>:194] Step #1 loss = 11.05164
[2021/04/19 02:39:37.441][INFO][__main__.<module>:194] Step #2 loss = 11.03771
[2021/04/19 02:40:37.541][INFO][__main__.<module>:194] Step #3 loss = 11.02886
```

- To join a collaboration with a GPU trainer,
- install the same dependencies (minus the `wandb` and `whatsmyip`), download the data and unpack it to the experiment folder,
- if necessary, specify paths: `--dataset_path ./path/to/unpacked/data --tokenizer ./path/to/tokenizer/config` (see [default paths](https://github.com/learning-at-home/hivemind/blob/collaborative_albert_example/examples/albert/run_trainer.py#L63-L69) for reference)
- run:
```shell
python run_trainer.py \
--experiment_prefix SAME_AS_IN_RUN_TRAINING_MONITOR --initial_peers ONE_OR_MORE_PEERS --seed 42 \
--logging_first_step --logging_steps 100 --output_dir ./outputs --overwrite_output_dir --logging_dir ./logs
```
Here, `ONE_OR_MORE_PEERS` stands for either your coordinator endpoint (e.g. `123.123.123.123:1337`), an endpoint of any pre-existing trainer or multiple endpoints for stability. See tips & tricks section below for more information on setting up collaborative training.
```bash
python run_trainer.py \
--experiment_prefix SAME_AS_IN_RUN_TRAINING_MONITOR --initial_peers ONE_OR_MORE_PEERS --seed 42 \
--logging_first_step --logging_steps 100 --output_dir ./outputs --overwrite_output_dir --logging_dir ./logs
```

Here, `ONE_OR_MORE_PEERS` stands for the multiaddresses of one or more existing peers (training monitors or existing trainers)
collected from the first lines of their terminal output. For the example above, the multiaddresses would be:
```
--initial_peers /ip4/8.8.8.8/tcp/1337/p2p/XXXX /ip4/8.8.8.8/udp/31337/quic/p2p/XXXX
```

__Note:__ a [multiaddress](https://docs.libp2p.io/concepts/addressing/) is a format for encoding multiple layers of addressing information
that supports a number of different protocols. In hivemind, we typically operate with multiaddresses
that contain a [libp2p](https://libp2p.io/) peer ID (e.g. `/p2p/XXXX`) together with the information about how to reach it
(e.g. the IPv4 address and TCP port `/ip4/8.8.8.8/tcp/31337` or
the information about a relay used for [NAT traversal](https://docs.libp2p.io/concepts/nat/)).

You may need to change the IP address to a publicly visible one if some of the initial peers are located behind NAT.
If you have any trouble doing this, consider the ["Using IPFS"](#using-ipfs) section.

See the ["Tips and tricks"](#tips-and-tricks) section for more information on setting up collaborative training.

As the peer begins training, it will periodically report training logs in the following form:
```
@@ -61,7 +75,10 @@ As the peer begins training, it will periodically report training logs in the fo
__Sanity check:__ a healthy peer will periodically report `Averaged tensors successfully with [N > 1]` peers.

For convenience, you can view (and share!) the learning curves of your collaborative experiments in wandb:
![image](https://user-images.githubusercontent.com/3491902/115177859-bed5e100-a0d8-11eb-82bc-55d1b12d335d.png)

<p align="center">
<img src="https://user-images.githubusercontent.com/3491902/115177859-bed5e100-a0d8-11eb-82bc-55d1b12d335d.png">
</p>


## Tips and tricks
@@ -70,7 +87,7 @@ Finally, we provide best practices for running collaborative experiments of diff

### Hosting the data
For small experiments (3-16 peers, <1GB data), you can use a free-tier file hosting service that has a convenient way to [download with curl/wget](https://superuser.com/questions/470664/how-to-download-dropbox-files-using-wget-command). However, these services are not meant for high load and could ban you for generating too much traffic. If you want to scale up, you could either use an S3-like storage from [any](https://aws.amazon.com/s3/) [cloud](https://cloud.google.com/storage) [provider](https://cloud.google.com/storage) or host the data [yourself](https://gist.github.com/willurd/5720255). Large data files (>5GB) will take a long time to download; we recommend splitting them into chunks and implementing a custom dataloader that can load chunks on the fly. Finally, the most _comme il faut_ solution to sharing large datasets is to use [academic torrents](https://academictorrents.com/).
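
As an illustration of the chunk-on-the-fly idea above (not part of the original README; the file layout and names below are assumptions), a minimal PyTorch sketch could look like this:

```python
from pathlib import Path

from torch.utils.data import IterableDataset


class ChunkedTextDataset(IterableDataset):
    """Streams examples from pre-split text chunks instead of one huge file."""

    def __init__(self, chunk_dir: str):
        # Assumes the archive was split into files named chunk_000.txt, chunk_001.txt, ...
        self.chunk_paths = sorted(Path(chunk_dir).glob("chunk_*.txt"))

    def __iter__(self):
        for path in self.chunk_paths:
            # Each chunk is opened lazily, so only one chunk is held in memory at a time
            with open(path, encoding="utf-8") as chunk:
                for line in chunk:
                    yield line.rstrip("\n")
```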

### run_training_monitor.py
This peer exists solely to welcome other peers onto the DHT and track learning progress. It requires neither a GPU nor high bandwidth; the only prerequisite is that the coordinator should have high uptime. If no high-uptime server is available, you can also run multiple coordinators on different servers and list all of them as `--initial_peers`; the system will stay up as long as at least one coordinator is available. For short- to mid-term experiments, you can host the coordinator on a [free-tier VM](https://www.quora.com/Are-there-any-free-online-virtual-machines).
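
For instance, a trainer could list several coordinators at once (the multiaddrs below are placeholders for the ones printed by your own monitors; the remaining flags mirror the trainer example above):

```bash
python run_trainer.py \
    --experiment_prefix my-albert-v1 \
    --initial_peers /ip4/203.0.113.1/tcp/31337/p2p/XXXX /ip4/203.0.113.2/tcp/31337/p2p/YYYY \
    --seed 42 --logging_first_step --logging_steps 100 \
    --output_dir ./outputs --overwrite_output_dir --logging_dir ./logs
```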

@@ -84,7 +101,7 @@ There are awesome services like [Google Colab](https://colab.research.google.com
- you can create starter notebooks to make it more convenient for collaborators to join your training run ([example](https://colab.research.google.com/gist/yhn112/e858cb841c73879d8ef98a84e03b43e7/collaborative-training-v0-10.ipynb)). Ideally, joining the collaboration should take at most a couple of clicks.
Here's an example of a full trainer script for Google Colab:
```
```bash
!pip install transformers datasets sentencepiece torch_optimizer==0.1.0
!git clone https://github.com/learning-at-home/hivemind && cd hivemind && pip install -e .
!curl -L YOUR_HOSTED_DATA | tar xzf - # example: https://hivemind-data.s3.us-east-2.amazonaws.com/wikitext103.tar.gz
@@ -94,3 +111,8 @@ Here's an example of a full trainer script for Google Colab:
--logging_first_step --logging_steps 100 --output_dir ./outputs --overwrite_output_dir --logging_dir ./logs \
--experiment_prefix EXPERIMENT_NAME_HERE --seed 42
```

### Using IPFS
If the initial peers for your experiment are located behind NAT and/or you have any trouble with figuring out their public IP addresses and ports, you can set up hivemind to use the [IPFS](https://ipfs.io) network to find the route to your peers automatically. To do this, you should specify the `--use_ipfs` option on all peers (both training monitors and trainers) you are starting.

After that, it is enough to provide only a [libp2p](https://libp2p.io/) peer ID (e.g. `/p2p/XXXX`) for each initial peer. No other information (like IP addresses or TCP/UDP ports) is required.
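
For example (the `/p2p/XXXX` peer ID is a placeholder; the flags follow the descriptions above), a monitor and a trainer using IPFS-assisted discovery could be started as:

```bash
# Start the monitor with IPFS-based peer discovery
python run_training_monitor.py --experiment_prefix my-albert-v1 --wandb_project Demo-run --use_ipfs

# A trainer then only needs the /p2p/... part of the monitor's address
python run_trainer.py \
    --experiment_prefix my-albert-v1 --initial_peers /p2p/XXXX --use_ipfs --seed 42 \
    --logging_first_step --logging_steps 100 --output_dir ./outputs --overwrite_output_dir --logging_dir ./logs
```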
29 changes: 20 additions & 9 deletions examples/albert/arguments.py
@@ -1,5 +1,5 @@
from typing import Optional, List
from dataclasses import dataclass, field
from typing import Optional, List

from transformers import TrainingArguments

@@ -11,11 +11,26 @@ class BaseTrainingArguments:
)
initial_peers: List[str] = field(
default_factory=list,
metadata={"help": "One or more peers (comma-separated) that will welcome you into the collaboration"}
metadata={"help":
"Multiaddrs of the peers that will welcome you into the existing collaboration. "
"Example: /ip4/203.0.113.1/tcp/31337/p2p/XXXX /ip4/203.0.113.2/udp/7777/quic/p2p/YYYY"}
)
dht_listen_on: str = field(
default="[::]:*",
metadata={"help": "Network interface used for incoming DHT communication. Default: all ipv6"}
use_ipfs: bool = field(
default=False,
metadata={"help":
"Use IPFS to find initial_peers. If enabled, you only need to provide /p2p/XXXX part of the multiaddrs "
"for the initial_peers (no need to specify a particular IPv4/IPv6 host and port)"}
)
host_maddrs: List[str] = field(
default_factory=lambda: ['/ip4/0.0.0.0/tcp/0', '/ip4/0.0.0.0/udp/0/quic'],
metadata={"help":
"Multiaddrs to listen for external connections from other p2p instances. "
"Defaults to all IPv4 interfaces with TCP and QUIC (over UDP) protocols: "
"/ip4/0.0.0.0/tcp/0 /ip4/0.0.0.0/udp/0/quic"}
)
announce_maddrs: List[str] = field(
default_factory=list,
metadata={"help": "Visible multiaddrs the host announces for external connections from other p2p instances"}
)


@@ -97,10 +112,6 @@ class CollaborationArguments(AveragerArguments, CollaborativeOptimizerArguments,
default=600,
metadata={"help": "Statistics will be removed if not updated in this many seconds"}
)
endpoint: Optional[str] = field(
default=None,
metadata={"help": "This node's IP for inbound connections, used when running from behind a proxy"}
)


@dataclass
19 changes: 11 additions & 8 deletions examples/albert/run_trainer.py
@@ -18,8 +18,8 @@
from torch_optimizer import Lamb

import hivemind
import utils
from arguments import CollaborationArguments, DatasetArguments, AlbertTrainingArguments
import metrics_utils


logger = logging.getLogger(__name__)
@@ -130,7 +130,7 @@ def on_step_end(self, args: TrainingArguments, state: transformers.TrainerState,
self.last_reported_collaboration_step = self.collaborative_optimizer.local_step
self.total_samples_processed += self.samples
samples_per_second = self.collaborative_optimizer.performance_ema.samples_per_second
statistics = metrics_utils.LocalMetrics(
statistics = utils.LocalMetrics(
step=self.collaborative_optimizer.local_step,
samples_per_second=samples_per_second,
samples_accumulated=self.samples,
@@ -219,13 +219,16 @@ def main():

opt, scheduler = get_optimizer_and_scheduler(training_args, model)

validators, local_public_key = metrics_utils.make_validators(
validators, local_public_key = utils.make_validators(
collaboration_args_dict['experiment_prefix'])
dht = hivemind.DHT(
start=True, initial_peers=collaboration_args_dict.pop('initial_peers'),
listen=not collaboration_args_dict['client_mode'],
listen_on=collaboration_args_dict.pop('dht_listen_on'),
endpoint=collaboration_args_dict.pop('endpoint'), record_validators=validators)
dht = hivemind.DHT(start=True,
initial_peers=collaboration_args_dict.pop('initial_peers'),
listen=not collaboration_args_dict['client_mode'],
record_validators=validators,
use_ipfs=collaboration_args_dict['use_ipfs'],
host_maddrs=collaboration_args_dict.pop('host_maddrs'),
announce_maddrs=collaboration_args_dict.pop('announce_maddrs'))
utils.log_visible_maddrs(dht.get_visible_maddrs(), only_p2p=collaboration_args_dict.pop('use_ipfs'))

total_batch_size_per_step = training_args.per_device_train_batch_size * training_args.gradient_accumulation_steps
if torch.cuda.device_count() != 0:
