diff --git a/limits.go b/limits.go index 41b3da8dcb..5871577e51 100644 --- a/limits.go +++ b/limits.go @@ -4,7 +4,6 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/host/autonat" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" - relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay" circuit "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" @@ -76,18 +75,6 @@ func SetDefaultServiceLimits(config *rcmgr.ScalingLimitConfig) { rcmgr.BaseLimitIncrease{}, ) - // relay/v1 - config.AddServiceLimit( - relayv1.ServiceName, - rcmgr.BaseLimit{StreamsInbound: 256, StreamsOutbound: 256, Streams: 256, Memory: 16 << 20}, - rcmgr.BaseLimitIncrease{StreamsInbound: 256, StreamsOutbound: 256, Streams: 256, Memory: 16 << 20}, - ) - config.AddServicePeerLimit( - relayv1.ServiceName, - rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 64, Memory: 1 << 20}, - rcmgr.BaseLimitIncrease{}, - ) - // relay/v2 config.AddServiceLimit( relayv2.ServiceName, @@ -101,7 +88,7 @@ func SetDefaultServiceLimits(config *rcmgr.ScalingLimitConfig) { ) // circuit protocols, both client and service - for _, proto := range [...]protocol.ID{circuit.ProtoIDv1, circuit.ProtoIDv2Hop, circuit.ProtoIDv2Stop} { + for _, proto := range [...]protocol.ID{circuit.ProtoIDv2Hop, circuit.ProtoIDv2Stop} { config.AddProtocolLimit( proto, rcmgr.BaseLimit{StreamsInbound: 640, StreamsOutbound: 640, Streams: 640, Memory: 16 << 20}, diff --git a/p2p/host/autorelay/autorelay_test.go b/p2p/host/autorelay/autorelay_test.go index 53d7d67e9a..2a03fd5dc1 100644 --- a/p2p/host/autorelay/autorelay_test.go +++ b/p2p/host/autorelay/autorelay_test.go @@ -14,7 +14,6 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/host/autorelay" - relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay" circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" "github.com/benbjohnson/clock" @@ -102,29 +101,6 @@ func newRelay(t *testing.T) host.Host { return h } -func newRelayV1(t *testing.T) host.Host { - t.Helper() - h, err := libp2p.New( - libp2p.DisableRelay(), - libp2p.ForceReachabilityPublic(), - libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { - for i, addr := range addrs { - saddr := addr.String() - if strings.HasPrefix(saddr, "/ip4/127.0.0.1/") { - addrNoIP := strings.TrimPrefix(saddr, "/ip4/127.0.0.1") - addrs[i] = ma.StringCast("/dns4/localhost" + addrNoIP) - } - } - return addrs - }), - ) - require.NoError(t, err) - r, err := relayv1.NewRelay(h) - require.NoError(t, err) - t.Cleanup(func() { r.Close() }) - return h -} - func TestSingleCandidate(t *testing.T) { var counter int h := newPrivateNode(t, @@ -180,32 +156,6 @@ func TestSingleRelay(t *testing.T) { // test that we don't add any more relays require.Never(t, func() bool { return numRelays(h) > 1 }, 200*time.Millisecond, 50*time.Millisecond) } -func TestPreferRelayV2(t *testing.T) { - r := newRelay(t) - defer r.Close() - // The relay supports both v1 and v2. The v1 stream handler should never be called, - // if we prefer v2 relays. - r.SetStreamHandler(relayv1.ProtoID, func(str network.Stream) { - str.Reset() - t.Fatal("used relay v1") - }) - - h := newPrivateNode(t, - func(context.Context, int) <-chan peer.AddrInfo { - peerChan := make(chan peer.AddrInfo, 1) - defer close(peerChan) - peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()} - return peerChan - }, - autorelay.WithMaxCandidates(1), - autorelay.WithNumRelays(99999), - autorelay.WithBootDelay(0), - autorelay.WithMinInterval(time.Hour), - ) - defer h.Close() - - require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond) -} func TestWaitForCandidates(t *testing.T) { peerChan := make(chan peer.AddrInfo) @@ -305,46 +255,6 @@ func TestStaticRelays(t *testing.T) { require.Eventually(t, func() bool { return numRelays(h) > 0 }, 2*time.Second, 50*time.Millisecond) } -func TestRelayV1(t *testing.T) { - t.Run("relay v1 support disabled", func(t *testing.T) { - peerChan := make(chan peer.AddrInfo, 1) - r := newRelayV1(t) - t.Cleanup(func() { r.Close() }) - peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()} - close(peerChan) - - h := newPrivateNode(t, - func(context.Context, int) <-chan peer.AddrInfo { return peerChan }, - autorelay.WithBootDelay(0), - autorelay.WithMinInterval(time.Hour), - ) - defer h.Close() - - require.Never(t, func() bool { return numRelays(h) > 0 }, 250*time.Millisecond, 100*time.Millisecond) - }) - - t.Run("relay v1 support enabled", func(t *testing.T) { - peerChan := make(chan peer.AddrInfo, 1) - r := newRelayV1(t) - t.Cleanup(func() { r.Close() }) - peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()} - close(peerChan) - - h := newPrivateNode(t, - func(context.Context, int) <-chan peer.AddrInfo { return peerChan }, - autorelay.WithBootDelay(0), - autorelay.WithCircuitV1Support(), - autorelay.WithMinInterval(time.Hour), - ) - defer h.Close() - - addrUpdated, err := h.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated)) - require.NoError(t, err) - - expectDeltaInAddrUpdated(t, addrUpdated, 1) - }) -} - func TestConnectOnDisconnect(t *testing.T) { const num = 3 peerChan := make(chan peer.AddrInfo, num) diff --git a/p2p/host/autorelay/options.go b/p2p/host/autorelay/options.go index f8d1414c52..1990e99577 100644 --- a/p2p/host/autorelay/options.go +++ b/p2p/host/autorelay/options.go @@ -42,7 +42,6 @@ type config struct { // see WithMaxCandidateAge maxCandidateAge time.Duration setMinCandidates bool - enableCircuitV1 bool } var defaultConfig = config{ @@ -151,14 +150,6 @@ func WithBackoff(d time.Duration) Option { } } -// WithCircuitV1Support enables support for circuit v1 relays. -func WithCircuitV1Support() Option { - return func(c *config) error { - c.enableCircuitV1 = true - return nil - } -} - // WithMaxCandidateAge sets the maximum age of a candidate. // When we are connected to the desired number of relays, we don't ask the peer source for new candidates. // This can lead to AutoRelay's candidate list becoming outdated, and means we won't be able diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index 098b19c2f3..5c6e903404 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -15,7 +15,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" basic "github.com/libp2p/go-libp2p/p2p/host/basic" "github.com/libp2p/go-libp2p/p2p/host/eventbus" - relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay" circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" @@ -23,13 +22,10 @@ import ( manet "github.com/multiformats/go-multiaddr/net" ) -const ( - protoIDv1 = relayv1.ProtoID - protoIDv2 = circuitv2_proto.ProtoIDv2Hop -) +const protoIDv2 = circuitv2_proto.ProtoIDv2Hop // Terminology: -// Candidate: Once we connect to a node and it supports (v1 / v2) relay protocol, +// Candidate: Once we connect to a node and it supports relay protocol, // we call it a candidate, and consider using it as a relay. // Relay: Out of the list of candidates, we select a relay to connect to. // Currently, we just randomly select a candidate, but we can employ more sophisticated @@ -77,7 +73,7 @@ type relayFinder struct { relayUpdated chan struct{} relayMx sync.Mutex - relays map[peer.ID]*circuitv2.Reservation // rsvp will be nil if it is a v1 relay + relays map[peer.ID]*circuitv2.Reservation cachedAddrs []ma.Multiaddr cachedAddrsExpiry time.Time @@ -288,7 +284,7 @@ func (rf *relayFinder) notifyNewCandidate() { } } -// handleNewNode tests if a peer supports circuit v1 or v2. +// handleNewNode tests if a peer supports circuit v2. // This method is only run on private nodes. // If a peer does, it is added to the candidates map. // Note that just supporting the protocol doesn't guarantee that we can also obtain a reservation. @@ -322,7 +318,7 @@ func (rf *relayFinder) handleNewNode(ctx context.Context, pi peer.AddrInfo) (add return true } -// tryNode checks if a peer actually supports either circuit v1 or circuit v2. +// tryNode checks if a peer actually supports either circuit v2. // It does not modify any internal state. func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsRelayV2 bool, err error) { if err := rf.host.Connect(ctx, pi); err != nil { @@ -357,42 +353,14 @@ func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsR return false, ctx.Err() } - protos, err := rf.host.Peerstore().SupportsProtocols(pi.ID, protoIDv1, protoIDv2) + protos, err := rf.host.Peerstore().SupportsProtocols(pi.ID, protoIDv2) if err != nil { return false, fmt.Errorf("error checking relay protocol support for peer %s: %w", pi.ID, err) } - - // If the node speaks both, prefer circuit v2 - var maybeSupportsV1, supportsV2 bool - for _, proto := range protos { - switch proto { - case protoIDv1: - maybeSupportsV1 = true - case protoIDv2: - supportsV2 = true - } - } - - if supportsV2 { - return true, nil - } - - if !rf.conf.enableCircuitV1 && !supportsV2 { + if len(protos) == 0 { return false, errors.New("doesn't speak circuit v2") } - if !maybeSupportsV1 && !supportsV2 { - return false, errors.New("doesn't speak circuit v1 or v2") - } - - // The node *may* support circuit v1. - supportsV1, err := relayv1.CanHop(ctx, rf.host, pi.ID) - if err != nil { - return false, fmt.Errorf("CanHop failed: %w", err) - } - if !supportsV1 { - return false, errors.New("doesn't speak circuit v1 or v2") - } - return false, nil + return true, nil } // When a new node that could be a relay is found, we receive a notification on the maybeConnectToRelayTrigger chan. @@ -520,9 +488,6 @@ func (rf *relayFinder) refreshReservations(ctx context.Context, now time.Time) b // find reservations about to expire and refresh them in parallel g := new(errgroup.Group) for p, rsvp := range rf.relays { - if rsvp == nil { // this is a circuit v1 relay, there is no reservation - continue - } if now.Add(rsvpExpirationSlack).Before(rsvp.Expiration) { continue } diff --git a/p2p/protocol/circuitv1/pb/circuitv1.pb.go b/p2p/protocol/circuitv1/pb/circuitv1.pb.go deleted file mode 100644 index 1fe5eaa91b..0000000000 --- a/p2p/protocol/circuitv1/pb/circuitv1.pb.go +++ /dev/null @@ -1,448 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.28.1 -// protoc v3.21.12 -// source: pb/circuitv1.proto - -package pb - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type CircuitRelay_Status int32 - -const ( - CircuitRelay_SUCCESS CircuitRelay_Status = 100 - CircuitRelay_HOP_SRC_ADDR_TOO_LONG CircuitRelay_Status = 220 - CircuitRelay_HOP_DST_ADDR_TOO_LONG CircuitRelay_Status = 221 - CircuitRelay_HOP_SRC_MULTIADDR_INVALID CircuitRelay_Status = 250 - CircuitRelay_HOP_DST_MULTIADDR_INVALID CircuitRelay_Status = 251 - CircuitRelay_HOP_NO_CONN_TO_DST CircuitRelay_Status = 260 - CircuitRelay_HOP_CANT_DIAL_DST CircuitRelay_Status = 261 - CircuitRelay_HOP_CANT_OPEN_DST_STREAM CircuitRelay_Status = 262 - CircuitRelay_HOP_CANT_SPEAK_RELAY CircuitRelay_Status = 270 - CircuitRelay_HOP_CANT_RELAY_TO_SELF CircuitRelay_Status = 280 - CircuitRelay_STOP_SRC_ADDR_TOO_LONG CircuitRelay_Status = 320 - CircuitRelay_STOP_DST_ADDR_TOO_LONG CircuitRelay_Status = 321 - CircuitRelay_STOP_SRC_MULTIADDR_INVALID CircuitRelay_Status = 350 - CircuitRelay_STOP_DST_MULTIADDR_INVALID CircuitRelay_Status = 351 - CircuitRelay_STOP_RELAY_REFUSED CircuitRelay_Status = 390 - CircuitRelay_MALFORMED_MESSAGE CircuitRelay_Status = 400 -) - -// Enum value maps for CircuitRelay_Status. -var ( - CircuitRelay_Status_name = map[int32]string{ - 100: "SUCCESS", - 220: "HOP_SRC_ADDR_TOO_LONG", - 221: "HOP_DST_ADDR_TOO_LONG", - 250: "HOP_SRC_MULTIADDR_INVALID", - 251: "HOP_DST_MULTIADDR_INVALID", - 260: "HOP_NO_CONN_TO_DST", - 261: "HOP_CANT_DIAL_DST", - 262: "HOP_CANT_OPEN_DST_STREAM", - 270: "HOP_CANT_SPEAK_RELAY", - 280: "HOP_CANT_RELAY_TO_SELF", - 320: "STOP_SRC_ADDR_TOO_LONG", - 321: "STOP_DST_ADDR_TOO_LONG", - 350: "STOP_SRC_MULTIADDR_INVALID", - 351: "STOP_DST_MULTIADDR_INVALID", - 390: "STOP_RELAY_REFUSED", - 400: "MALFORMED_MESSAGE", - } - CircuitRelay_Status_value = map[string]int32{ - "SUCCESS": 100, - "HOP_SRC_ADDR_TOO_LONG": 220, - "HOP_DST_ADDR_TOO_LONG": 221, - "HOP_SRC_MULTIADDR_INVALID": 250, - "HOP_DST_MULTIADDR_INVALID": 251, - "HOP_NO_CONN_TO_DST": 260, - "HOP_CANT_DIAL_DST": 261, - "HOP_CANT_OPEN_DST_STREAM": 262, - "HOP_CANT_SPEAK_RELAY": 270, - "HOP_CANT_RELAY_TO_SELF": 280, - "STOP_SRC_ADDR_TOO_LONG": 320, - "STOP_DST_ADDR_TOO_LONG": 321, - "STOP_SRC_MULTIADDR_INVALID": 350, - "STOP_DST_MULTIADDR_INVALID": 351, - "STOP_RELAY_REFUSED": 390, - "MALFORMED_MESSAGE": 400, - } -) - -func (x CircuitRelay_Status) Enum() *CircuitRelay_Status { - p := new(CircuitRelay_Status) - *p = x - return p -} - -func (x CircuitRelay_Status) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (CircuitRelay_Status) Descriptor() protoreflect.EnumDescriptor { - return file_pb_circuitv1_proto_enumTypes[0].Descriptor() -} - -func (CircuitRelay_Status) Type() protoreflect.EnumType { - return &file_pb_circuitv1_proto_enumTypes[0] -} - -func (x CircuitRelay_Status) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Do not use. -func (x *CircuitRelay_Status) UnmarshalJSON(b []byte) error { - num, err := protoimpl.X.UnmarshalJSONEnum(x.Descriptor(), b) - if err != nil { - return err - } - *x = CircuitRelay_Status(num) - return nil -} - -// Deprecated: Use CircuitRelay_Status.Descriptor instead. -func (CircuitRelay_Status) EnumDescriptor() ([]byte, []int) { - return file_pb_circuitv1_proto_rawDescGZIP(), []int{0, 0} -} - -type CircuitRelay_Type int32 - -const ( - CircuitRelay_HOP CircuitRelay_Type = 1 - CircuitRelay_STOP CircuitRelay_Type = 2 - CircuitRelay_STATUS CircuitRelay_Type = 3 - CircuitRelay_CAN_HOP CircuitRelay_Type = 4 -) - -// Enum value maps for CircuitRelay_Type. -var ( - CircuitRelay_Type_name = map[int32]string{ - 1: "HOP", - 2: "STOP", - 3: "STATUS", - 4: "CAN_HOP", - } - CircuitRelay_Type_value = map[string]int32{ - "HOP": 1, - "STOP": 2, - "STATUS": 3, - "CAN_HOP": 4, - } -) - -func (x CircuitRelay_Type) Enum() *CircuitRelay_Type { - p := new(CircuitRelay_Type) - *p = x - return p -} - -func (x CircuitRelay_Type) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (CircuitRelay_Type) Descriptor() protoreflect.EnumDescriptor { - return file_pb_circuitv1_proto_enumTypes[1].Descriptor() -} - -func (CircuitRelay_Type) Type() protoreflect.EnumType { - return &file_pb_circuitv1_proto_enumTypes[1] -} - -func (x CircuitRelay_Type) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Do not use. -func (x *CircuitRelay_Type) UnmarshalJSON(b []byte) error { - num, err := protoimpl.X.UnmarshalJSONEnum(x.Descriptor(), b) - if err != nil { - return err - } - *x = CircuitRelay_Type(num) - return nil -} - -// Deprecated: Use CircuitRelay_Type.Descriptor instead. -func (CircuitRelay_Type) EnumDescriptor() ([]byte, []int) { - return file_pb_circuitv1_proto_rawDescGZIP(), []int{0, 1} -} - -type CircuitRelay struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Type *CircuitRelay_Type `protobuf:"varint,1,opt,name=type,enum=circuitv1.pb.CircuitRelay_Type" json:"type,omitempty"` // Type of the message - SrcPeer *CircuitRelay_Peer `protobuf:"bytes,2,opt,name=srcPeer" json:"srcPeer,omitempty"` // srcPeer and dstPeer are used when Type is HOP or STOP - DstPeer *CircuitRelay_Peer `protobuf:"bytes,3,opt,name=dstPeer" json:"dstPeer,omitempty"` - Code *CircuitRelay_Status `protobuf:"varint,4,opt,name=code,enum=circuitv1.pb.CircuitRelay_Status" json:"code,omitempty"` // Status code, used when Type is STATUS -} - -func (x *CircuitRelay) Reset() { - *x = CircuitRelay{} - if protoimpl.UnsafeEnabled { - mi := &file_pb_circuitv1_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *CircuitRelay) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*CircuitRelay) ProtoMessage() {} - -func (x *CircuitRelay) ProtoReflect() protoreflect.Message { - mi := &file_pb_circuitv1_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use CircuitRelay.ProtoReflect.Descriptor instead. -func (*CircuitRelay) Descriptor() ([]byte, []int) { - return file_pb_circuitv1_proto_rawDescGZIP(), []int{0} -} - -func (x *CircuitRelay) GetType() CircuitRelay_Type { - if x != nil && x.Type != nil { - return *x.Type - } - return CircuitRelay_HOP -} - -func (x *CircuitRelay) GetSrcPeer() *CircuitRelay_Peer { - if x != nil { - return x.SrcPeer - } - return nil -} - -func (x *CircuitRelay) GetDstPeer() *CircuitRelay_Peer { - if x != nil { - return x.DstPeer - } - return nil -} - -func (x *CircuitRelay) GetCode() CircuitRelay_Status { - if x != nil && x.Code != nil { - return *x.Code - } - return CircuitRelay_SUCCESS -} - -type CircuitRelay_Peer struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Id []byte `protobuf:"bytes,1,req,name=id" json:"id,omitempty"` // peer id - Addrs [][]byte `protobuf:"bytes,2,rep,name=addrs" json:"addrs,omitempty"` // peer's known addresses -} - -func (x *CircuitRelay_Peer) Reset() { - *x = CircuitRelay_Peer{} - if protoimpl.UnsafeEnabled { - mi := &file_pb_circuitv1_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *CircuitRelay_Peer) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*CircuitRelay_Peer) ProtoMessage() {} - -func (x *CircuitRelay_Peer) ProtoReflect() protoreflect.Message { - mi := &file_pb_circuitv1_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use CircuitRelay_Peer.ProtoReflect.Descriptor instead. -func (*CircuitRelay_Peer) Descriptor() ([]byte, []int) { - return file_pb_circuitv1_proto_rawDescGZIP(), []int{0, 0} -} - -func (x *CircuitRelay_Peer) GetId() []byte { - if x != nil { - return x.Id - } - return nil -} - -func (x *CircuitRelay_Peer) GetAddrs() [][]byte { - if x != nil { - return x.Addrs - } - return nil -} - -var File_pb_circuitv1_proto protoreflect.FileDescriptor - -var file_pb_circuitv1_proto_rawDesc = []byte{ - 0x0a, 0x12, 0x70, 0x62, 0x2f, 0x63, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x76, 0x31, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, 0x63, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x76, 0x31, 0x2e, - 0x70, 0x62, 0x22, 0x97, 0x06, 0x0a, 0x0c, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x52, 0x65, - 0x6c, 0x61, 0x79, 0x12, 0x33, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0e, 0x32, 0x1f, 0x2e, 0x63, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x76, 0x31, 0x2e, 0x70, 0x62, - 0x2e, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x52, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x54, 0x79, - 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x39, 0x0a, 0x07, 0x73, 0x72, 0x63, 0x50, - 0x65, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x63, 0x69, 0x72, 0x63, - 0x75, 0x69, 0x74, 0x76, 0x31, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, - 0x52, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x07, 0x73, 0x72, 0x63, 0x50, - 0x65, 0x65, 0x72, 0x12, 0x39, 0x0a, 0x07, 0x64, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x63, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x76, 0x31, - 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x52, 0x65, 0x6c, 0x61, 0x79, - 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x07, 0x64, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x12, 0x35, - 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x21, 0x2e, 0x63, - 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x76, 0x31, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x69, 0x72, 0x63, - 0x75, 0x69, 0x74, 0x52, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, - 0x04, 0x63, 0x6f, 0x64, 0x65, 0x1a, 0x2c, 0x0a, 0x04, 0x50, 0x65, 0x65, 0x72, 0x12, 0x0e, 0x0a, - 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x02, 0x28, 0x0c, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, - 0x05, 0x61, 0x64, 0x64, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x05, 0x61, 0x64, - 0x64, 0x72, 0x73, 0x22, 0xc2, 0x03, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, - 0x0a, 0x07, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x64, 0x12, 0x1a, 0x0a, 0x15, 0x48, - 0x4f, 0x50, 0x5f, 0x53, 0x52, 0x43, 0x5f, 0x41, 0x44, 0x44, 0x52, 0x5f, 0x54, 0x4f, 0x4f, 0x5f, - 0x4c, 0x4f, 0x4e, 0x47, 0x10, 0xdc, 0x01, 0x12, 0x1a, 0x0a, 0x15, 0x48, 0x4f, 0x50, 0x5f, 0x44, - 0x53, 0x54, 0x5f, 0x41, 0x44, 0x44, 0x52, 0x5f, 0x54, 0x4f, 0x4f, 0x5f, 0x4c, 0x4f, 0x4e, 0x47, - 0x10, 0xdd, 0x01, 0x12, 0x1e, 0x0a, 0x19, 0x48, 0x4f, 0x50, 0x5f, 0x53, 0x52, 0x43, 0x5f, 0x4d, - 0x55, 0x4c, 0x54, 0x49, 0x41, 0x44, 0x44, 0x52, 0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, - 0x10, 0xfa, 0x01, 0x12, 0x1e, 0x0a, 0x19, 0x48, 0x4f, 0x50, 0x5f, 0x44, 0x53, 0x54, 0x5f, 0x4d, - 0x55, 0x4c, 0x54, 0x49, 0x41, 0x44, 0x44, 0x52, 0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, - 0x10, 0xfb, 0x01, 0x12, 0x17, 0x0a, 0x12, 0x48, 0x4f, 0x50, 0x5f, 0x4e, 0x4f, 0x5f, 0x43, 0x4f, - 0x4e, 0x4e, 0x5f, 0x54, 0x4f, 0x5f, 0x44, 0x53, 0x54, 0x10, 0x84, 0x02, 0x12, 0x16, 0x0a, 0x11, - 0x48, 0x4f, 0x50, 0x5f, 0x43, 0x41, 0x4e, 0x54, 0x5f, 0x44, 0x49, 0x41, 0x4c, 0x5f, 0x44, 0x53, - 0x54, 0x10, 0x85, 0x02, 0x12, 0x1d, 0x0a, 0x18, 0x48, 0x4f, 0x50, 0x5f, 0x43, 0x41, 0x4e, 0x54, - 0x5f, 0x4f, 0x50, 0x45, 0x4e, 0x5f, 0x44, 0x53, 0x54, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, - 0x10, 0x86, 0x02, 0x12, 0x19, 0x0a, 0x14, 0x48, 0x4f, 0x50, 0x5f, 0x43, 0x41, 0x4e, 0x54, 0x5f, - 0x53, 0x50, 0x45, 0x41, 0x4b, 0x5f, 0x52, 0x45, 0x4c, 0x41, 0x59, 0x10, 0x8e, 0x02, 0x12, 0x1b, - 0x0a, 0x16, 0x48, 0x4f, 0x50, 0x5f, 0x43, 0x41, 0x4e, 0x54, 0x5f, 0x52, 0x45, 0x4c, 0x41, 0x59, - 0x5f, 0x54, 0x4f, 0x5f, 0x53, 0x45, 0x4c, 0x46, 0x10, 0x98, 0x02, 0x12, 0x1b, 0x0a, 0x16, 0x53, - 0x54, 0x4f, 0x50, 0x5f, 0x53, 0x52, 0x43, 0x5f, 0x41, 0x44, 0x44, 0x52, 0x5f, 0x54, 0x4f, 0x4f, - 0x5f, 0x4c, 0x4f, 0x4e, 0x47, 0x10, 0xc0, 0x02, 0x12, 0x1b, 0x0a, 0x16, 0x53, 0x54, 0x4f, 0x50, - 0x5f, 0x44, 0x53, 0x54, 0x5f, 0x41, 0x44, 0x44, 0x52, 0x5f, 0x54, 0x4f, 0x4f, 0x5f, 0x4c, 0x4f, - 0x4e, 0x47, 0x10, 0xc1, 0x02, 0x12, 0x1f, 0x0a, 0x1a, 0x53, 0x54, 0x4f, 0x50, 0x5f, 0x53, 0x52, - 0x43, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x41, 0x44, 0x44, 0x52, 0x5f, 0x49, 0x4e, 0x56, 0x41, - 0x4c, 0x49, 0x44, 0x10, 0xde, 0x02, 0x12, 0x1f, 0x0a, 0x1a, 0x53, 0x54, 0x4f, 0x50, 0x5f, 0x44, - 0x53, 0x54, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x41, 0x44, 0x44, 0x52, 0x5f, 0x49, 0x4e, 0x56, - 0x41, 0x4c, 0x49, 0x44, 0x10, 0xdf, 0x02, 0x12, 0x17, 0x0a, 0x12, 0x53, 0x54, 0x4f, 0x50, 0x5f, - 0x52, 0x45, 0x4c, 0x41, 0x59, 0x5f, 0x52, 0x45, 0x46, 0x55, 0x53, 0x45, 0x44, 0x10, 0x86, 0x03, - 0x12, 0x16, 0x0a, 0x11, 0x4d, 0x41, 0x4c, 0x46, 0x4f, 0x52, 0x4d, 0x45, 0x44, 0x5f, 0x4d, 0x45, - 0x53, 0x53, 0x41, 0x47, 0x45, 0x10, 0x90, 0x03, 0x22, 0x32, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, - 0x12, 0x07, 0x0a, 0x03, 0x48, 0x4f, 0x50, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x53, 0x54, 0x4f, - 0x50, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x10, 0x03, 0x12, - 0x0b, 0x0a, 0x07, 0x43, 0x41, 0x4e, 0x5f, 0x48, 0x4f, 0x50, 0x10, 0x04, -} - -var ( - file_pb_circuitv1_proto_rawDescOnce sync.Once - file_pb_circuitv1_proto_rawDescData = file_pb_circuitv1_proto_rawDesc -) - -func file_pb_circuitv1_proto_rawDescGZIP() []byte { - file_pb_circuitv1_proto_rawDescOnce.Do(func() { - file_pb_circuitv1_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_circuitv1_proto_rawDescData) - }) - return file_pb_circuitv1_proto_rawDescData -} - -var file_pb_circuitv1_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_pb_circuitv1_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_pb_circuitv1_proto_goTypes = []interface{}{ - (CircuitRelay_Status)(0), // 0: circuitv1.pb.CircuitRelay.Status - (CircuitRelay_Type)(0), // 1: circuitv1.pb.CircuitRelay.Type - (*CircuitRelay)(nil), // 2: circuitv1.pb.CircuitRelay - (*CircuitRelay_Peer)(nil), // 3: circuitv1.pb.CircuitRelay.Peer -} -var file_pb_circuitv1_proto_depIdxs = []int32{ - 1, // 0: circuitv1.pb.CircuitRelay.type:type_name -> circuitv1.pb.CircuitRelay.Type - 3, // 1: circuitv1.pb.CircuitRelay.srcPeer:type_name -> circuitv1.pb.CircuitRelay.Peer - 3, // 2: circuitv1.pb.CircuitRelay.dstPeer:type_name -> circuitv1.pb.CircuitRelay.Peer - 0, // 3: circuitv1.pb.CircuitRelay.code:type_name -> circuitv1.pb.CircuitRelay.Status - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name -} - -func init() { file_pb_circuitv1_proto_init() } -func file_pb_circuitv1_proto_init() { - if File_pb_circuitv1_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_pb_circuitv1_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CircuitRelay); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pb_circuitv1_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CircuitRelay_Peer); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_pb_circuitv1_proto_rawDesc, - NumEnums: 2, - NumMessages: 2, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_pb_circuitv1_proto_goTypes, - DependencyIndexes: file_pb_circuitv1_proto_depIdxs, - EnumInfos: file_pb_circuitv1_proto_enumTypes, - MessageInfos: file_pb_circuitv1_proto_msgTypes, - }.Build() - File_pb_circuitv1_proto = out.File - file_pb_circuitv1_proto_rawDesc = nil - file_pb_circuitv1_proto_goTypes = nil - file_pb_circuitv1_proto_depIdxs = nil -} diff --git a/p2p/protocol/circuitv1/pb/circuitv1.proto b/p2p/protocol/circuitv1/pb/circuitv1.proto deleted file mode 100644 index c591f0751a..0000000000 --- a/p2p/protocol/circuitv1/pb/circuitv1.proto +++ /dev/null @@ -1,44 +0,0 @@ -syntax = "proto2"; - -package circuitv1.pb; - -message CircuitRelay { - - enum Status { - SUCCESS = 100; - HOP_SRC_ADDR_TOO_LONG = 220; - HOP_DST_ADDR_TOO_LONG = 221; - HOP_SRC_MULTIADDR_INVALID = 250; - HOP_DST_MULTIADDR_INVALID = 251; - HOP_NO_CONN_TO_DST = 260; - HOP_CANT_DIAL_DST = 261; - HOP_CANT_OPEN_DST_STREAM = 262; - HOP_CANT_SPEAK_RELAY = 270; - HOP_CANT_RELAY_TO_SELF = 280; - STOP_SRC_ADDR_TOO_LONG = 320; - STOP_DST_ADDR_TOO_LONG = 321; - STOP_SRC_MULTIADDR_INVALID = 350; - STOP_DST_MULTIADDR_INVALID = 351; - STOP_RELAY_REFUSED = 390; - MALFORMED_MESSAGE = 400; - } - - enum Type { // RPC identifier, either HOP, STOP or STATUS - HOP = 1; - STOP = 2; - STATUS = 3; - CAN_HOP = 4; - } - - message Peer { - required bytes id = 1; // peer id - repeated bytes addrs = 2; // peer's known addresses - } - - optional Type type = 1; // Type of the message - - optional Peer srcPeer = 2; // srcPeer and dstPeer are used when Type is HOP or STOP - optional Peer dstPeer = 3; - - optional Status code = 4; // Status code, used when Type is STATUS -} diff --git a/p2p/protocol/circuitv1/proto.go b/p2p/protocol/circuitv1/proto.go deleted file mode 100644 index 9d2f0b2a69..0000000000 --- a/p2p/protocol/circuitv1/proto.go +++ /dev/null @@ -1,3 +0,0 @@ -package circuitv1 - -//go:generate protoc --proto_path=$PWD:$PWD/../../.. --go_out=. --go_opt=Mpb/circuitv1.proto=./pb pb/circuitv1.proto diff --git a/p2p/protocol/circuitv1/relay/options.go b/p2p/protocol/circuitv1/relay/options.go deleted file mode 100644 index bfd2ed895f..0000000000 --- a/p2p/protocol/circuitv1/relay/options.go +++ /dev/null @@ -1,46 +0,0 @@ -package relay - -import ( - "github.com/libp2p/go-libp2p/core/peer" -) - -type Resources struct { - // MaxCircuits is the maximum number of active relay connections - MaxCircuits int - - // MaxCircuitsPerPeer is the maximum number of active relay connections per peer - MaxCircuitsPerPeer int - - // BufferSize is the buffer size for relaying in each direction - BufferSize int -} - -func DefaultResources() Resources { - return Resources{ - MaxCircuits: 1024, - MaxCircuitsPerPeer: 64, - BufferSize: 4096, - } -} - -type ACLFilter interface { - AllowHop(src, dest peer.ID) bool -} - -type Option func(r *Relay) error - -// WithResources specifies resource limits for the relay -func WithResources(rc Resources) Option { - return func(r *Relay) error { - r.rc = rc - return nil - } -} - -// WithACL specifies an ACLFilter for access control -func WithACL(acl ACLFilter) Option { - return func(r *Relay) error { - r.acl = acl - return nil - } -} diff --git a/p2p/protocol/circuitv1/relay/relay.go b/p2p/protocol/circuitv1/relay/relay.go deleted file mode 100644 index 6e6e5b03cb..0000000000 --- a/p2p/protocol/circuitv1/relay/relay.go +++ /dev/null @@ -1,452 +0,0 @@ -package relay - -import ( - "context" - "fmt" - "io" - "sync" - "sync/atomic" - "time" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - pb "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/pb" - "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util" - - logging "github.com/ipfs/go-log/v2" - pool "github.com/libp2p/go-buffer-pool" - ma "github.com/multiformats/go-multiaddr" -) - -var log = logging.Logger("relay") - -const ( - ProtoID = "/libp2p/circuit/relay/0.1.0" - - ServiceName = "libp2p.relay/v1" - - StreamTimeout = time.Minute - ConnectTimeout = 30 * time.Second - HandshakeTimeout = time.Minute - - relayHopTag = "relay-v1-hop" - relayHopTagValue = 2 - - maxMessageSize = 4096 -) - -type Relay struct { - closed atomic.Bool - ctx context.Context - cancel context.CancelFunc - - host host.Host - rc Resources - acl ACLFilter - scope network.ResourceScopeSpan - - mx sync.Mutex - conns map[peer.ID]int - active int -} - -func NewRelay(h host.Host, opts ...Option) (*Relay, error) { - r := &Relay{ - host: h, - rc: DefaultResources(), - conns: make(map[peer.ID]int), - } - r.ctx, r.cancel = context.WithCancel(context.Background()) - - for _, opt := range opts { - err := opt(r) - if err != nil { - return nil, fmt.Errorf("error applying relay option: %w", err) - } - } - - // get a scope for memory reservations at service level - err := h.Network().ResourceManager().ViewService(ServiceName, - func(s network.ServiceScope) error { - var err error - r.scope, err = s.BeginSpan() - return err - }) - if err != nil { - return nil, err - } - - h.SetStreamHandler(ProtoID, r.handleStream) - - return r, nil -} - -func (r *Relay) Close() error { - if r.closed.CompareAndSwap(false, true) { - r.host.RemoveStreamHandler(ProtoID) - r.scope.Done() - r.cancel() - } - return nil -} - -func (r *Relay) handleStream(s network.Stream) { - log.Debugf("new relay stream from: %s", s.Conn().RemotePeer()) - - if err := s.Scope().SetService(ServiceName); err != nil { - log.Debugf("error attaching stream to relay service: %s", err) - s.Reset() - return - } - - if err := s.Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways); err != nil { - log.Debugf("error reserving memory for stream: %s", err) - s.Reset() - return - } - defer s.Scope().ReleaseMemory(maxMessageSize) - - rd := util.NewDelimitedReader(s, maxMessageSize) - defer rd.Close() - - s.SetReadDeadline(time.Now().Add(StreamTimeout)) - - var msg pb.CircuitRelay - - err := rd.ReadMsg(&msg) - if err != nil { - r.handleError(s, pb.CircuitRelay_MALFORMED_MESSAGE) - return - } - s.SetReadDeadline(time.Time{}) - - switch msg.GetType() { - case pb.CircuitRelay_HOP: - r.handleHopStream(s, &msg) - case pb.CircuitRelay_CAN_HOP: - r.handleCanHop(s, &msg) - case pb.CircuitRelay_STOP: - r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED) - default: - log.Warnf("unexpected relay handshake: %d", msg.GetType()) - r.handleError(s, pb.CircuitRelay_MALFORMED_MESSAGE) - } -} - -func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) { - span, err := r.scope.BeginSpan() - if err != nil { - log.Debugf("failed to begin relay transaction: %s", err) - r.handleError(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY) - return - } - - fail := func(code pb.CircuitRelay_Status) { - span.Done() - r.handleError(s, code) - } - - // reserve buffers for the relay - if err := span.ReserveMemory(2*r.rc.BufferSize, network.ReservationPriorityHigh); err != nil { - log.Debugf("error reserving memory for relay: %s", err) - fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY) - return - } - - src, err := peerToPeerInfo(msg.GetSrcPeer()) - if err != nil { - fail(pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID) - return - } - - if src.ID != s.Conn().RemotePeer() { - fail(pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID) - return - } - - dest, err := peerToPeerInfo(msg.GetDstPeer()) - if err != nil { - fail(pb.CircuitRelay_HOP_DST_MULTIADDR_INVALID) - return - } - - if dest.ID == r.host.ID() { - fail(pb.CircuitRelay_HOP_CANT_RELAY_TO_SELF) - return - } - - if r.acl != nil && !r.acl.AllowHop(src.ID, dest.ID) { - log.Debugf("refusing hop from %s to %s; ACL refused", src.ID, dest.ID) - fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY) - return - } - - r.mx.Lock() - if r.active >= r.rc.MaxCircuits { - r.mx.Unlock() - log.Debugf("refusing connection from %s to %s; too many active circuits", src.ID, dest.ID) - fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY) - return - } - - srcConns := r.conns[src.ID] - if srcConns >= r.rc.MaxCircuitsPerPeer { - r.mx.Unlock() - log.Debugf("refusing connection from %s to %s; too many connections from %s", src.ID, dest.ID, src) - fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY) - return - } - - destConns := r.conns[dest.ID] - if destConns >= r.rc.MaxCircuitsPerPeer { - r.mx.Unlock() - log.Debugf("refusing connection from %s to %s; too many connecitons to %s", src.ID, dest.ID, dest.ID) - fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY) - return - } - - r.active++ - r.addConn(src.ID) - r.addConn(src.ID) - r.mx.Unlock() - - cleanup := func() { - span.Done() - r.mx.Lock() - r.active-- - r.rmConn(src.ID) - r.rmConn(dest.ID) - r.mx.Unlock() - } - - // open stream - ctx, cancel := context.WithTimeout(r.ctx, ConnectTimeout) - defer cancel() - - ctx = network.WithNoDial(ctx, "relay hop") - bs, err := r.host.NewStream(ctx, dest.ID, ProtoID) - if err != nil { - log.Debugf("error opening relay stream to %s: %s", dest.ID.Pretty(), err.Error()) - if err == network.ErrNoConn { - r.handleError(s, pb.CircuitRelay_HOP_NO_CONN_TO_DST) - } else { - r.handleError(s, pb.CircuitRelay_HOP_CANT_DIAL_DST) - } - cleanup() - return - } - - fail = func(code pb.CircuitRelay_Status) { - bs.Reset() - cleanup() - r.handleError(s, code) - } - - if err := bs.Scope().SetService(ServiceName); err != nil { - log.Debugf("error attaching stream to relay service: %s", err) - fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY) - return - } - - // stop handshake - if err := bs.Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways); err != nil { - log.Debugf("failed to reserve memory for stream: %s", err) - fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY) - return - } - defer bs.Scope().ReleaseMemory(maxMessageSize) - - rd := util.NewDelimitedReader(bs, maxMessageSize) - wr := util.NewDelimitedWriter(bs) - defer rd.Close() - - // set handshake deadline - bs.SetDeadline(time.Now().Add(HandshakeTimeout)) - - msg.Type = pb.CircuitRelay_STOP.Enum() - - err = wr.WriteMsg(msg) - if err != nil { - log.Debugf("error writing stop handshake: %s", err.Error()) - fail(pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM) - return - } - - msg.Reset() - - err = rd.ReadMsg(msg) - if err != nil { - log.Debugf("error reading stop response: %s", err.Error()) - fail(pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM) - return - } - - if msg.GetType() != pb.CircuitRelay_STATUS { - log.Debugf("unexpected relay stop response: not a status message (%d)", msg.GetType()) - fail(pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM) - return - } - - if msg.GetCode() != pb.CircuitRelay_SUCCESS { - log.Debugf("relay stop failure: %d", msg.GetCode()) - fail(msg.GetCode()) - return - } - - err = r.writeResponse(s, pb.CircuitRelay_SUCCESS) - if err != nil { - log.Debugf("error writing relay response: %s", err.Error()) - bs.Reset() - s.Reset() - cleanup() - return - } - - // relay connection - log.Infof("relaying connection between %s and %s", src.ID.Pretty(), dest.ID.Pretty()) - - // reset deadline - bs.SetDeadline(time.Time{}) - - var goroutines atomic.Int32 - goroutines.Store(2) - done := func() { - if goroutines.Add(-1) == 0 { - s.Close() - bs.Close() - cleanup() - } - } - - go r.relayConn(s, bs, src.ID, dest.ID, done) - go r.relayConn(bs, s, dest.ID, src.ID, done) -} - -func (r *Relay) addConn(p peer.ID) { - conns := r.conns[p] - conns++ - r.conns[p] = conns - if conns == 1 { - r.host.ConnManager().TagPeer(p, relayHopTag, relayHopTagValue) - } -} - -func (r *Relay) rmConn(p peer.ID) { - conns := r.conns[p] - conns-- - if conns > 0 { - r.conns[p] = conns - } else { - delete(r.conns, p) - r.host.ConnManager().UntagPeer(p, relayHopTag) - } -} - -func (r *Relay) relayConn(src, dest network.Stream, srcID, destID peer.ID, done func()) { - defer done() - - buf := pool.Get(r.rc.BufferSize) - defer pool.Put(buf) - - count, err := io.CopyBuffer(dest, src, buf) - if err != nil { - log.Debugf("relay copy error: %s", err) - // Reset both. - src.Reset() - dest.Reset() - } else { - // propagate the close - dest.CloseWrite() - } - - log.Debugf("relayed %d bytes from %s to %s", count, srcID, destID) -} - -func (r *Relay) handleCanHop(s network.Stream, msg *pb.CircuitRelay) { - err := r.writeResponse(s, pb.CircuitRelay_SUCCESS) - - if err != nil { - s.Reset() - log.Debugf("error writing relay response: %s", err.Error()) - } else { - s.Close() - } -} - -func (r *Relay) handleError(s network.Stream, code pb.CircuitRelay_Status) { - log.Warnf("relay error: %s", code) - err := r.writeResponse(s, code) - if err != nil { - s.Reset() - log.Debugf("error writing relay response: %s", err.Error()) - } else { - s.Close() - } -} - -// Queries a peer for support of hop relay -func CanHop(ctx context.Context, host host.Host, id peer.ID) (bool, error) { - s, err := host.NewStream(ctx, id, ProtoID) - if err != nil { - return false, err - } - defer s.Close() - - rd := util.NewDelimitedReader(s, maxMessageSize) - wr := util.NewDelimitedWriter(s) - defer rd.Close() - - var msg pb.CircuitRelay - - msg.Type = pb.CircuitRelay_CAN_HOP.Enum() - - if err := wr.WriteMsg(&msg); err != nil { - s.Reset() - return false, err - } - - msg.Reset() - - if err := rd.ReadMsg(&msg); err != nil { - s.Reset() - return false, err - } - - if msg.GetType() != pb.CircuitRelay_STATUS { - return false, fmt.Errorf("unexpected relay response; not a status message (%d)", msg.GetType()) - } - - return msg.GetCode() == pb.CircuitRelay_SUCCESS, nil -} - -func (r *Relay) writeResponse(s network.Stream, code pb.CircuitRelay_Status) error { - wr := util.NewDelimitedWriter(s) - - var msg pb.CircuitRelay - msg.Type = pb.CircuitRelay_STATUS.Enum() - msg.Code = code.Enum() - - return wr.WriteMsg(&msg) -} - -func peerToPeerInfo(p *pb.CircuitRelay_Peer) (peer.AddrInfo, error) { - if p == nil { - return peer.AddrInfo{}, fmt.Errorf("nil peer") - } - - id, err := peer.IDFromBytes(p.Id) - if err != nil { - return peer.AddrInfo{}, err - } - - addrs := make([]ma.Multiaddr, 0, len(p.Addrs)) - for _, addrBytes := range p.Addrs { - a, err := ma.NewMultiaddrBytes(addrBytes) - if err == nil { - addrs = append(addrs, a) - } - } - - return peer.AddrInfo{ID: id, Addrs: addrs}, nil -} diff --git a/p2p/protocol/circuitv2/client/client.go b/p2p/protocol/circuitv2/client/client.go index aa302e7e16..c22436bca9 100644 --- a/p2p/protocol/circuitv2/client/client.go +++ b/p2p/protocol/circuitv2/client/client.go @@ -66,13 +66,11 @@ func New(h host.Host, upgrader transport.Upgrader) (*Client, error) { // Start registers the circuit (client) protocol stream handlers func (c *Client) Start() { - c.host.SetStreamHandler(proto.ProtoIDv1, c.handleStreamV1) c.host.SetStreamHandler(proto.ProtoIDv2Stop, c.handleStreamV2) } func (c *Client) Close() error { c.ctxCancel() - c.host.RemoveStreamHandler(proto.ProtoIDv1) c.host.RemoveStreamHandler(proto.ProtoIDv2Stop) return nil } diff --git a/p2p/protocol/circuitv2/client/dial.go b/p2p/protocol/circuitv2/client/dial.go index 2e5fc73b5d..ecf5d3a51a 100644 --- a/p2p/protocol/circuitv2/client/dial.go +++ b/p2p/protocol/circuitv2/client/dial.go @@ -8,7 +8,6 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" - pbv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/pb" pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util" @@ -124,25 +123,14 @@ func (c *Client) dialPeer(ctx context.Context, relay, dest peer.AddrInfo) (*Conn dialCtx, cancel := context.WithTimeout(ctx, DialRelayTimeout) defer cancel() - s, err := c.host.NewStream(dialCtx, relay.ID, proto.ProtoIDv2Hop, proto.ProtoIDv1) + s, err := c.host.NewStream(dialCtx, relay.ID, proto.ProtoIDv2Hop) if err != nil { return nil, fmt.Errorf("error opening hop stream to relay: %w", err) } - - switch s.Protocol() { - case proto.ProtoIDv2Hop: - return c.connectV2(s, dest) - - case proto.ProtoIDv1: - return c.connectV1(s, dest) - - default: - s.Reset() - return nil, fmt.Errorf("unexpected stream protocol: %s", s.Protocol()) - } + return c.connect(s, dest) } -func (c *Client) connectV2(s network.Stream, dest peer.AddrInfo) (*Conn, error) { +func (c *Client) connect(s network.Stream, dest peer.AddrInfo) (*Conn, error) { if err := s.Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways); err != nil { s.Reset() return nil, err @@ -199,52 +187,3 @@ func (c *Client) connectV2(s network.Stream, dest peer.AddrInfo) (*Conn, error) return &Conn{stream: s, remote: dest, stat: stat, client: c}, nil } - -func (c *Client) connectV1(s network.Stream, dest peer.AddrInfo) (*Conn, error) { - if err := s.Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways); err != nil { - s.Reset() - return nil, err - } - defer s.Scope().ReleaseMemory(maxMessageSize) - - rd := util.NewDelimitedReader(s, maxMessageSize) - wr := util.NewDelimitedWriter(s) - defer rd.Close() - - var msg pbv1.CircuitRelay - - msg.Type = pbv1.CircuitRelay_HOP.Enum() - msg.SrcPeer = util.PeerInfoToPeerV1(c.host.Peerstore().PeerInfo(c.host.ID())) - msg.DstPeer = util.PeerInfoToPeerV1(dest) - - s.SetDeadline(time.Now().Add(DialTimeout)) - - err := wr.WriteMsg(&msg) - if err != nil { - s.Reset() - return nil, err - } - - msg.Reset() - - err = rd.ReadMsg(&msg) - if err != nil { - s.Reset() - return nil, err - } - - s.SetDeadline(time.Time{}) - - if msg.GetType() != pbv1.CircuitRelay_STATUS { - s.Reset() - return nil, newRelayError("unexpected relay response; not a status message (%d)", msg.GetType()) - } - - status := msg.GetCode() - if status != pbv1.CircuitRelay_SUCCESS { - s.Reset() - return nil, newRelayError("error opening relay circuit: %s (%d)", pbv1.CircuitRelay_Status_name[int32(status)], status) - } - - return &Conn{stream: s, remote: dest, client: c}, nil -} diff --git a/p2p/protocol/circuitv2/client/handlers.go b/p2p/protocol/circuitv2/client/handlers.go index ef50b8e826..6b5361b123 100644 --- a/p2p/protocol/circuitv2/client/handlers.go +++ b/p2p/protocol/circuitv2/client/handlers.go @@ -4,7 +4,6 @@ import ( "time" "github.com/libp2p/go-libp2p/core/network" - pbv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/pb" pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util" ) @@ -87,85 +86,3 @@ func (c *Client) handleStreamV2(s network.Stream) { handleError(pbv2.Status_CONNECTION_FAILED) } } - -func (c *Client) handleStreamV1(s network.Stream) { - log.Debugf("new relay/v1 stream from: %s", s.Conn().RemotePeer()) - - s.SetReadDeadline(time.Now().Add(StreamTimeout)) - - rd := util.NewDelimitedReader(s, maxMessageSize) - defer rd.Close() - - writeResponse := func(status pbv1.CircuitRelay_Status) error { - wr := util.NewDelimitedWriter(s) - - var msg pbv1.CircuitRelay - msg.Type = pbv1.CircuitRelay_STATUS.Enum() - msg.Code = status.Enum() - - return wr.WriteMsg(&msg) - } - - handleError := func(status pbv1.CircuitRelay_Status) { - log.Debugf("protocol error: %s (%d)", pbv1.CircuitRelay_Status_name[int32(status)], status) - err := writeResponse(status) - if err != nil { - s.Reset() - log.Debugf("error writing circuit response: %s", err.Error()) - } else { - s.Close() - } - } - - var msg pbv1.CircuitRelay - - err := rd.ReadMsg(&msg) - if err != nil { - handleError(pbv1.CircuitRelay_MALFORMED_MESSAGE) - return - } - // reset stream deadline as message has been read - s.SetReadDeadline(time.Time{}) - - switch msg.GetType() { - case pbv1.CircuitRelay_STOP: - - case pbv1.CircuitRelay_HOP: - handleError(pbv1.CircuitRelay_HOP_CANT_SPEAK_RELAY) - return - - case pbv1.CircuitRelay_CAN_HOP: - handleError(pbv1.CircuitRelay_HOP_CANT_SPEAK_RELAY) - return - - default: - log.Debugf("unexpected relay handshake: %d", msg.GetType()) - handleError(pbv1.CircuitRelay_MALFORMED_MESSAGE) - return - } - - src, err := util.PeerToPeerInfoV1(msg.GetSrcPeer()) - if err != nil { - handleError(pbv1.CircuitRelay_STOP_SRC_MULTIADDR_INVALID) - return - } - - dst, err := util.PeerToPeerInfoV1(msg.GetDstPeer()) - if err != nil || dst.ID != c.host.ID() { - handleError(pbv1.CircuitRelay_STOP_DST_MULTIADDR_INVALID) - return - } - - log.Debugf("incoming relay connection from: %s", src.ID) - - select { - case c.incoming <- accept{ - conn: &Conn{stream: s, remote: src, client: c}, - writeResponse: func() error { - return writeResponse(pbv1.CircuitRelay_SUCCESS) - }, - }: - case <-time.After(AcceptTimeout): - handleError(pbv1.CircuitRelay_STOP_RELAY_REFUSED) - } -} diff --git a/p2p/protocol/circuitv2/proto/protocol.go b/p2p/protocol/circuitv2/proto/protocol.go index d27fc50986..4b6d96b887 100644 --- a/p2p/protocol/circuitv2/proto/protocol.go +++ b/p2p/protocol/circuitv2/proto/protocol.go @@ -1,7 +1,6 @@ package proto const ( - ProtoIDv1 = "/libp2p/circuit/relay/0.1.0" ProtoIDv2Hop = "/libp2p/circuit/relay/0.2.0/hop" ProtoIDv2Stop = "/libp2p/circuit/relay/0.2.0/stop" ) diff --git a/p2p/protocol/circuitv2/relay/compat_test.go b/p2p/protocol/circuitv2/relay/compat_test.go deleted file mode 100644 index 693803f598..0000000000 --- a/p2p/protocol/circuitv2/relay/compat_test.go +++ /dev/null @@ -1,179 +0,0 @@ -package relay_test - -import ( - "bytes" - "context" - "fmt" - "io" - "testing" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/transport" - relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay" - compatv1 "github.com/libp2p/go-libp2p/p2p/protocol/internal/circuitv1-deprecated" - - ma "github.com/multiformats/go-multiaddr" -) - -func addTransportV1(t *testing.T, h host.Host, upgrader transport.Upgrader) { - err := compatv1.AddRelayTransport(h, upgrader) - if err != nil { - t.Fatal(err) - } -} - -func TestRelayCompatV2DialV1(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - hosts, upgraders := getNetHosts(t, ctx, 3) - addTransportV1(t, hosts[0], upgraders[0]) - addTransport(t, hosts[2], upgraders[2]) - - rch := make(chan []byte, 1) - hosts[0].SetStreamHandler("test", func(s network.Stream) { - defer s.Close() - defer close(rch) - - buf := make([]byte, 1024) - nread := 0 - for nread < len(buf) { - n, err := s.Read(buf[nread:]) - nread += n - if err != nil { - if err == io.EOF { - break - } - t.Fatal(err) - } - } - - rch <- buf[:nread] - }) - - r, err := relayv1.NewRelay(hosts[1]) - if err != nil { - t.Fatal(err) - } - defer r.Close() - - connect(t, hosts[0], hosts[1]) - connect(t, hosts[1], hosts[2]) - - raddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit/p2p/%s", hosts[1].ID(), hosts[0].ID())) - if err != nil { - t.Fatal(err) - } - - err = hosts[2].Connect(ctx, peer.AddrInfo{ID: hosts[0].ID(), Addrs: []ma.Multiaddr{raddr}}) - if err != nil { - t.Fatal(err) - } - - conns := hosts[2].Network().ConnsToPeer(hosts[0].ID()) - if len(conns) != 1 { - t.Fatalf("expected 1 connection, but got %d", len(conns)) - } - if conns[0].Stat().Transient { - t.Fatal("expected non transient connection") - } - - s, err := hosts[2].NewStream(ctx, hosts[0].ID(), "test") - if err != nil { - t.Fatal(err) - } - - msg := []byte("relay works!") - nwritten, err := s.Write(msg) - if err != nil { - t.Fatal(err) - } - if nwritten != len(msg) { - t.Fatalf("expected to write %d bytes, but wrote %d instead", len(msg), nwritten) - } - s.CloseWrite() - - got := <-rch - if !bytes.Equal(msg, got) { - t.Fatalf("Wrong echo; expected %s but got %s", string(msg), string(got)) - } -} - -func TestRelayCompatV1DialV2(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - hosts, upgraders := getNetHosts(t, ctx, 3) - addTransport(t, hosts[0], upgraders[0]) - addTransportV1(t, hosts[2], upgraders[2]) - - rch := make(chan []byte, 1) - hosts[0].SetStreamHandler("test", func(s network.Stream) { - defer s.Close() - defer close(rch) - - buf := make([]byte, 1024) - nread := 0 - for nread < len(buf) { - n, err := s.Read(buf[nread:]) - nread += n - if err != nil { - if err == io.EOF { - break - } - t.Fatal(err) - } - } - - rch <- buf[:nread] - }) - - r, err := relayv1.NewRelay(hosts[1]) - if err != nil { - t.Fatal(err) - } - defer r.Close() - - connect(t, hosts[0], hosts[1]) - connect(t, hosts[1], hosts[2]) - - raddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit/p2p/%s", hosts[1].ID(), hosts[0].ID())) - if err != nil { - t.Fatal(err) - } - - err = hosts[2].Connect(ctx, peer.AddrInfo{ID: hosts[0].ID(), Addrs: []ma.Multiaddr{raddr}}) - if err != nil { - t.Fatal(err) - } - - conns := hosts[2].Network().ConnsToPeer(hosts[0].ID()) - if len(conns) != 1 { - t.Fatalf("expected 1 connection, but got %d", len(conns)) - } - if conns[0].Stat().Transient { - t.Fatal("expected non transient connection") - } - - s, err := hosts[2].NewStream(ctx, hosts[0].ID(), "test") - if err != nil { - t.Fatal(err) - } - - msg := []byte("relay works!") - nwritten, err := s.Write(msg) - if err != nil { - t.Fatal(err) - } - if nwritten != len(msg) { - t.Fatalf("expected to write %d bytes, but wrote %d instead", len(msg), nwritten) - } - s.CloseWrite() - - got := <-rch - if !bytes.Equal(msg, got) { - t.Fatalf("Wrong echo; expected %s but got %s", string(msg), string(got)) - } -} diff --git a/p2p/protocol/circuitv2/util/pbconv.go b/p2p/protocol/circuitv2/util/pbconv.go index 4a884351ee..f5b72bf05b 100644 --- a/p2p/protocol/circuitv2/util/pbconv.go +++ b/p2p/protocol/circuitv2/util/pbconv.go @@ -4,51 +4,11 @@ import ( "errors" "github.com/libp2p/go-libp2p/core/peer" - pbv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/pb" pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb" ma "github.com/multiformats/go-multiaddr" ) -func PeerToPeerInfoV1(p *pbv1.CircuitRelay_Peer) (peer.AddrInfo, error) { - if p == nil { - return peer.AddrInfo{}, errors.New("nil peer") - } - - id, err := peer.IDFromBytes(p.Id) - if err != nil { - return peer.AddrInfo{}, err - } - - var addrs []ma.Multiaddr - if len(p.Addrs) > 0 { - addrs = make([]ma.Multiaddr, 0, len(p.Addrs)) - } - - for _, addrBytes := range p.Addrs { - a, err := ma.NewMultiaddrBytes(addrBytes) - if err == nil { - addrs = append(addrs, a) - } - } - - return peer.AddrInfo{ID: id, Addrs: addrs}, nil -} - -func PeerInfoToPeerV1(pi peer.AddrInfo) *pbv1.CircuitRelay_Peer { - addrs := make([][]byte, 0, len(pi.Addrs)) - - for _, addr := range pi.Addrs { - addrs = append(addrs, addr.Bytes()) - } - - p := new(pbv1.CircuitRelay_Peer) - p.Id = []byte(pi.ID) - p.Addrs = addrs - - return p -} - func PeerToPeerInfoV2(p *pbv2.Peer) (peer.AddrInfo, error) { if p == nil { return peer.AddrInfo{}, errors.New("nil peer") @@ -73,14 +33,12 @@ func PeerToPeerInfoV2(p *pbv2.Peer) (peer.AddrInfo, error) { func PeerInfoToPeerV2(pi peer.AddrInfo) *pbv2.Peer { addrs := make([][]byte, 0, len(pi.Addrs)) - for _, addr := range pi.Addrs { addrs = append(addrs, addr.Bytes()) } - p := new(pbv2.Peer) - p.Id = []byte(pi.ID) - p.Addrs = addrs - - return p + return &pbv2.Peer{ + Id: []byte(pi.ID), + Addrs: addrs, + } } diff --git a/p2p/protocol/holepunch/holepunch_test.go b/p2p/protocol/holepunch/holepunch_test.go index c365927ce8..9cf57ff65c 100644 --- a/p2p/protocol/holepunch/holepunch_test.go +++ b/p2p/protocol/holepunch/holepunch_test.go @@ -7,21 +7,19 @@ import ( "testing" "time" - "github.com/libp2p/go-libp2p/p2p/transport/tcp" - - "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" - "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-testing/race" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/libp2p/go-libp2p/p2p/host/autorelay" - relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay" + "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" + relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" holepunch_pb "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb" "github.com/libp2p/go-libp2p/p2p/protocol/identify" + "github.com/libp2p/go-libp2p/p2p/transport/tcp" + "github.com/libp2p/go-msgio/pbio" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" @@ -338,7 +336,7 @@ func TestFailuresOnResponder(t *testing.T) { defer h2.Close() defer relay.Close() - s, err := h2.NewStream(context.Background(), h1.ID(), holepunch.Protocol) + s, err := h2.NewStream(network.WithUseTransient(context.Background(), "holepunch"), h1.ID(), holepunch.Protocol) require.NoError(t, err) go tc.initiator(s) @@ -423,10 +421,7 @@ func mkHostWithStaticAutoRelay(t *testing.T, relay host.Host) host.Host { h, err := libp2p.New( libp2p.ListenAddrs(ma.StringCast("/ip4/127.0.0.1/tcp/0")), libp2p.EnableRelay(), - libp2p.EnableAutoRelayWithStaticRelays( - []peer.AddrInfo{pi}, - autorelay.WithCircuitV1Support(), - ), + libp2p.EnableAutoRelayWithStaticRelays([]peer.AddrInfo{pi}), libp2p.ForceReachabilityPrivate(), libp2p.ResourceManager(&network.NullResourceManager{}), ) @@ -454,7 +449,7 @@ func makeRelayedHosts(t *testing.T, h1opt, h2opt []holepunch.Option, addHolePunc libp2p.ResourceManager(&network.NullResourceManager{}), ) require.NoError(t, err) - _, err = relayv1.NewRelay(relay) + _, err = relayv2.New(relay) require.NoError(t, err) // make sure the relay service is started and advertised by Identify @@ -467,7 +462,7 @@ func makeRelayedHosts(t *testing.T, h1opt, h2opt []holepunch.Option, addHolePunc defer h.Close() require.NoError(t, h.Connect(context.Background(), peer.AddrInfo{ID: relay.ID(), Addrs: relay.Addrs()})) require.Eventually(t, func() bool { - supported, err := h.Peerstore().SupportsProtocols(relay.ID(), proto.ProtoIDv2Hop, relayv1.ProtoID) + supported, err := h.Peerstore().SupportsProtocols(relay.ID(), proto.ProtoIDv2Hop) return err == nil && len(supported) > 0 }, 3*time.Second, 100*time.Millisecond) diff --git a/p2p/protocol/internal/circuitv1-deprecated/conn.go b/p2p/protocol/internal/circuitv1-deprecated/conn.go deleted file mode 100644 index 19d2964e38..0000000000 --- a/p2p/protocol/internal/circuitv1-deprecated/conn.go +++ /dev/null @@ -1,124 +0,0 @@ -package relay - -import ( - "fmt" - "net" - "time" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - - ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr/net" -) - -// HopTagWeight is the connection manager weight for connections carrying relay hop streams -var HopTagWeight = 5 - -type Conn struct { - stream network.Stream - remote peer.AddrInfo - host host.Host - relay *Relay -} - -type NetAddr struct { - Relay string - Remote string -} - -func (n *NetAddr) Network() string { - return "libp2p-circuit-relay" -} - -func (n *NetAddr) String() string { - return fmt.Sprintf("relay[%s-%s]", n.Remote, n.Relay) -} - -func (c *Conn) Close() error { - c.untagHop() - return c.stream.Reset() -} - -func (c *Conn) Read(buf []byte) (int, error) { - return c.stream.Read(buf) -} - -func (c *Conn) Write(buf []byte) (int, error) { - return c.stream.Write(buf) -} - -func (c *Conn) SetDeadline(t time.Time) error { - return c.stream.SetDeadline(t) -} - -func (c *Conn) SetReadDeadline(t time.Time) error { - return c.stream.SetReadDeadline(t) -} - -func (c *Conn) SetWriteDeadline(t time.Time) error { - return c.stream.SetWriteDeadline(t) -} - -func (c *Conn) RemoteAddr() net.Addr { - return &NetAddr{ - Relay: c.stream.Conn().RemotePeer().Pretty(), - Remote: c.remote.ID.Pretty(), - } -} - -// Increment the underlying relay connection tag by 1, thus increasing its protection from -// connection pruning. This ensures that connections to relays are not accidentally closed, -// by the connection manager, taking with them all the relayed connections (that may themselves -// be protected). -func (c *Conn) tagHop() { - c.relay.mx.Lock() - defer c.relay.mx.Unlock() - - p := c.stream.Conn().RemotePeer() - c.relay.hopCount[p]++ - if c.relay.hopCount[p] == 1 { - c.host.ConnManager().TagPeer(p, "relay-hop-stream", HopTagWeight) - } -} - -// Decrement the underlying relay connection tag by 1; this is performed when we close the -// relayed connection. -func (c *Conn) untagHop() { - c.relay.mx.Lock() - defer c.relay.mx.Unlock() - - p := c.stream.Conn().RemotePeer() - c.relay.hopCount[p]-- - if c.relay.hopCount[p] == 0 { - c.host.ConnManager().UntagPeer(p, "relay-hop-stream") - delete(c.relay.hopCount, p) - } -} - -// TODO: is it okay to cast c.Conn().RemotePeer() into a multiaddr? might be "user input" -func (c *Conn) RemoteMultiaddr() ma.Multiaddr { - // TODO: We should be able to do this directly without converting to/from a string. - relayAddr, err := ma.NewComponent( - ma.ProtocolWithCode(ma.P_P2P).Name, - c.stream.Conn().RemotePeer().Pretty(), - ) - if err != nil { - panic(err) - } - return ma.Join(c.stream.Conn().RemoteMultiaddr(), relayAddr, circuitAddr) -} - -func (c *Conn) LocalMultiaddr() ma.Multiaddr { - return c.stream.Conn().LocalMultiaddr() -} - -func (c *Conn) LocalAddr() net.Addr { - na, err := manet.ToNetAddr(c.stream.Conn().LocalMultiaddr()) - if err != nil { - log.Error("failed to convert local multiaddr to net addr:", err) - return nil - } - return na -} diff --git a/p2p/protocol/internal/circuitv1-deprecated/dial.go b/p2p/protocol/internal/circuitv1-deprecated/dial.go deleted file mode 100644 index 23c03b57ea..0000000000 --- a/p2p/protocol/internal/circuitv1-deprecated/dial.go +++ /dev/null @@ -1,57 +0,0 @@ -package relay - -import ( - "context" - "fmt" - - "github.com/libp2p/go-libp2p/core/network" - - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/transport" - ma "github.com/multiformats/go-multiaddr" -) - -func (d *RelayTransport) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (transport.CapableConn, error) { - c, err := d.Relay().Dial(ctx, a, p) - if err != nil { - return nil, err - } - c.tagHop() - scope, _ := (&network.NullResourceManager{}).OpenConnection(network.DirOutbound, false, a) - return d.upgrader.Upgrade(ctx, d, c, network.DirOutbound, p, scope) -} - -func (r *Relay) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (*Conn, error) { - // split /a/p2p-circuit/b into (/a, /p2p-circuit/b) - relayaddr, destaddr := ma.SplitFunc(a, func(c ma.Component) bool { - return c.Protocol().Code == ma.P_CIRCUIT - }) - - // If the address contained no /p2p-circuit part, the second part is nil. - if destaddr == nil { - return nil, fmt.Errorf("%s is not a relay address", a) - } - - if relayaddr == nil { - return nil, fmt.Errorf( - "can't dial a p2p-circuit without specifying a relay: %s", - a, - ) - } - - // Strip the /p2p-circuit prefix from the destaddr. - _, destaddr = ma.SplitFirst(destaddr) - - dinfo := &peer.AddrInfo{ID: p, Addrs: []ma.Multiaddr{}} - if destaddr != nil { - dinfo.Addrs = append(dinfo.Addrs, destaddr) - } - - var rinfo *peer.AddrInfo - rinfo, err := peer.AddrInfoFromP2pAddr(relayaddr) - if err != nil { - return nil, fmt.Errorf("error parsing multiaddr '%s': %s", relayaddr.String(), err) - } - - return r.DialPeer(ctx, *rinfo, *dinfo) -} diff --git a/p2p/protocol/internal/circuitv1-deprecated/listen.go b/p2p/protocol/internal/circuitv1-deprecated/listen.go deleted file mode 100644 index 1d5353a022..0000000000 --- a/p2p/protocol/internal/circuitv1-deprecated/listen.go +++ /dev/null @@ -1,61 +0,0 @@ -package relay - -import ( - "net" - - pb "github.com/libp2p/go-libp2p/p2p/protocol/internal/circuitv1-deprecated/pb" - - ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr/net" -) - -var _ manet.Listener = (*RelayListener)(nil) - -type RelayListener Relay - -func (l *RelayListener) Relay() *Relay { - return (*Relay)(l) -} - -func (r *Relay) Listener() *RelayListener { - // TODO: Only allow one! - return (*RelayListener)(r) -} - -func (l *RelayListener) Accept() (manet.Conn, error) { - for { - select { - case c := <-l.incoming: - err := l.Relay().writeResponse(c.stream, pb.CircuitRelay_SUCCESS) - if err != nil { - log.Debugf("error writing relay response: %s", err.Error()) - c.stream.Reset() - continue - } - - // TODO: Pretty print. - log.Infof("accepted relay connection: %q", c) - - c.tagHop() - return c, nil - case <-l.ctx.Done(): - return nil, l.ctx.Err() - } - } -} - -func (l *RelayListener) Addr() net.Addr { - return &NetAddr{ - Relay: "any", - Remote: "any", - } -} - -func (l *RelayListener) Multiaddr() ma.Multiaddr { - return circuitAddr -} - -func (l *RelayListener) Close() error { - // TODO: noop? - return nil -} diff --git a/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go deleted file mode 100644 index e71263ca07..0000000000 --- a/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go +++ /dev/null @@ -1,447 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.28.1 -// protoc v3.21.12 -// source: pb/relay.proto - -package pb - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type CircuitRelay_Status int32 - -const ( - CircuitRelay_SUCCESS CircuitRelay_Status = 100 - CircuitRelay_HOP_SRC_ADDR_TOO_LONG CircuitRelay_Status = 220 - CircuitRelay_HOP_DST_ADDR_TOO_LONG CircuitRelay_Status = 221 - CircuitRelay_HOP_SRC_MULTIADDR_INVALID CircuitRelay_Status = 250 - CircuitRelay_HOP_DST_MULTIADDR_INVALID CircuitRelay_Status = 251 - CircuitRelay_HOP_NO_CONN_TO_DST CircuitRelay_Status = 260 - CircuitRelay_HOP_CANT_DIAL_DST CircuitRelay_Status = 261 - CircuitRelay_HOP_CANT_OPEN_DST_STREAM CircuitRelay_Status = 262 - CircuitRelay_HOP_CANT_SPEAK_RELAY CircuitRelay_Status = 270 - CircuitRelay_HOP_CANT_RELAY_TO_SELF CircuitRelay_Status = 280 - CircuitRelay_STOP_SRC_ADDR_TOO_LONG CircuitRelay_Status = 320 - CircuitRelay_STOP_DST_ADDR_TOO_LONG CircuitRelay_Status = 321 - CircuitRelay_STOP_SRC_MULTIADDR_INVALID CircuitRelay_Status = 350 - CircuitRelay_STOP_DST_MULTIADDR_INVALID CircuitRelay_Status = 351 - CircuitRelay_STOP_RELAY_REFUSED CircuitRelay_Status = 390 - CircuitRelay_MALFORMED_MESSAGE CircuitRelay_Status = 400 -) - -// Enum value maps for CircuitRelay_Status. -var ( - CircuitRelay_Status_name = map[int32]string{ - 100: "SUCCESS", - 220: "HOP_SRC_ADDR_TOO_LONG", - 221: "HOP_DST_ADDR_TOO_LONG", - 250: "HOP_SRC_MULTIADDR_INVALID", - 251: "HOP_DST_MULTIADDR_INVALID", - 260: "HOP_NO_CONN_TO_DST", - 261: "HOP_CANT_DIAL_DST", - 262: "HOP_CANT_OPEN_DST_STREAM", - 270: "HOP_CANT_SPEAK_RELAY", - 280: "HOP_CANT_RELAY_TO_SELF", - 320: "STOP_SRC_ADDR_TOO_LONG", - 321: "STOP_DST_ADDR_TOO_LONG", - 350: "STOP_SRC_MULTIADDR_INVALID", - 351: "STOP_DST_MULTIADDR_INVALID", - 390: "STOP_RELAY_REFUSED", - 400: "MALFORMED_MESSAGE", - } - CircuitRelay_Status_value = map[string]int32{ - "SUCCESS": 100, - "HOP_SRC_ADDR_TOO_LONG": 220, - "HOP_DST_ADDR_TOO_LONG": 221, - "HOP_SRC_MULTIADDR_INVALID": 250, - "HOP_DST_MULTIADDR_INVALID": 251, - "HOP_NO_CONN_TO_DST": 260, - "HOP_CANT_DIAL_DST": 261, - "HOP_CANT_OPEN_DST_STREAM": 262, - "HOP_CANT_SPEAK_RELAY": 270, - "HOP_CANT_RELAY_TO_SELF": 280, - "STOP_SRC_ADDR_TOO_LONG": 320, - "STOP_DST_ADDR_TOO_LONG": 321, - "STOP_SRC_MULTIADDR_INVALID": 350, - "STOP_DST_MULTIADDR_INVALID": 351, - "STOP_RELAY_REFUSED": 390, - "MALFORMED_MESSAGE": 400, - } -) - -func (x CircuitRelay_Status) Enum() *CircuitRelay_Status { - p := new(CircuitRelay_Status) - *p = x - return p -} - -func (x CircuitRelay_Status) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (CircuitRelay_Status) Descriptor() protoreflect.EnumDescriptor { - return file_pb_relay_proto_enumTypes[0].Descriptor() -} - -func (CircuitRelay_Status) Type() protoreflect.EnumType { - return &file_pb_relay_proto_enumTypes[0] -} - -func (x CircuitRelay_Status) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Do not use. -func (x *CircuitRelay_Status) UnmarshalJSON(b []byte) error { - num, err := protoimpl.X.UnmarshalJSONEnum(x.Descriptor(), b) - if err != nil { - return err - } - *x = CircuitRelay_Status(num) - return nil -} - -// Deprecated: Use CircuitRelay_Status.Descriptor instead. -func (CircuitRelay_Status) EnumDescriptor() ([]byte, []int) { - return file_pb_relay_proto_rawDescGZIP(), []int{0, 0} -} - -type CircuitRelay_Type int32 - -const ( - CircuitRelay_HOP CircuitRelay_Type = 1 - CircuitRelay_STOP CircuitRelay_Type = 2 - CircuitRelay_STATUS CircuitRelay_Type = 3 - CircuitRelay_CAN_HOP CircuitRelay_Type = 4 -) - -// Enum value maps for CircuitRelay_Type. -var ( - CircuitRelay_Type_name = map[int32]string{ - 1: "HOP", - 2: "STOP", - 3: "STATUS", - 4: "CAN_HOP", - } - CircuitRelay_Type_value = map[string]int32{ - "HOP": 1, - "STOP": 2, - "STATUS": 3, - "CAN_HOP": 4, - } -) - -func (x CircuitRelay_Type) Enum() *CircuitRelay_Type { - p := new(CircuitRelay_Type) - *p = x - return p -} - -func (x CircuitRelay_Type) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (CircuitRelay_Type) Descriptor() protoreflect.EnumDescriptor { - return file_pb_relay_proto_enumTypes[1].Descriptor() -} - -func (CircuitRelay_Type) Type() protoreflect.EnumType { - return &file_pb_relay_proto_enumTypes[1] -} - -func (x CircuitRelay_Type) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Do not use. -func (x *CircuitRelay_Type) UnmarshalJSON(b []byte) error { - num, err := protoimpl.X.UnmarshalJSONEnum(x.Descriptor(), b) - if err != nil { - return err - } - *x = CircuitRelay_Type(num) - return nil -} - -// Deprecated: Use CircuitRelay_Type.Descriptor instead. -func (CircuitRelay_Type) EnumDescriptor() ([]byte, []int) { - return file_pb_relay_proto_rawDescGZIP(), []int{0, 1} -} - -type CircuitRelay struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Type *CircuitRelay_Type `protobuf:"varint,1,opt,name=type,enum=relay.pb.CircuitRelay_Type" json:"type,omitempty"` // Type of the message - SrcPeer *CircuitRelay_Peer `protobuf:"bytes,2,opt,name=srcPeer" json:"srcPeer,omitempty"` // srcPeer and dstPeer are used when Type is HOP or STOP - DstPeer *CircuitRelay_Peer `protobuf:"bytes,3,opt,name=dstPeer" json:"dstPeer,omitempty"` - Code *CircuitRelay_Status `protobuf:"varint,4,opt,name=code,enum=relay.pb.CircuitRelay_Status" json:"code,omitempty"` // Status code, used when Type is STATUS -} - -func (x *CircuitRelay) Reset() { - *x = CircuitRelay{} - if protoimpl.UnsafeEnabled { - mi := &file_pb_relay_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *CircuitRelay) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*CircuitRelay) ProtoMessage() {} - -func (x *CircuitRelay) ProtoReflect() protoreflect.Message { - mi := &file_pb_relay_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use CircuitRelay.ProtoReflect.Descriptor instead. -func (*CircuitRelay) Descriptor() ([]byte, []int) { - return file_pb_relay_proto_rawDescGZIP(), []int{0} -} - -func (x *CircuitRelay) GetType() CircuitRelay_Type { - if x != nil && x.Type != nil { - return *x.Type - } - return CircuitRelay_HOP -} - -func (x *CircuitRelay) GetSrcPeer() *CircuitRelay_Peer { - if x != nil { - return x.SrcPeer - } - return nil -} - -func (x *CircuitRelay) GetDstPeer() *CircuitRelay_Peer { - if x != nil { - return x.DstPeer - } - return nil -} - -func (x *CircuitRelay) GetCode() CircuitRelay_Status { - if x != nil && x.Code != nil { - return *x.Code - } - return CircuitRelay_SUCCESS -} - -type CircuitRelay_Peer struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Id []byte `protobuf:"bytes,1,req,name=id" json:"id,omitempty"` // peer id - Addrs [][]byte `protobuf:"bytes,2,rep,name=addrs" json:"addrs,omitempty"` // peer's known addresses -} - -func (x *CircuitRelay_Peer) Reset() { - *x = CircuitRelay_Peer{} - if protoimpl.UnsafeEnabled { - mi := &file_pb_relay_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *CircuitRelay_Peer) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*CircuitRelay_Peer) ProtoMessage() {} - -func (x *CircuitRelay_Peer) ProtoReflect() protoreflect.Message { - mi := &file_pb_relay_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use CircuitRelay_Peer.ProtoReflect.Descriptor instead. -func (*CircuitRelay_Peer) Descriptor() ([]byte, []int) { - return file_pb_relay_proto_rawDescGZIP(), []int{0, 0} -} - -func (x *CircuitRelay_Peer) GetId() []byte { - if x != nil { - return x.Id - } - return nil -} - -func (x *CircuitRelay_Peer) GetAddrs() [][]byte { - if x != nil { - return x.Addrs - } - return nil -} - -var File_pb_relay_proto protoreflect.FileDescriptor - -var file_pb_relay_proto_rawDesc = []byte{ - 0x0a, 0x0e, 0x70, 0x62, 0x2f, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x12, 0x08, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x70, 0x62, 0x22, 0x87, 0x06, 0x0a, 0x0c, 0x43, - 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x52, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x2f, 0x0a, 0x04, 0x74, - 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1b, 0x2e, 0x72, 0x65, 0x6c, 0x61, - 0x79, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x52, 0x65, 0x6c, 0x61, - 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x35, 0x0a, 0x07, - 0x73, 0x72, 0x63, 0x50, 0x65, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, - 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, - 0x52, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x07, 0x73, 0x72, 0x63, 0x50, - 0x65, 0x65, 0x72, 0x12, 0x35, 0x0a, 0x07, 0x64, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x70, 0x62, 0x2e, - 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x52, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x50, 0x65, 0x65, - 0x72, 0x52, 0x07, 0x64, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x12, 0x31, 0x0a, 0x04, 0x63, 0x6f, - 0x64, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1d, 0x2e, 0x72, 0x65, 0x6c, 0x61, 0x79, - 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x52, 0x65, 0x6c, 0x61, 0x79, - 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x1a, 0x2c, 0x0a, - 0x04, 0x50, 0x65, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x02, 0x28, - 0x0c, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x61, 0x64, 0x64, 0x72, 0x73, 0x18, 0x02, - 0x20, 0x03, 0x28, 0x0c, 0x52, 0x05, 0x61, 0x64, 0x64, 0x72, 0x73, 0x22, 0xc2, 0x03, 0x0a, 0x06, - 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, - 0x53, 0x10, 0x64, 0x12, 0x1a, 0x0a, 0x15, 0x48, 0x4f, 0x50, 0x5f, 0x53, 0x52, 0x43, 0x5f, 0x41, - 0x44, 0x44, 0x52, 0x5f, 0x54, 0x4f, 0x4f, 0x5f, 0x4c, 0x4f, 0x4e, 0x47, 0x10, 0xdc, 0x01, 0x12, - 0x1a, 0x0a, 0x15, 0x48, 0x4f, 0x50, 0x5f, 0x44, 0x53, 0x54, 0x5f, 0x41, 0x44, 0x44, 0x52, 0x5f, - 0x54, 0x4f, 0x4f, 0x5f, 0x4c, 0x4f, 0x4e, 0x47, 0x10, 0xdd, 0x01, 0x12, 0x1e, 0x0a, 0x19, 0x48, - 0x4f, 0x50, 0x5f, 0x53, 0x52, 0x43, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x41, 0x44, 0x44, 0x52, - 0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0xfa, 0x01, 0x12, 0x1e, 0x0a, 0x19, 0x48, - 0x4f, 0x50, 0x5f, 0x44, 0x53, 0x54, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x41, 0x44, 0x44, 0x52, - 0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0xfb, 0x01, 0x12, 0x17, 0x0a, 0x12, 0x48, - 0x4f, 0x50, 0x5f, 0x4e, 0x4f, 0x5f, 0x43, 0x4f, 0x4e, 0x4e, 0x5f, 0x54, 0x4f, 0x5f, 0x44, 0x53, - 0x54, 0x10, 0x84, 0x02, 0x12, 0x16, 0x0a, 0x11, 0x48, 0x4f, 0x50, 0x5f, 0x43, 0x41, 0x4e, 0x54, - 0x5f, 0x44, 0x49, 0x41, 0x4c, 0x5f, 0x44, 0x53, 0x54, 0x10, 0x85, 0x02, 0x12, 0x1d, 0x0a, 0x18, - 0x48, 0x4f, 0x50, 0x5f, 0x43, 0x41, 0x4e, 0x54, 0x5f, 0x4f, 0x50, 0x45, 0x4e, 0x5f, 0x44, 0x53, - 0x54, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x10, 0x86, 0x02, 0x12, 0x19, 0x0a, 0x14, 0x48, - 0x4f, 0x50, 0x5f, 0x43, 0x41, 0x4e, 0x54, 0x5f, 0x53, 0x50, 0x45, 0x41, 0x4b, 0x5f, 0x52, 0x45, - 0x4c, 0x41, 0x59, 0x10, 0x8e, 0x02, 0x12, 0x1b, 0x0a, 0x16, 0x48, 0x4f, 0x50, 0x5f, 0x43, 0x41, - 0x4e, 0x54, 0x5f, 0x52, 0x45, 0x4c, 0x41, 0x59, 0x5f, 0x54, 0x4f, 0x5f, 0x53, 0x45, 0x4c, 0x46, - 0x10, 0x98, 0x02, 0x12, 0x1b, 0x0a, 0x16, 0x53, 0x54, 0x4f, 0x50, 0x5f, 0x53, 0x52, 0x43, 0x5f, - 0x41, 0x44, 0x44, 0x52, 0x5f, 0x54, 0x4f, 0x4f, 0x5f, 0x4c, 0x4f, 0x4e, 0x47, 0x10, 0xc0, 0x02, - 0x12, 0x1b, 0x0a, 0x16, 0x53, 0x54, 0x4f, 0x50, 0x5f, 0x44, 0x53, 0x54, 0x5f, 0x41, 0x44, 0x44, - 0x52, 0x5f, 0x54, 0x4f, 0x4f, 0x5f, 0x4c, 0x4f, 0x4e, 0x47, 0x10, 0xc1, 0x02, 0x12, 0x1f, 0x0a, - 0x1a, 0x53, 0x54, 0x4f, 0x50, 0x5f, 0x53, 0x52, 0x43, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x41, - 0x44, 0x44, 0x52, 0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0xde, 0x02, 0x12, 0x1f, - 0x0a, 0x1a, 0x53, 0x54, 0x4f, 0x50, 0x5f, 0x44, 0x53, 0x54, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, - 0x41, 0x44, 0x44, 0x52, 0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0xdf, 0x02, 0x12, - 0x17, 0x0a, 0x12, 0x53, 0x54, 0x4f, 0x50, 0x5f, 0x52, 0x45, 0x4c, 0x41, 0x59, 0x5f, 0x52, 0x45, - 0x46, 0x55, 0x53, 0x45, 0x44, 0x10, 0x86, 0x03, 0x12, 0x16, 0x0a, 0x11, 0x4d, 0x41, 0x4c, 0x46, - 0x4f, 0x52, 0x4d, 0x45, 0x44, 0x5f, 0x4d, 0x45, 0x53, 0x53, 0x41, 0x47, 0x45, 0x10, 0x90, 0x03, - 0x22, 0x32, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x48, 0x4f, 0x50, 0x10, - 0x01, 0x12, 0x08, 0x0a, 0x04, 0x53, 0x54, 0x4f, 0x50, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x53, - 0x54, 0x41, 0x54, 0x55, 0x53, 0x10, 0x03, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x41, 0x4e, 0x5f, 0x48, - 0x4f, 0x50, 0x10, 0x04, -} - -var ( - file_pb_relay_proto_rawDescOnce sync.Once - file_pb_relay_proto_rawDescData = file_pb_relay_proto_rawDesc -) - -func file_pb_relay_proto_rawDescGZIP() []byte { - file_pb_relay_proto_rawDescOnce.Do(func() { - file_pb_relay_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_relay_proto_rawDescData) - }) - return file_pb_relay_proto_rawDescData -} - -var file_pb_relay_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_pb_relay_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_pb_relay_proto_goTypes = []interface{}{ - (CircuitRelay_Status)(0), // 0: relay.pb.CircuitRelay.Status - (CircuitRelay_Type)(0), // 1: relay.pb.CircuitRelay.Type - (*CircuitRelay)(nil), // 2: relay.pb.CircuitRelay - (*CircuitRelay_Peer)(nil), // 3: relay.pb.CircuitRelay.Peer -} -var file_pb_relay_proto_depIdxs = []int32{ - 1, // 0: relay.pb.CircuitRelay.type:type_name -> relay.pb.CircuitRelay.Type - 3, // 1: relay.pb.CircuitRelay.srcPeer:type_name -> relay.pb.CircuitRelay.Peer - 3, // 2: relay.pb.CircuitRelay.dstPeer:type_name -> relay.pb.CircuitRelay.Peer - 0, // 3: relay.pb.CircuitRelay.code:type_name -> relay.pb.CircuitRelay.Status - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name -} - -func init() { file_pb_relay_proto_init() } -func file_pb_relay_proto_init() { - if File_pb_relay_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_pb_relay_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CircuitRelay); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pb_relay_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CircuitRelay_Peer); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_pb_relay_proto_rawDesc, - NumEnums: 2, - NumMessages: 2, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_pb_relay_proto_goTypes, - DependencyIndexes: file_pb_relay_proto_depIdxs, - EnumInfos: file_pb_relay_proto_enumTypes, - MessageInfos: file_pb_relay_proto_msgTypes, - }.Build() - File_pb_relay_proto = out.File - file_pb_relay_proto_rawDesc = nil - file_pb_relay_proto_goTypes = nil - file_pb_relay_proto_depIdxs = nil -} diff --git a/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto deleted file mode 100644 index de3e637b12..0000000000 --- a/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto +++ /dev/null @@ -1,44 +0,0 @@ -syntax = "proto2"; - -package relay.pb; - -message CircuitRelay { - - enum Status { - SUCCESS = 100; - HOP_SRC_ADDR_TOO_LONG = 220; - HOP_DST_ADDR_TOO_LONG = 221; - HOP_SRC_MULTIADDR_INVALID = 250; - HOP_DST_MULTIADDR_INVALID = 251; - HOP_NO_CONN_TO_DST = 260; - HOP_CANT_DIAL_DST = 261; - HOP_CANT_OPEN_DST_STREAM = 262; - HOP_CANT_SPEAK_RELAY = 270; - HOP_CANT_RELAY_TO_SELF = 280; - STOP_SRC_ADDR_TOO_LONG = 320; - STOP_DST_ADDR_TOO_LONG = 321; - STOP_SRC_MULTIADDR_INVALID = 350; - STOP_DST_MULTIADDR_INVALID = 351; - STOP_RELAY_REFUSED = 390; - MALFORMED_MESSAGE = 400; - } - - enum Type { // RPC identifier, either HOP, STOP or STATUS - HOP = 1; - STOP = 2; - STATUS = 3; - CAN_HOP = 4; - } - - message Peer { - required bytes id = 1; // peer id - repeated bytes addrs = 2; // peer's known addresses - } - - optional Type type = 1; // Type of the message - - optional Peer srcPeer = 2; // srcPeer and dstPeer are used when Type is HOP or STOP - optional Peer dstPeer = 3; - - optional Status code = 4; // Status code, used when Type is STATUS -} diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go deleted file mode 100644 index 3d67bc07a5..0000000000 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ /dev/null @@ -1,507 +0,0 @@ -package relay - -import ( - "context" - "fmt" - "io" - "sync" - "sync/atomic" - "time" - - pb "github.com/libp2p/go-libp2p/p2p/protocol/internal/circuitv1-deprecated/pb" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/libp2p/go-libp2p/core/transport" - - pool "github.com/libp2p/go-buffer-pool" - - logging "github.com/ipfs/go-log/v2" - - ma "github.com/multiformats/go-multiaddr" -) - -var log = logging.Logger("relay") - -const ProtoID = "/libp2p/circuit/relay/0.1.0" - -const maxMessageSize = 4096 - -var ( - RelayAcceptTimeout = 10 * time.Second - HopConnectTimeout = 30 * time.Second - StopHandshakeTimeout = 1 * time.Minute - - HopStreamBufferSize = 4096 - HopStreamLimit = 1 << 19 // 512K hops for 1M goroutines - - streamTimeout = 1 * time.Minute -) - -// Relay is the relay transport and service. -type Relay struct { - host host.Host - upgrader transport.Upgrader - ctx context.Context - ctxCancel context.CancelFunc - self peer.ID - - active bool - hop bool - - incoming chan *Conn - - // atomic counters - streamCount atomic.Int32 - liveHopCount atomic.Int32 - - // per peer hop counters - mx sync.Mutex - hopCount map[peer.ID]int -} - -// RelayOpts are options for configuring the relay transport. -type RelayOpt int - -var ( - // OptActive configures the relay transport to actively establish - // outbound connections on behalf of clients. You probably don't want to - // enable this unless you know what you're doing. - OptActive = RelayOpt(0) - // OptHop configures the relay transport to accept requests to relay - // traffic on behalf of third-parties. Unless OptActive is specified, - // this will only relay traffic between peers already connected to this - // node. - OptHop = RelayOpt(1) - // OptDiscovery is a no-op. It was introduced as a way to probe new - // peers to see if they were willing to act as a relays. However, in - // practice, it's useless. While it does test to see if these peers are - // relays, it doesn't (and can't), check to see if these peers are - // _active_ relays (i.e., will actively dial the target peer). - // - // This option may be re-enabled in the future but for now you shouldn't - // use it. - OptDiscovery = RelayOpt(2) -) - -type RelayError struct { - Code pb.CircuitRelay_Status -} - -func (e RelayError) Error() string { - return fmt.Sprintf("error opening relay circuit: %s (%d)", pb.CircuitRelay_Status_name[int32(e.Code)], e.Code) -} - -// NewRelay constructs a new relay. -func NewRelay(h host.Host, upgrader transport.Upgrader, opts ...RelayOpt) (*Relay, error) { - r := &Relay{ - upgrader: upgrader, - host: h, - self: h.ID(), - incoming: make(chan *Conn), - hopCount: make(map[peer.ID]int), - } - r.ctx, r.ctxCancel = context.WithCancel(context.Background()) - - for _, opt := range opts { - switch opt { - case OptActive: - r.active = true - case OptHop: - r.hop = true - case OptDiscovery: - log.Errorf( - "circuit.OptDiscovery is now a no-op: %s", - "dialing peers with a random relay is no longer supported", - ) - default: - return nil, fmt.Errorf("unrecognized option: %d", opt) - } - } - - h.SetStreamHandler(ProtoID, r.handleNewStream) - - return r, nil -} - -// Increment the live hop count and increment the connection manager tags by 1 for the two -// sides of the hop stream. This ensures that connections with many hop streams will be protected -// from pruning, thus minimizing disruption from connection trimming in a relay node. -func (r *Relay) addLiveHop(from, to peer.ID) { - r.liveHopCount.Add(1) - r.host.ConnManager().UpsertTag(from, "relay-hop-stream", incrementTag) - r.host.ConnManager().UpsertTag(to, "relay-hop-stream", incrementTag) -} - -// Decrement the live hpo count and decrement the connection manager tags for the two sides -// of the hop stream. -func (r *Relay) rmLiveHop(from, to peer.ID) { - r.liveHopCount.Add(-1) - r.host.ConnManager().UpsertTag(from, "relay-hop-stream", decrementTag) - r.host.ConnManager().UpsertTag(to, "relay-hop-stream", decrementTag) - -} - -func (r *Relay) GetActiveHops() int32 { - return r.liveHopCount.Load() -} - -func (r *Relay) DialPeer(ctx context.Context, relay peer.AddrInfo, dest peer.AddrInfo) (*Conn, error) { - - log.Debugf("dialing peer %s through relay %s", dest.ID, relay.ID) - - if len(relay.Addrs) > 0 { - r.host.Peerstore().AddAddrs(relay.ID, relay.Addrs, peerstore.TempAddrTTL) - } - - s, err := r.host.NewStream(ctx, relay.ID, ProtoID) - if err != nil { - return nil, err - } - - rd := newDelimitedReader(s, maxMessageSize) - wr := newDelimitedWriter(s) - defer rd.Close() - - var msg pb.CircuitRelay - - msg.Type = pb.CircuitRelay_HOP.Enum() - msg.SrcPeer = peerInfoToPeer(r.host.Peerstore().PeerInfo(r.self)) - msg.DstPeer = peerInfoToPeer(dest) - - err = wr.WriteMsg(&msg) - if err != nil { - s.Reset() - return nil, err - } - - msg.Reset() - - err = rd.ReadMsg(&msg) - if err != nil { - s.Reset() - return nil, err - } - - if msg.GetType() != pb.CircuitRelay_STATUS { - s.Reset() - return nil, fmt.Errorf("unexpected relay response; not a status message (%d)", msg.GetType()) - } - - if msg.GetCode() != pb.CircuitRelay_SUCCESS { - s.Reset() - return nil, RelayError{msg.GetCode()} - } - - return &Conn{stream: s, remote: dest, host: r.host, relay: r}, nil -} - -func (r *Relay) Matches(addr ma.Multiaddr) bool { - // TODO: Look at the prefix transport as well. - _, err := addr.ValueForProtocol(ma.P_CIRCUIT) - return err == nil -} - -// Queries a peer for support of hop relay -func CanHop(ctx context.Context, host host.Host, id peer.ID) (bool, error) { - s, err := host.NewStream(ctx, id, ProtoID) - if err != nil { - return false, err - } - defer s.Close() - - rd := newDelimitedReader(s, maxMessageSize) - wr := newDelimitedWriter(s) - defer rd.Close() - - var msg pb.CircuitRelay - - msg.Type = pb.CircuitRelay_CAN_HOP.Enum() - - if err := wr.WriteMsg(&msg); err != nil { - s.Reset() - return false, err - } - - msg.Reset() - - if err := rd.ReadMsg(&msg); err != nil { - s.Reset() - return false, err - } - - if msg.GetType() != pb.CircuitRelay_STATUS { - return false, fmt.Errorf("unexpected relay response; not a status message (%d)", msg.GetType()) - } - - return msg.GetCode() == pb.CircuitRelay_SUCCESS, nil -} - -func (r *Relay) CanHop(ctx context.Context, id peer.ID) (bool, error) { - return CanHop(ctx, r.host, id) -} - -func (r *Relay) handleNewStream(s network.Stream) { - s.SetReadDeadline(time.Now().Add(streamTimeout)) - - log.Infof("new relay stream from: %s", s.Conn().RemotePeer()) - - rd := newDelimitedReader(s, maxMessageSize) - defer rd.Close() - - var msg pb.CircuitRelay - - err := rd.ReadMsg(&msg) - if err != nil { - r.handleError(s, pb.CircuitRelay_MALFORMED_MESSAGE) - return - } - // reset stream deadline as message has been read - s.SetReadDeadline(time.Time{}) - - switch msg.GetType() { - case pb.CircuitRelay_HOP: - r.handleHopStream(s, &msg) - case pb.CircuitRelay_STOP: - r.handleStopStream(s, &msg) - case pb.CircuitRelay_CAN_HOP: - r.handleCanHop(s, &msg) - default: - log.Warnf("unexpected relay handshake: %d", msg.GetType()) - r.handleError(s, pb.CircuitRelay_MALFORMED_MESSAGE) - } -} - -func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) { - if !r.hop { - r.handleError(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY) - return - } - - streamCount := r.streamCount.Add(1) - liveHopCount := r.liveHopCount.Load() - defer r.streamCount.Add(-1) - - if (streamCount + liveHopCount) > int32(HopStreamLimit) { - log.Warn("hop stream limit exceeded; resetting stream") - s.Reset() - return - } - - src, err := peerToPeerInfo(msg.GetSrcPeer()) - if err != nil { - r.handleError(s, pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID) - return - } - - if src.ID != s.Conn().RemotePeer() { - r.handleError(s, pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID) - return - } - - dst, err := peerToPeerInfo(msg.GetDstPeer()) - if err != nil { - r.handleError(s, pb.CircuitRelay_HOP_DST_MULTIADDR_INVALID) - return - } - - if dst.ID == r.self { - r.handleError(s, pb.CircuitRelay_HOP_CANT_RELAY_TO_SELF) - return - } - - // open stream - ctx, cancel := context.WithTimeout(r.ctx, HopConnectTimeout) - defer cancel() - - if !r.active { - ctx = network.WithNoDial(ctx, "relay hop") - } else if len(dst.Addrs) > 0 { - r.host.Peerstore().AddAddrs(dst.ID, dst.Addrs, peerstore.TempAddrTTL) - } - - bs, err := r.host.NewStream(ctx, dst.ID, ProtoID) - if err != nil { - log.Debugf("error opening relay stream to %s: %s", dst.ID.Pretty(), err.Error()) - if err == network.ErrNoConn { - r.handleError(s, pb.CircuitRelay_HOP_NO_CONN_TO_DST) - } else { - r.handleError(s, pb.CircuitRelay_HOP_CANT_DIAL_DST) - } - return - } - - // stop handshake - rd := newDelimitedReader(bs, maxMessageSize) - wr := newDelimitedWriter(bs) - defer rd.Close() - - // set handshake deadline - bs.SetDeadline(time.Now().Add(StopHandshakeTimeout)) - - msg.Type = pb.CircuitRelay_STOP.Enum() - - err = wr.WriteMsg(msg) - if err != nil { - log.Debugf("error writing stop handshake: %s", err.Error()) - bs.Reset() - r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM) - return - } - - msg.Reset() - - err = rd.ReadMsg(msg) - if err != nil { - log.Debugf("error reading stop response: %s", err.Error()) - bs.Reset() - r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM) - return - } - - if msg.GetType() != pb.CircuitRelay_STATUS { - log.Debugf("unexpected relay stop response: not a status message (%d)", msg.GetType()) - bs.Reset() - r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM) - return - } - - if msg.GetCode() != pb.CircuitRelay_SUCCESS { - log.Debugf("relay stop failure: %d", msg.GetCode()) - bs.Reset() - r.handleError(s, msg.GetCode()) - return - } - - err = r.writeResponse(s, pb.CircuitRelay_SUCCESS) - if err != nil { - log.Debugf("error writing relay response: %s", err.Error()) - bs.Reset() - s.Reset() - return - } - - // relay connection - log.Infof("relaying connection between %s and %s", src.ID.Pretty(), dst.ID.Pretty()) - - // reset deadline - bs.SetDeadline(time.Time{}) - - r.addLiveHop(src.ID, dst.ID) - - var goroutines atomic.Int32 - goroutines.Store(2) - done := func() { - if goroutines.Add(-1) == 0 { - s.Close() - bs.Close() - r.rmLiveHop(src.ID, dst.ID) - } - } - - // Don't reset streams after finishing or the other side will get an - // error, not an EOF. - go func() { - defer done() - - buf := pool.Get(HopStreamBufferSize) - defer pool.Put(buf) - - count, err := io.CopyBuffer(s, bs, buf) - if err != nil { - log.Debugf("relay copy error: %s", err) - // Reset both. - s.Reset() - bs.Reset() - } else { - // propagate the close - s.CloseWrite() - } - log.Debugf("relayed %d bytes from %s to %s", count, dst.ID.Pretty(), src.ID.Pretty()) - }() - - go func() { - defer done() - - buf := pool.Get(HopStreamBufferSize) - defer pool.Put(buf) - - count, err := io.CopyBuffer(bs, s, buf) - if err != nil { - log.Debugf("relay copy error: %s", err) - // Reset both. - bs.Reset() - s.Reset() - } else { - // propagate the close - bs.CloseWrite() - } - log.Debugf("relayed %d bytes from %s to %s", count, src.ID.Pretty(), dst.ID.Pretty()) - }() -} - -func (r *Relay) handleStopStream(s network.Stream, msg *pb.CircuitRelay) { - src, err := peerToPeerInfo(msg.GetSrcPeer()) - if err != nil { - r.handleError(s, pb.CircuitRelay_STOP_SRC_MULTIADDR_INVALID) - return - } - - dst, err := peerToPeerInfo(msg.GetDstPeer()) - if err != nil || dst.ID != r.self { - r.handleError(s, pb.CircuitRelay_STOP_DST_MULTIADDR_INVALID) - return - } - - log.Infof("relay connection from: %s", src.ID) - - if len(src.Addrs) > 0 { - r.host.Peerstore().AddAddrs(src.ID, src.Addrs, peerstore.TempAddrTTL) - } - - select { - case r.incoming <- &Conn{stream: s, remote: src, host: r.host, relay: r}: - case <-time.After(RelayAcceptTimeout): - r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED) - } -} - -func (r *Relay) handleCanHop(s network.Stream, msg *pb.CircuitRelay) { - var err error - - if r.hop { - err = r.writeResponse(s, pb.CircuitRelay_SUCCESS) - } else { - err = r.writeResponse(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY) - } - - if err != nil { - s.Reset() - log.Debugf("error writing relay response: %s", err.Error()) - } else { - s.Close() - } -} - -func (r *Relay) handleError(s network.Stream, code pb.CircuitRelay_Status) { - log.Warnf("relay error: %s (%d)", pb.CircuitRelay_Status_name[int32(code)], code) - err := r.writeResponse(s, code) - if err != nil { - s.Reset() - log.Debugf("error writing relay response: %s", err.Error()) - } else { - s.Close() - } -} - -func (r *Relay) writeResponse(s network.Stream, code pb.CircuitRelay_Status) error { - wr := newDelimitedWriter(s) - - var msg pb.CircuitRelay - msg.Type = pb.CircuitRelay_STATUS.Enum() - msg.Code = code.Enum() - - return wr.WriteMsg(&msg) -} diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay_test.go b/p2p/protocol/internal/circuitv1-deprecated/relay_test.go deleted file mode 100644 index 30a80f4f95..0000000000 --- a/p2p/protocol/internal/circuitv1-deprecated/relay_test.go +++ /dev/null @@ -1,467 +0,0 @@ -//lint:file-ignore U1000 Ignore all unused code, we're not running any tests. -package relay_test - -import ( - "bytes" - "context" - "fmt" - "io" - "net" - "testing" - "time" - - bhost "github.com/libp2p/go-libp2p/p2p/host/blank" - "github.com/libp2p/go-libp2p/p2p/net/swarm" - swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" - . "github.com/libp2p/go-libp2p/p2p/protocol/internal/circuitv1-deprecated" - pb "github.com/libp2p/go-libp2p/p2p/protocol/internal/circuitv1-deprecated/pb" - - "github.com/libp2p/go-libp2p/core/host" - - ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr/net" -) - -/* TODO: add tests -- simple A -[R]-> B -- A tries to relay through R, R doesnt support relay -- A tries to relay through R to B, B doesnt support relay -- A sends too long multiaddr -- R drops stream mid-message -- A relays through R, R has no connection to B -*/ - -func getNetHosts(t *testing.T, n int) []host.Host { - var out []host.Host - - for i := 0; i < n; i++ { - netw := swarmt.GenSwarm(t) - h := bhost.NewBlankHost(netw) - out = append(out, h) - } - - return out -} - -func newTestRelay(t *testing.T, host host.Host, opts ...RelayOpt) *Relay { - r, err := NewRelay(host, swarmt.GenUpgrader(t, host.Network().(*swarm.Swarm), nil), opts...) - if err != nil { - t.Fatal(err) - } - return r -} - -func connect(t *testing.T, a, b host.Host) { - pinfo := a.Peerstore().PeerInfo(a.ID()) - err := b.Connect(context.Background(), pinfo) - if err != nil { - t.Fatal(err) - } -} - -func TestBasicRelay(t *testing.T) { - t.Skip("This package is legacy code we only keep around for testing purposes.") - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - hosts := getNetHosts(t, 3) - - connect(t, hosts[0], hosts[1]) - connect(t, hosts[1], hosts[2]) - - time.Sleep(10 * time.Millisecond) - - r1 := newTestRelay(t, hosts[0]) - - newTestRelay(t, hosts[1], OptHop) - - r3 := newTestRelay(t, hosts[2]) - - var ( - conn1, conn2 net.Conn - done = make(chan struct{}) - ) - - defer func() { - <-done - if conn1 != nil { - conn1.Close() - } - if conn2 != nil { - conn2.Close() - } - }() - - msg := []byte("relay works!") - go func() { - defer close(done) - list := r3.Listener() - - var err error - conn1, err = list.Accept() - if err != nil { - t.Error(err) - return - } - - _, err = conn1.Write(msg) - if err != nil { - t.Error(err) - return - } - }() - - rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) - dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) - - rctx, rcancel := context.WithTimeout(ctx, time.Second) - defer rcancel() - - var err error - conn2, err = r1.DialPeer(rctx, rinfo, dinfo) - if err != nil { - t.Fatal(err) - } - - result := make([]byte, len(msg)) - _, err = io.ReadFull(conn2, result) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(result, msg) { - t.Fatal("message was incorrect:", string(result)) - } -} - -func TestRelayReset(t *testing.T) { - t.Skip("This package is legacy code we only keep around for testing purposes.") - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - hosts := getNetHosts(t, 3) - - connect(t, hosts[0], hosts[1]) - connect(t, hosts[1], hosts[2]) - - time.Sleep(10 * time.Millisecond) - - r1 := newTestRelay(t, hosts[0]) - - newTestRelay(t, hosts[1], OptHop) - - r3 := newTestRelay(t, hosts[2]) - - ready := make(chan struct{}) - - msg := []byte("relay works!") - go func() { - list := r3.Listener() - - con, err := list.Accept() - if err != nil { - t.Error(err) - return - } - - <-ready - - _, err = con.Write(msg) - if err != nil { - t.Error(err) - return - } - - hosts[2].Network().ClosePeer(hosts[1].ID()) - }() - - rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) - dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) - - rctx, rcancel := context.WithTimeout(ctx, time.Second) - defer rcancel() - - con, err := r1.DialPeer(rctx, rinfo, dinfo) - if err != nil { - t.Fatal(err) - } - - close(ready) - - _, err = io.ReadAll(con) - if err == nil { - t.Fatal("expected error for reset relayed connection") - } -} - -func TestBasicRelayDial(t *testing.T) { - t.Skip("This package is legacy code we only keep around for testing purposes.") - - ctx, cancel := context.WithCancel(context.Background()) - - hosts := getNetHosts(t, 3) - - connect(t, hosts[0], hosts[1]) - connect(t, hosts[1], hosts[2]) - - time.Sleep(10 * time.Millisecond) - - r1 := newTestRelay(t, hosts[0]) - - _ = newTestRelay(t, hosts[1], OptHop) - r3 := newTestRelay(t, hosts[2]) - - var ( - conn1, conn2 net.Conn - done = make(chan struct{}) - ) - - defer func() { - cancel() - <-done - if conn1 != nil { - conn1.Close() - } - if conn2 != nil { - conn2.Close() - } - }() - - msg := []byte("relay works!") - go func() { - defer close(done) - list := r3.Listener() - - var err error - conn1, err = list.Accept() - if err != nil { - t.Error(err) - return - } - - _, err = conn1.Write(msg) - if err != nil { - t.Error(err) - return - } - }() - - addr := ma.StringCast(fmt.Sprintf("/ipfs/%s/p2p-circuit", hosts[1].ID().Pretty())) - - rctx, rcancel := context.WithTimeout(ctx, time.Second) - defer rcancel() - - var err error - conn2, err = r1.Dial(rctx, addr, hosts[2].ID()) - if err != nil { - t.Fatal(err) - } - - data := make([]byte, len(msg)) - _, err = io.ReadFull(conn2, data) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(data, msg) { - t.Fatal("message was incorrect:", string(data)) - } -} - -func TestUnspecificRelayDialFails(t *testing.T) { - t.Skip("This package is legacy code we only keep around for testing purposes.") - - hosts := getNetHosts(t, 3) - - r1 := newTestRelay(t, hosts[0]) - newTestRelay(t, hosts[1], OptHop) - r3 := newTestRelay(t, hosts[2]) - - connect(t, hosts[0], hosts[1]) - connect(t, hosts[1], hosts[2]) - - time.Sleep(100 * time.Millisecond) - - go func() { - if _, err := r3.Listener().Accept(); err == nil { - t.Error("should not have received relay connection") - } - }() - - addr := ma.StringCast("/p2p-circuit") - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - if _, err := r1.Dial(ctx, addr, hosts[2].ID()); err == nil { - t.Fatal("expected dial with unspecified relay address to fail, even if we're connected to a relay") - } -} - -func TestRelayThroughNonHop(t *testing.T) { - t.Skip("This package is legacy code we only keep around for testing purposes.") - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - hosts := getNetHosts(t, 3) - - connect(t, hosts[0], hosts[1]) - connect(t, hosts[1], hosts[2]) - - time.Sleep(10 * time.Millisecond) - - r1 := newTestRelay(t, hosts[0]) - - newTestRelay(t, hosts[1]) - - newTestRelay(t, hosts[2]) - - rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) - dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) - - rctx, rcancel := context.WithTimeout(ctx, time.Second) - defer rcancel() - - _, err := r1.DialPeer(rctx, rinfo, dinfo) - if err == nil { - t.Fatal("expected error") - } - - rerr, ok := err.(RelayError) - if !ok { - t.Fatalf("expected RelayError: %#v", err) - } - - if rerr.Code != pb.CircuitRelay_HOP_CANT_SPEAK_RELAY { - t.Fatal("expected 'HOP_CANT_SPEAK_RELAY' error") - } -} - -func TestRelayNoDestConnection(t *testing.T) { - t.Skip("This package is legacy code we only keep around for testing purposes.") - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - hosts := getNetHosts(t, 3) - - connect(t, hosts[0], hosts[1]) - - time.Sleep(10 * time.Millisecond) - - r1 := newTestRelay(t, hosts[0]) - - newTestRelay(t, hosts[1], OptHop) - - rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) - dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) - - rctx, rcancel := context.WithTimeout(ctx, time.Second) - defer rcancel() - - _, err := r1.DialPeer(rctx, rinfo, dinfo) - if err == nil { - t.Fatal("expected error") - } - - rerr, ok := err.(RelayError) - if !ok { - t.Fatalf("expected RelayError: %#v", err) - } - - if rerr.Code != pb.CircuitRelay_HOP_NO_CONN_TO_DST { - t.Fatal("expected 'HOP_NO_CONN_TO_DST' error") - } -} - -func TestActiveRelay(t *testing.T) { - t.Skip("This package is legacy code we only keep around for testing purposes.") - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - hosts := getNetHosts(t, 3) - - connect(t, hosts[0], hosts[1]) - - time.Sleep(10 * time.Millisecond) - - r1 := newTestRelay(t, hosts[0]) - newTestRelay(t, hosts[1], OptHop, OptActive) - r3 := newTestRelay(t, hosts[2]) - - connChan := make(chan manet.Conn) - - msg := []byte("relay works!") - go func() { - defer close(connChan) - list := r3.Listener() - - conn1, err := list.Accept() - if err != nil { - t.Error(err) - return - } - - if _, err := conn1.Write(msg); err != nil { - t.Error(err) - return - } - connChan <- conn1 - }() - - rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) - dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) - - rctx, rcancel := context.WithTimeout(ctx, time.Second) - defer rcancel() - - conn2, err := r1.DialPeer(rctx, rinfo, dinfo) - if err != nil { - t.Fatal(err) - } - defer conn2.Close() - - data := make([]byte, len(msg)) - _, err = io.ReadFull(conn2, data) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(data, msg) { - t.Fatal("message was incorrect:", string(data)) - } - conn1, ok := <-connChan - if !ok { - t.Fatal("listener didn't accept a connection") - } - conn1.Close() -} - -func TestRelayCanHop(t *testing.T) { - t.Skip("This package is legacy code we only keep around for testing purposes.") - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - hosts := getNetHosts(t, 2) - - connect(t, hosts[0], hosts[1]) - - time.Sleep(10 * time.Millisecond) - - r1 := newTestRelay(t, hosts[0]) - - newTestRelay(t, hosts[1], OptHop) - - canhop, err := r1.CanHop(ctx, hosts[1].ID()) - if err != nil { - t.Fatal(err) - } - - if !canhop { - t.Fatal("Relay can't hop") - } -} diff --git a/p2p/protocol/internal/circuitv1-deprecated/transport.go b/p2p/protocol/internal/circuitv1-deprecated/transport.go deleted file mode 100644 index d37b59b6de..0000000000 --- a/p2p/protocol/internal/circuitv1-deprecated/transport.go +++ /dev/null @@ -1,74 +0,0 @@ -package relay - -import ( - "fmt" - "io" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/transport" - ma "github.com/multiformats/go-multiaddr" -) - -var circuitAddr = ma.Cast(ma.ProtocolWithCode(ma.P_CIRCUIT).VCode) - -var _ transport.Transport = (*RelayTransport)(nil) -var _ io.Closer = (*RelayTransport)(nil) - -type RelayTransport Relay - -func (t *RelayTransport) Relay() *Relay { - return (*Relay)(t) -} - -func (r *Relay) Transport() *RelayTransport { - return (*RelayTransport)(r) -} - -func (t *RelayTransport) Listen(laddr ma.Multiaddr) (transport.Listener, error) { - // TODO: Ensure we have a connection to the relay, if specified. Also, - // make sure the multiaddr makes sense. - if !t.Relay().Matches(laddr) { - return nil, fmt.Errorf("%s is not a relay address", laddr) - } - return t.upgrader.UpgradeListener(t, t.Relay().Listener()), nil -} - -func (t *RelayTransport) CanDial(raddr ma.Multiaddr) bool { - return t.Relay().Matches(raddr) -} - -func (t *RelayTransport) Proxy() bool { - return true -} - -func (t *RelayTransport) Protocols() []int { - return []int{ma.P_CIRCUIT} -} - -func (r *RelayTransport) Close() error { - r.ctxCancel() - return nil -} - -// AddRelayTransport constructs a relay and adds it as a transport to the host network. -func AddRelayTransport(h host.Host, upgrader transport.Upgrader, opts ...RelayOpt) error { - n, ok := h.Network().(transport.TransportNetwork) - if !ok { - return fmt.Errorf("%v is not a transport network", h.Network()) - } - - r, err := NewRelay(h, upgrader, opts...) - if err != nil { - return err - } - - // There's no nice way to handle these errors as we have no way to tear - // down the relay. - // TODO - if err := n.AddTransport(r.Transport()); err != nil { - log.Error("failed to add relay transport:", err) - } else if err := n.Listen(r.Listener().Multiaddr()); err != nil { - log.Error("failed to listen on relay transport:", err) - } - return nil -} diff --git a/p2p/protocol/internal/circuitv1-deprecated/transport_test.go b/p2p/protocol/internal/circuitv1-deprecated/transport_test.go deleted file mode 100644 index c1cb1a9c4b..0000000000 --- a/p2p/protocol/internal/circuitv1-deprecated/transport_test.go +++ /dev/null @@ -1,156 +0,0 @@ -//lint:file-ignore U1000 Ignore all unused code, we're not running any tests. -package relay_test - -import ( - "bytes" - "context" - "fmt" - "io" - "testing" - "time" - - "github.com/libp2p/go-libp2p/p2p/net/swarm" - swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" - . "github.com/libp2p/go-libp2p/p2p/protocol/internal/circuitv1-deprecated" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peerstore" - - ma "github.com/multiformats/go-multiaddr" -) - -const TestProto = "test/relay-transport" - -var msg = []byte("relay works!") - -func testSetupRelay(t *testing.T) []host.Host { - hosts := getNetHosts(t, 3) - - err := AddRelayTransport(hosts[0], swarmt.GenUpgrader(t, hosts[0].Network().(*swarm.Swarm), nil)) - if err != nil { - t.Fatal(err) - } - - err = AddRelayTransport(hosts[1], swarmt.GenUpgrader(t, hosts[1].Network().(*swarm.Swarm), nil), OptHop) - if err != nil { - t.Fatal(err) - } - - err = AddRelayTransport(hosts[2], swarmt.GenUpgrader(t, hosts[2].Network().(*swarm.Swarm), nil)) - if err != nil { - t.Fatal(err) - } - - connect(t, hosts[0], hosts[1]) - connect(t, hosts[1], hosts[2]) - - time.Sleep(100 * time.Millisecond) - - handler := func(s network.Stream) { - _, err := s.Write(msg) - if err != nil { - t.Error(err) - } - s.Close() - } - - hosts[2].SetStreamHandler(TestProto, handler) - - return hosts -} - -func TestFullAddressTransportDial(t *testing.T) { - t.Skip("This package is legacy code we only keep around for testing purposes.") - - hosts := testSetupRelay(t) - - var relayAddr ma.Multiaddr - for _, addr := range hosts[1].Addrs() { - // skip relay addrs. - if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err != nil { - relayAddr = addr - } - } - - addr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s/p2p-circuit/p2p/%s", relayAddr.String(), hosts[1].ID().Pretty(), hosts[2].ID().Pretty())) - if err != nil { - t.Fatal(err) - } - - hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, peerstore.TempAddrTTL) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - s, err := hosts[0].NewStream(ctx, hosts[2].ID(), TestProto) - if err != nil { - t.Fatal(err) - } - - data, err := io.ReadAll(s) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(data, msg) { - t.Fatal("message was incorrect:", string(data)) - } -} - -func TestSpecificRelayTransportDial(t *testing.T) { - t.Skip("This package is legacy code we only keep around for testing purposes.") - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - hosts := testSetupRelay(t) - - addr, err := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s/p2p-circuit/ipfs/%s", hosts[1].ID().Pretty(), hosts[2].ID().Pretty())) - if err != nil { - t.Fatal(err) - } - - rctx, rcancel := context.WithTimeout(ctx, time.Second) - defer rcancel() - - hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, peerstore.TempAddrTTL) - - s, err := hosts[0].NewStream(rctx, hosts[2].ID(), TestProto) - if err != nil { - t.Fatal(err) - } - - data, err := io.ReadAll(s) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(data, msg) { - t.Fatal("message was incorrect:", string(data)) - } -} - -func TestUnspecificRelayTransportDialFails(t *testing.T) { - t.Skip("This package is legacy code we only keep around for testing purposes.") - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - hosts := testSetupRelay(t) - - addr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p-circuit/ipfs/%s", hosts[2].ID().Pretty())) - if err != nil { - t.Fatal(err) - } - - rctx, rcancel := context.WithTimeout(ctx, time.Second) - defer rcancel() - - hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, peerstore.TempAddrTTL) - - _, err = hosts[0].NewStream(rctx, hosts[2].ID(), TestProto) - if err == nil { - t.Fatal("dial to unspecified address should have failed") - } - -} diff --git a/p2p/protocol/internal/circuitv1-deprecated/util.go b/p2p/protocol/internal/circuitv1-deprecated/util.go deleted file mode 100644 index 727c9a04eb..0000000000 --- a/p2p/protocol/internal/circuitv1-deprecated/util.go +++ /dev/null @@ -1,119 +0,0 @@ -package relay - -import ( - "errors" - "io" - - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/p2p/protocol/internal/circuitv1-deprecated/pb" - - pool "github.com/libp2p/go-buffer-pool" - "github.com/libp2p/go-msgio/pbio" - - ma "github.com/multiformats/go-multiaddr" - "github.com/multiformats/go-varint" - "google.golang.org/protobuf/proto" -) - -//go:generate protoc --proto_path=$PWD:$PWD/../../../.. --go_out=. --go_opt=Mpb/relay.proto=./pb pb/relay.proto - -func peerToPeerInfo(p *pb.CircuitRelay_Peer) (peer.AddrInfo, error) { - if p == nil { - return peer.AddrInfo{}, errors.New("nil peer") - } - - id, err := peer.IDFromBytes(p.Id) - if err != nil { - return peer.AddrInfo{}, err - } - - addrs := make([]ma.Multiaddr, 0, len(p.Addrs)) - for _, addrBytes := range p.Addrs { - a, err := ma.NewMultiaddrBytes(addrBytes) - if err == nil { - addrs = append(addrs, a) - } - } - - return peer.AddrInfo{ID: id, Addrs: addrs}, nil -} - -func peerInfoToPeer(pi peer.AddrInfo) *pb.CircuitRelay_Peer { - addrs := make([][]byte, len(pi.Addrs)) - for i, addr := range pi.Addrs { - addrs[i] = addr.Bytes() - } - - p := new(pb.CircuitRelay_Peer) - p.Id = []byte(pi.ID) - p.Addrs = addrs - - return p -} - -func incrementTag(v int) int { - return v + 1 -} - -func decrementTag(v int) int { - if v > 0 { - return v - 1 - } else { - return v - } -} - -type delimitedReader struct { - r io.Reader - buf []byte -} - -// The protobuf NewDelimitedReader is buffered, which may eat up stream data. -// So we need to implement a compatible delimited reader that reads unbuffered. -// There is a slowdown from unbuffered reading: when reading the message -// it can take multiple single byte Reads to read the length and another Read -// to read the message payload. -// However, this is not critical performance degradation as -// - the reader is utilized to read one (dialer, stop) or two messages (hop) during -// the handshake, so it's a drop in the water for the connection lifetime. -// - messages are small (max 4k) and the length fits in a couple of bytes, -// so overall we have at most three reads per message. -func newDelimitedReader(r io.Reader, maxSize int) *delimitedReader { - return &delimitedReader{r: r, buf: pool.Get(maxSize)} -} - -func (d *delimitedReader) Close() { - if d.buf != nil { - pool.Put(d.buf) - d.buf = nil - } -} - -func (d *delimitedReader) ReadByte() (byte, error) { - buf := d.buf[:1] - _, err := d.r.Read(buf) - return buf[0], err -} - -func (d *delimitedReader) ReadMsg(msg proto.Message) error { - mlen, err := varint.ReadUvarint(d) - if err != nil { - return err - } - - if uint64(len(d.buf)) < mlen { - return errors.New("message too large") - } - - buf := d.buf[:mlen] - _, err = io.ReadFull(d.r, buf) - if err != nil { - return err - } - - return proto.Unmarshal(buf, msg) -} - -func newDelimitedWriter(w io.Writer) pbio.WriteCloser { - return pbio.NewDelimitedWriter(w) -}