-
Notifications
You must be signed in to change notification settings - Fork 167
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle edge cases in DecentralizedAverager #171
Conversation
@@ -182,7 +187,11 @@ async def run(self) -> Sequence[torch.Tensor]: | |||
async def accumulate_part_streaming(self, source: Endpoint, stream_messages: Iterable[runtime_pb2.Tensor] | |||
) -> Iterable[runtime_pb2.Tensor]: | |||
""" accumulate_part using streams of serialized tensors. Used to prevent duplicate work in serialization """ | |||
tensor_part: torch.Tensor = deserialize_torch_tensor(combine_from_streaming(stream_messages)) | |||
try: | |||
tensor_part: torch.Tensor = deserialize_torch_tensor(combine_from_streaming(stream_messages)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redundant type annotation
tests/test_util_modules.py
Outdated
tensor = torch.randn(910, 512) | ||
serialized_tensor_part = hivemind.utils.serialize_torch_tensor(tensor, allow_inplace=False) | ||
chunks1 = list(hivemind.utils.split_for_streaming(serialized_tensor_part, 16384)) | ||
assert len(chunks1) == 114 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd better explicitly compute these magic numbers
Also: L203, L211
except RuntimeError as e: | ||
raise AllreduceException(f"Could not deserialize tensor part from {source} for streaming {e}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally, we should narrow down the scope of this exception:
- Do we want to catch errors that happen only because of streaming? If so, enclose
combine_from_streaming
inside the try-except block - Do we want to catch any kind of specifically deserialization errors? If so, move this check inside
deserialize_torch_tensor
. If you still want an AllreduceException here, you can then catch a specific SerializerException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is that errors in combine_from_streaming only manifest in deserialize_torch_tensor .
tests/test_util_modules.py
Outdated
tensor = torch.randn(910, 512) | ||
serialized_tensor_part = hivemind.utils.serialize_torch_tensor(tensor, allow_inplace=False) | ||
chunks1 = list(hivemind.utils.split_for_streaming(serialized_tensor_part, 16384)) | ||
assert len(chunks1) == int(np.ceil(tensor.numel() * 4 / 16384)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert len(chunks1) == int(np.ceil(tensor.numel() * 4 / 16384)) | |
assert len(chunks1) == int(np.ceil(tensor.numel() * tensor.element_size() / 16384)) |
tests/test_util_modules.py
Outdated
combined_incomplete3 = hivemind.utils.combine_from_streaming(chunks4[:-1]) | ||
for combined in combined_incomplete, combined_incomplete2, combined_incomplete3: | ||
with pytest.raises(RuntimeError): | ||
_ = hivemind.deserialize_torch_tensor(combined) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is assignment necessary?
Co-authored-by: Max Ryabinin <[email protected]>
* copytree implementation for py37 compatibility (#162) * copytree implementation for py37 compatibility * Running tests for python3.7 * Increment version * Python3.7 notions * Remove pickle.loads in averager (#160) * Security update: remove pickle.loads in averager * add py37 to circleci config Co-authored-by: Alexander Borzunov <[email protected]> Co-authored-by: Max Ryabinin <[email protected]> * Support edge cases for DHT key/subkey/value, add tests, update .gitignore for pb2 (#167) * fix bug with subkey equals zero * add autogenerated protobuf files to .gitignore * test store and get "tricky" values in dht * Fix the remaining tests for py37 (#166) * DecentralizedAverager is now compatible with python37's acyncio exception * the problem was: grpc.aio with python37 raised concurrent.futures.CancelledError in some cases; * we relied on isinstance(asyncio.CancelledError, Exception) == False * but isinstance(concurrent.futures.CancelledError, Exception) == True * DecentralizedAverager now shuts down if dereferenced in the main process * though it won't shutdown if dereferenced in forks for obvious reasons * HIVEMIND_THREADS now actually works * test_averaging now shuts down dht and averager instances to avoid leaking processes Co-authored-by: Max Ryabinin <[email protected]> Co-authored-by: Max Ryabinin <[email protected]> * Move Averager metadata serialization out of user scope (#168) * move metadata serialization outside user scope * test_overcrowded: reduce the default number of peers * Handle edge cases in DecentralizedAverager (#171) * move metadata serialization outside user scope * retry averager.step on network errors * raise AllreduceException on partial tensor * test split/combine tensors, combine corrupted stream Co-authored-by: Max Ryabinin <[email protected]> * Fix a typo in quickstart.md (#174) * Serialize DHTID source with msgpack (#172) * Change DHTID serializer * Remove unused serializers * Add msgpack tuple serialization * Move CLI server launch script to hivemind/hivemind_cli (#173) * Cast environment variables to correct types * Compiling libp2p daemon on setup (#153) * add setup.py prototype * refactor * feat: add p2p daemon (#164) * Add p2p daemon * Test p2p daemon exits correctly * Impose restriction on elapsed time Co-authored-by: Ilya Kobelev <[email protected]> * compare golang versions using packaging.version * fix typo Co-authored-by: justheuristic <[email protected]> * move p2pd executable to hivemind/hivemind_cli Co-authored-by: Alexey Bukhtiyarov <[email protected]> Co-authored-by: justheuristic <[email protected]> Co-authored-by: Alexander Borzunov <[email protected]> Co-authored-by: Max Ryabinin <[email protected]> Co-authored-by: Michael Diskin <[email protected]> Co-authored-by: romakail <[email protected]> Co-authored-by: Ilya <[email protected]> Co-authored-by: Ilya Kobelev <[email protected]>
* copytree implementation for py37 compatibility (#162) * copytree implementation for py37 compatibility * Running tests for python3.7 * Increment version * Python3.7 notions * Remove pickle.loads in averager (#160) * Security update: remove pickle.loads in averager * add py37 to circleci config Co-authored-by: Alexander Borzunov <[email protected]> Co-authored-by: Max Ryabinin <[email protected]> * Support edge cases for DHT key/subkey/value, add tests, update .gitignore for pb2 (#167) * fix bug with subkey equals zero * add autogenerated protobuf files to .gitignore * test store and get "tricky" values in dht * Fix the remaining tests for py37 (#166) * DecentralizedAverager is now compatible with python37's acyncio exception * the problem was: grpc.aio with python37 raised concurrent.futures.CancelledError in some cases; * we relied on isinstance(asyncio.CancelledError, Exception) == False * but isinstance(concurrent.futures.CancelledError, Exception) == True * DecentralizedAverager now shuts down if dereferenced in the main process * though it won't shutdown if dereferenced in forks for obvious reasons * HIVEMIND_THREADS now actually works * test_averaging now shuts down dht and averager instances to avoid leaking processes Co-authored-by: Max Ryabinin <[email protected]> Co-authored-by: Max Ryabinin <[email protected]> * Move Averager metadata serialization out of user scope (#168) * move metadata serialization outside user scope * test_overcrowded: reduce the default number of peers * Handle edge cases in DecentralizedAverager (#171) * move metadata serialization outside user scope * retry averager.step on network errors * raise AllreduceException on partial tensor * test split/combine tensors, combine corrupted stream Co-authored-by: Max Ryabinin <[email protected]> * Fix a typo in quickstart.md (#174) * Serialize DHTID source with msgpack (#172) * Change DHTID serializer * Remove unused serializers * Add msgpack tuple serialization * Move CLI server launch script to hivemind/hivemind_cli (#173) * Cast environment variables to correct types * Compiling libp2p daemon on setup (#153) * add setup.py prototype * refactor * feat: add p2p daemon (#164) * Add p2p daemon * Test p2p daemon exits correctly * Impose restriction on elapsed time Co-authored-by: Ilya Kobelev <[email protected]> * compare golang versions using packaging.version * fix typo Co-authored-by: justheuristic <[email protected]> * move p2pd executable to hivemind/hivemind_cli Co-authored-by: Alexey Bukhtiyarov <[email protected]> Co-authored-by: justheuristic <[email protected]> Co-authored-by: Alexander Borzunov <[email protected]> Co-authored-by: Max Ryabinin <[email protected]> Co-authored-by: Michael Diskin <[email protected]> Co-authored-by: romakail <[email protected]> Co-authored-by: Ilya <[email protected]> Co-authored-by: Ilya Kobelev <[email protected]>
* copytree implementation for py37 compatibility (#162) * copytree implementation for py37 compatibility * Running tests for python3.7 * Increment version * Python3.7 notions * Remove pickle.loads in averager (#160) * Security update: remove pickle.loads in averager * add py37 to circleci config Co-authored-by: Alexander Borzunov <[email protected]> Co-authored-by: Max Ryabinin <[email protected]> * Support edge cases for DHT key/subkey/value, add tests, update .gitignore for pb2 (#167) * fix bug with subkey equals zero * add autogenerated protobuf files to .gitignore * test store and get "tricky" values in dht * Fix the remaining tests for py37 (#166) * DecentralizedAverager is now compatible with python37's acyncio exception * the problem was: grpc.aio with python37 raised concurrent.futures.CancelledError in some cases; * we relied on isinstance(asyncio.CancelledError, Exception) == False * but isinstance(concurrent.futures.CancelledError, Exception) == True * DecentralizedAverager now shuts down if dereferenced in the main process * though it won't shutdown if dereferenced in forks for obvious reasons * HIVEMIND_THREADS now actually works * test_averaging now shuts down dht and averager instances to avoid leaking processes Co-authored-by: Max Ryabinin <[email protected]> Co-authored-by: Max Ryabinin <[email protected]> * Move Averager metadata serialization out of user scope (#168) * move metadata serialization outside user scope * test_overcrowded: reduce the default number of peers * Handle edge cases in DecentralizedAverager (#171) * move metadata serialization outside user scope * retry averager.step on network errors * raise AllreduceException on partial tensor * test split/combine tensors, combine corrupted stream Co-authored-by: Max Ryabinin <[email protected]> * Fix a typo in quickstart.md (#174) * Serialize DHTID source with msgpack (#172) * Change DHTID serializer * Remove unused serializers * Add msgpack tuple serialization * Move CLI server launch script to hivemind/hivemind_cli (#173) * Cast environment variables to correct types * Compiling libp2p daemon on setup (#153) * add setup.py prototype * refactor * feat: add p2p daemon (#164) * Add p2p daemon * Test p2p daemon exits correctly * Impose restriction on elapsed time Co-authored-by: Ilya Kobelev <[email protected]> * compare golang versions using packaging.version * fix typo Co-authored-by: justheuristic <[email protected]> * move p2pd executable to hivemind/hivemind_cli Co-authored-by: Alexey Bukhtiyarov <[email protected]> Co-authored-by: justheuristic <[email protected]> Co-authored-by: Alexander Borzunov <[email protected]> Co-authored-by: Max Ryabinin <[email protected]> Co-authored-by: Michael Diskin <[email protected]> Co-authored-by: romakail <[email protected]> Co-authored-by: Ilya <[email protected]> Co-authored-by: Ilya Kobelev <[email protected]>
* copytree implementation for py37 compatibility (#162) * copytree implementation for py37 compatibility * Running tests for python3.7 * Increment version * Python3.7 notions * Remove pickle.loads in averager (#160) * Security update: remove pickle.loads in averager * add py37 to circleci config Co-authored-by: Alexander Borzunov <[email protected]> Co-authored-by: Max Ryabinin <[email protected]> * Support edge cases for DHT key/subkey/value, add tests, update .gitignore for pb2 (#167) * fix bug with subkey equals zero * add autogenerated protobuf files to .gitignore * test store and get "tricky" values in dht * Fix the remaining tests for py37 (#166) * DecentralizedAverager is now compatible with python37's acyncio exception * the problem was: grpc.aio with python37 raised concurrent.futures.CancelledError in some cases; * we relied on isinstance(asyncio.CancelledError, Exception) == False * but isinstance(concurrent.futures.CancelledError, Exception) == True * DecentralizedAverager now shuts down if dereferenced in the main process * though it won't shutdown if dereferenced in forks for obvious reasons * HIVEMIND_THREADS now actually works * test_averaging now shuts down dht and averager instances to avoid leaking processes Co-authored-by: Max Ryabinin <[email protected]> Co-authored-by: Max Ryabinin <[email protected]> * Move Averager metadata serialization out of user scope (#168) * move metadata serialization outside user scope * test_overcrowded: reduce the default number of peers * Handle edge cases in DecentralizedAverager (#171) * move metadata serialization outside user scope * retry averager.step on network errors * raise AllreduceException on partial tensor * test split/combine tensors, combine corrupted stream Co-authored-by: Max Ryabinin <[email protected]> * Fix a typo in quickstart.md (#174) * Serialize DHTID source with msgpack (#172) * Change DHTID serializer * Remove unused serializers * Add msgpack tuple serialization * Move CLI server launch script to hivemind/hivemind_cli (#173) * Cast environment variables to correct types * Compiling libp2p daemon on setup (#153) * add setup.py prototype * refactor * feat: add p2p daemon (#164) * Add p2p daemon * Test p2p daemon exits correctly * Impose restriction on elapsed time Co-authored-by: Ilya Kobelev <[email protected]> * compare golang versions using packaging.version * fix typo Co-authored-by: justheuristic <[email protected]> * move p2pd executable to hivemind/hivemind_cli Co-authored-by: Alexey Bukhtiyarov <[email protected]> Co-authored-by: justheuristic <[email protected]> Co-authored-by: Alexander Borzunov <[email protected]> Co-authored-by: Max Ryabinin <[email protected]> Co-authored-by: Michael Diskin <[email protected]> Co-authored-by: romakail <[email protected]> Co-authored-by: Ilya <[email protected]> Co-authored-by: Ilya Kobelev <[email protected]>
Method: we ran ALBERT training with 5 peers and shut down a random process during averager.step
Expected behavior: surviving averagers should always retry if error was caused by another peer
Found and fixed two cases that did not trigger retries in averager.step():
Also:
Note: the screenshot above contains improperly formatted logs (see {e}), this is now fixed.