webrtc: add WebRTC transport #1655
@marten-seemann @mxinden I've implemented the multibase multihash noise handshake here. For verification, we use the remote certificate of the underlying DTLS transport.
@marten-seemann @mxinden this is ready for review
Lots of good stuff here. Thank you!
I added a large number of comments, please don't be shocked by that ;)
One thing I don't really understand yet is how the opening / accepting of data channels works, and what it means for them to be detached. Maybe you could explain that to me here, that would make the next round of reviews easier.
2022-08-05 triage conversation: with the latest spec changes, #1663 is a prereq for this.
@BigLep That PR is approved and awaiting merging.
2cf950f to 71f607f
@marten-seemann or @MarcoPolo do you have capacity to give this another review? If not, I will take a deeper look myself, though I think your review is way more valuable than mine.
Partial review, but there’s already a lot to do here.
The biggest problem is that the datachannel doesn’t implement any backpressure: You’re reading every message that’s sent, and appending it to a buffer. If the application is reading less quickly than the sender is sending us messages, all those messages will accumulate in the buffer, eventually leading to an OOM panic.
Instead, you need to keep the messages at the WebRTC / SCTP layer, until we’re ready to process them (i.e. until Read is called). Only then can we dequeue the message, so that backpressure can build up.
p2p/transport/webrtc/util.go
Outdated
if err != nil {
    return "", err
}
return multibase.Encode(multibase.Base58BTC, encoded)
Any reason for using Base 58 here?
It is URL-friendly.
I think the spec defines an encoding that we should use, doesn’t it?
@ckousik what is to be done about this?
Resolved. We now use return multibase.Encode(multibase.Base64url, encoded)
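For readers unfamiliar with multibase, here is a rough standard-library sketch of what a base64url multibase string looks like. It assumes the multibase convention that the prefix `u` denotes unpadded base64url; `encodeBase64URL` is a hypothetical helper, not the go-multibase implementation.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// encodeBase64URL prefixes unpadded base64url text with 'u', the multibase
// code for that encoding. It only imitates the shape of the output of
// multibase.Encode(multibase.Base64url, data) with the standard library.
func encodeBase64URL(data []byte) string {
	return "u" + base64.RawURLEncoding.EncodeToString(data)
}

func main() {
	fmt.Println(encodeBase64URL([]byte{0xfb, 0xff}))
}
```

The base64url alphabet uses `-` and `_` instead of `+` and `/`, which is what makes the result safe to embed in URLs and multiaddrs.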
p2p/transport/webrtc/datachannel.go
Outdated
    atomic.StoreUint32(&d.remoteReadClosed, 1)
case pb.Message_RESET:
    log.Errorf("remote reset")
    d.Close()
I don’t think that’s correct. Just because the remote reset the stream, we can still write on it.
@ckousik Why did you resolve this comment?? You haven’t addressed it at all!
Either my point is invalid, then please comment here. Or my point is valid, then it needs to be fixed.
@ckousik what is to be done about this?
The code has changed completely since then, so a future review will need to determine whether this is fixed. For now, I'm afraid this can be ignored.
As such, this comment can be resolved.
p2p/transport/webrtc/deadline.go
Outdated
    "time"
)

type deadline struct {
This seems overly complicated, and wasteful. We should only start a timer when there’s actually a Read / Write call running (and stop the timer as soon as that call returns), not just because the deadline was set.
It also seems like we’re not using the deadline correctly (see comments in datachannel.go).
This allows updating the deadline while the Read/Write call is running.
… as does the other solution, which probably won’t add that many LOC and long-running timers.
@ckousik what is to be done about this?
The code has changed completely since then, so if something similar is still an issue, I'm afraid it will have to be rechecked in a new review.
As such, this comment can be resolved
bedf270 to 1e0375f
36f6ae4 to 83b6e3d
if addr.IP.To4() == nil {
    ipVersion = "IP6"
}
fp := fingerprintToSDP(fingerprint)
fp is an empty string if fingerprint is nil. Is that ever a valid state?
If not, we probably want to return an error here.
Otherwise you can get subtle, hard-to-debug issues later.
case multihash.SHA2_512:
    return crypto.SHA512, true
default:
    return crypto.Hash(0), false
Suggested change:
- return crypto.Hash(0), false
+ return 0, false
The type conversion happens implicitly here.
remaining := len(d.readBuf)
d.m.Unlock()

if state := d.getState(); remaining == 0 && (state == stateReadClosed || state == stateClosed) {
Suggested change:
- if state := d.getState(); remaining == 0 && (state == stateReadClosed || state == stateClosed) {
+ if remaining == 0 && !d.getState().allowRead() {
Thanks, I seem to have missed this.
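A small sketch of what such a helper might look like. Only `stateReadClosed`, `stateClosed`, and the method name `allowRead` appear in this thread; the type name `channelState` and the other two states are assumptions made for illustration.

```go
package main

import "fmt"

// channelState is a hypothetical name for the data channel state type.
type channelState uint8

const (
	stateOpen        channelState = iota // assumed
	stateReadClosed                      // from the snippet under review
	stateWriteClosed                     // assumed
	stateClosed                          // from the snippet under review
)

// allowRead reports whether reads may still succeed in state s.
func (s channelState) allowRead() bool {
	return s == stateOpen || s == stateWriteClosed
}

func main() {
	for _, s := range []channelState{stateOpen, stateReadClosed, stateWriteClosed, stateClosed} {
		fmt.Println(s, s.allowRead())
	}
}
```

Putting the predicate on the state type keeps the call site short and avoids repeating the list of states in every condition.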
    return string(buf[:n])
}

func replaceAll(s string, b byte) string {
It would be better to name this removeAll or similar, as it doesn't replace the characters but rather removes them from the string.
    return nil, fmt.Errorf("could not get local peer ID: %w", err)
}
// We use elliptic P-256 since it is widely supported by browsers.
// See: https://github.com/libp2p/specs/pull/412#discussion_r968294244
The link above doesn't tell an outsider much. Perhaps link to an actual spec paragraph?
listenerMultiaddr = listenerMultiaddr.Encapsulate(certMultiaddress)

return newListener(
TODO: socket is not closed in case newListener returns an error
// The only requirement here is that the ufrag and password
// must be equal, which will allow the server to determine
// the password using the STUN message.
ufrag := "libp2p+webrtc+v1/" + genUfrag(32)
TODO: given that we always use this prefix, we might as well pre-allocate it as part of the 32-byte string and place it at the front.
settingEngine := webrtc.SettingEngine{}
// suppress pion logs
loggerFactory := pionlogger.NewDefaultLoggerFactory()
loggerFactory.DefaultLogLevel = pionlogger.LogLevelDisabled
TODO: could it be useful to have these logs available in verbose mode (i.e. opt-in)?
remoteMultihash, err := decodeRemoteFingerprint(remoteMultiaddr)
if err != nil {
    return pc, nil, fmt.Errorf("could not decode fingerprint: %w", err)
NOTE: more of a remark, but it's not very useful to say stuff like "could not" in an error,
as an error is always some kind of failure. So you might as well shorten it to: "instantiate peerconnection: %w"
// set the local address from the candidate pair
cp, err := rawHandshakeChannel.Transport().Transport().ICETransport().GetSelectedCandidatePair()
if cp == nil || err != nil {
    return pc, nil, fmt.Errorf("ice connection did not have selected candidate pair")
You might want to split up these cases, as the error currently gets suppressed, meaning it will never be logged either.
if err != nil {
    return nil, err
}
remoteFp = replaceAll(strings.ToLower(remoteFp), byte(':'))
TODO[Question]: is the : in the string really needed in the first place? It seems like a human-readable thing that just makes machine processing less efficient / straightforward.
I get that this is perhaps a spec thing, but still, I find it odd...
It is needed in the fingerprint spec. From the spec https://www.rfc-editor.org/rfc/rfc4572#section-5
A fingerprint is represented in SDP as an attribute (an 'a' line).
It consists of the name of the hash function used, followed by the
hash value itself. The hash value is represented as a sequence of
uppercase hexadecimal bytes, separated by colons. The number of
bytes is defined by the hash function. (This is the syntax used by
openssl and by the browsers' certificate managers. It is different
from the syntax used to represent hash values in, e.g., HTTP digest
authentication [18], which uses unseparated lowercase hexadecimal
bytes. It was felt that consistency with other applications of
fingerprints was more important.)
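To make the format quoted above concrete, here is a standard-library sketch that turns the colon-separated hex digest back into raw bytes. `parseSDPFingerprint` is a hypothetical helper, not the PR's code; it uses strings.ReplaceAll where the PR uses its own replaceAll helper.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// parseSDPFingerprint decodes the colon-separated hex digest of an SDP
// fingerprint attribute value into raw bytes. hex.DecodeString accepts
// both uppercase and lowercase digits, so no case folding is needed.
func parseSDPFingerprint(digest string) ([]byte, error) {
	return hex.DecodeString(strings.ReplaceAll(digest, ":", ""))
}

func main() {
	raw, err := parseSDPFingerprint("DE:AD:BE:EF")
	fmt.Println(raw, err)
}
```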
    close(start)
    wg.Wait()
    require.Equal(t, count, atomic.LoadUint32(&success))
}
+func TestWebsocketTransport(t *testing.T) {
+ ta, _ := getTransport(t)
+ tb, _ := getTransport(t)
+ ttransport.SubtestTransport(t, ta, tb, fmt.Sprintf("/ip4/%s/udp/0/webrtc", listenerIp), "peerA")
+}
The test above, with import ttransport "github.com/libp2p/go-libp2p/p2p/transport/testsuite", is missing.
It seems like a standard transport test. Granted, not all transports seem to implement it, but it might be good to support it so as to adhere to the generic transport logic. Or what is the intention here?
socket               net.PacketConn
unknownUfragCallback func(string, net.Addr)

m sync.Mutex
TODO: the UDP mux is complex enough that you might want to isolate this thread-unsafe code into a separate data structure, so you can use it as a black-box, thread-safe data structure. This ensures the mutexes will be used safely.
// UDP addresses of the same IP address family (eg. Server-reflexive addresses
// and peer-reflexive addresses).
func (mux *udpMux) GetConn(ufrag string, addr net.Addr) (net.PacketConn, error) {
    a, ok := addr.(*net.UDPAddr)
Is it ever valid for this addr not to be a UDP address? For now, that error is silently ignored.
_ = conn.closeConnection()
delete(mux.ufragMap, key)
for _, addr := range conn.addresses {
    // log.Errorf("deleting address : %v %v", ufrag, addr)
Can the commented-out line be deleted?
}

func newMuxedConnection(mux *udpMux, ufrag string) *muxedConnection {
    ctx, cancel := context.WithCancel(context.Background())
t is owned by a parent?!?!?!?
No, we just have a reference to the parent mux because we write to the socket. That can be removed. Also, not sure which t you are referring to.
// SetDeadline implements net.PacketConn
func (*muxedConnection) SetDeadline(t time.Time) error {
    return nil
TODO: document why doing nothing is OK
}

// SetReadDeadline implements net.PacketConn
func (*muxedConnection) SetReadDeadline(t time.Time) error {
TODO: document why doing nothing is OK
// SetWriteDeadline implements net.PacketConn
func (*muxedConnection) SetWriteDeadline(t time.Time) error {
    return nil
TODO: document why doing nothing is OK
func (conn *muxedConnection) closeConnection() error {
    select {
    case <-conn.ctx.Done():
        return fmt.Errorf("already closed")
TODO: turn into a private static error (var alreadyClosedErr = errors.New("already closed"))
such that you can replace the line below with "return alreadyClosedErr"
}

var (
    errTooManyPackets = fmt.Errorf("too many packets in queue; dropping")
TODO: use errors.New instead of fmt.Errorf
// pop reads a packet from the packetQueue or blocks until
// either a packet becomes available or the buffer is closed.
func (pq *packetQueue) pop(ctx context.Context, buf []byte) (int, net.Addr, error) {
    select {
TODO: cannot put my finger on it yet, but this code smells,
it seems like you are mixing logic boundaries here,
making it harder to reason about the code
The reason this does not return as soon as the internal context's Done() fires is that there could still be packets in the channel that can be read even after the channel is closed. Ideally it should be a priority select that reads from the channel first and only then checks whether the context is done.
// push adds a packet to the packetQueue
func (pq *packetQueue) push(buf []byte, addr net.Addr) error {
    // we acquire a lock when sending on the channel to prevent
TODO: is this really needed?!
Yes, I've seen it panic without this. We cannot guarantee that sending on the channel and closing it occur in the same goroutine.
    return ""
}
fpDigest := intersperse2(hex.EncodeToString(fp.Digest), ':', 2)
return getSupportedSDPString(fp.Code) + " " + fpDigest
TODO: the line below can be a subtle bug, as getSupportedSDPString can return an empty string due to an error; because we append to it, the result is no longer an empty string and will probably lead to an obscure error much later.
Wouldn't it be better to just return an error to begin with?
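A sketch of the error-returning variant this comment asks for. The `fingerprint` struct and `sdpHashName` are hypothetical stand-ins for the decoded multihash and for getSupportedSDPString; 0x12 and 0x13 are the multihash codes for SHA2-256 and SHA2-512. Unlike the snippet under review, this version emits uppercase hex, following the RFC text quoted earlier in the thread.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// fingerprint is a hypothetical stand-in for a decoded multihash.
type fingerprint struct {
	Code   uint64
	Digest []byte
}

// sdpHashName maps a multihash code to its SDP hash name.
func sdpHashName(code uint64) (string, bool) {
	switch code {
	case 0x12:
		return "sha-256", true
	case 0x13:
		return "sha-512", true
	default:
		return "", false
	}
}

// fingerprintToSDP fails early instead of returning a partial string.
func fingerprintToSDP(fp *fingerprint) (string, error) {
	if fp == nil {
		return "", errors.New("nil fingerprint")
	}
	name, ok := sdpHashName(fp.Code)
	if !ok {
		return "", fmt.Errorf("unsupported hash code 0x%x", fp.Code)
	}
	parts := make([]string, len(fp.Digest))
	for i, b := range fp.Digest {
		parts[i] = fmt.Sprintf("%02X", b)
	}
	return name + " " + strings.Join(parts, ":"), nil
}

func main() {
	fmt.Println(fingerprintToSDP(&fingerprint{Code: 0x12, Digest: []byte{0xab, 0x01}}))
	fmt.Println(fingerprintToSDP(nil))
}
```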
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"

func genUfrag(n int) string {
TODO: not sure you need to make it as generic as you do here.
might as well hardcode 32 here and reuse the objects in a pool?
You would need to benchmark to be sure though
Sure, I can hardcode it to 32, but given that I return a string to make this usage convenient, I would somehow have to track the lifetime of the string to ensure it is returned to the pool.
This PR can be closed in favour of #1999.
Closing in favor of #1999
This PR implements the webrtc transport spec according to libp2p/specs#412.
The webrtc protocol for multiaddr is implemented in this PR and needs to be implemented in go-multiaddr prior to merging this PR. This PR also uses multibase encoded multihash for DTLS fingerprint verification after the NOISE handshake.