-
Notifications
You must be signed in to change notification settings - Fork 167
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Convert averager to libp2p backend #323
Conversation
8590bd9
to
8109d93
Compare
8109d93
to
a8fcb0a
Compare
83606c1
to
83c5d30
Compare
56cb777
to
2ae476f
Compare
cb09676
to
955c058
Compare
6c41027
to
f615693
Compare
We have found that #317 is the reason of periodic test freezes in this branch and 3/30 test runs freeze for the current MPFuture implementation: report. 0/30 test runs freeze for the reverted MPFuture implementation (to the version with torch shared memory): report Now, we are thinking about ways to fix that. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this monumental contribution! Before we merge, though, I'd also like to see two things:
- Passing tests (maybe Resolve deadlock in MPFuture #337 and Reduce complexity of several DHT tests #334 will be useful in that regard)
- Performance benchmarks comparing this one with the master branch
@@ -354,7 +354,7 @@ def report_training_progress(self): | |||
with self.lock_local_progress: | |||
current_time = get_dht_time() | |||
local_state_info = TrainingState( | |||
endpoint=self.averager.endpoint, | |||
peer_id=self.averager.endpoint.to_base58(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm slightly against casting to base58 in multiple places all over the code; would appreciate if it was possible to come up with a way to reduce this casting :)
Or maybe you can just call __str__
everywhere, since to_base58
is an implementation detail
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use str(endpoint)
, however we would still need to call PeerID.from_base58(value)
to deserialize. Therefore, I'd suggest to keep these operations symmetric.
Also, it is actually more natural to use bytes
for representing PeerID
s in protobufs (and change endpoint.to_base58()
/PeerID.from_base58(value)
to endpoint.to_bytes()
/PeerID(value)
). However, DHT already uses str
for PeerIDs
in protobufs, so I'd like to make the code consistent in this PR (but I don't mind changing everything to bytes
in a separate PR).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, let's change it in a follow-up
hivemind/p2p/servicer.py
Outdated
if len(spec.args) < 3: | ||
raise ValueError( | ||
f"{method_name} is expected to at least three positional arguments " | ||
f"(self: TServicer, request: TInputProtobuf, context: hivemind.p2p.P2PContext)" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thankfully, TServicer
and TInputProtobuf
are no more :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed mentioning of TServicer
from this comment. However, I'd still suggest to use the T
prefix for TypeVar
s to distinguish them from the usual types, so I am keeping TInputProtobuf
for now :)
This reverts commit b1a43a5.
@@ -59,6 +59,16 @@ async def aenumerate(aiterable: AsyncIterable[T]) -> AsyncIterable[Tuple[int, T] | |||
index += 1 | |||
|
|||
|
|||
async def asingle(aiter: AsyncIterable[T]) -> T: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name is inspired by Single() from LINQ (.NET functional programming functions).
…centralizedAverager instances
Benchmark ResultsSetupnum_peers = 16
target_group_size = 16
request_timeout = 1
hid_size = 8192
num_layers = 1
averaging_expiration = 300 Branch
|
This PR follows #323 and does the remaining mass refactors: 1. Rename `Endpoint` to `PeerID` in averager (+ related variable names) 2. Rename the `P2P.id` field to `P2P.peer_id` (because the local peer ID is stored in the `.peer_id` fields in all other classes) 3. Serialize `PeerID`s as `bytes` instead of Base58 string 4. Remove `JoinRequest.peer_id` and `AveragingData.peer_id` fields (they duplicate `context.remote_id`) 5. Remove the `DecentralizedAveraging` gRPC interface (not used anymore)
Current status: Finished.
What I have tested:
TODO:
experiment_prefix
to handler name for averager RPCs to enable using several different averagers simultaneouslyFollow-up PR: rename PeerID -> Endpoint, use(moved to [REFACTOR] updates to DHT internals #276 )bytes
for PeerIDs in protobufs (instead ofstring
)