Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tunnel: write packet size and packet data in one stream.Write call #118

Merged
merged 3 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ jobs:
curl -sSL https://github.com/librespeed/speedtest-cli/releases/download/v1.0.10/librespeed-cli_1.0.10_linux_amd64.tar.gz | tar -xzf -
echo $CONFIG_AWL_LINUX > config_awl.json
elif [ "$RUNNER_OS" == "macOS" ]; then
curl -sSL https://github.com/librespeed/speedtest-cli/releases/download/v1.0.10/librespeed-cli_1.0.10_darwin_amd64.tar.gz | tar -xzf -
curl -sSL https://github.com/librespeed/speedtest-cli/releases/download/v1.0.10/librespeed-cli_1.0.10_darwin_arm64.tar.gz | tar -xzf -
echo $CONFIG_AWL_MACOS > config_awl.json
elif [ "$RUNNER_OS" == "Windows" ]; then
curl -sSL https://github.com/librespeed/speedtest-cli/releases/download/v1.0.10/librespeed-cli_1.0.10_windows_amd64.zip > download.zip
Expand Down
15 changes: 8 additions & 7 deletions application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@ import (
"testing"
"time"

"github.com/anywherelan/awl/api"
"github.com/anywherelan/awl/api/apiclient"
"github.com/anywherelan/awl/config"
"github.com/anywherelan/awl/entity"
"github.com/anywherelan/awl/p2p"
"github.com/anywherelan/awl/vpn"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-log/v2"
Expand All @@ -34,6 +28,13 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"golang.zx2c4.com/wireguard/tun"

"github.com/anywherelan/awl/api"
"github.com/anywherelan/awl/api/apiclient"
"github.com/anywherelan/awl/config"
"github.com/anywherelan/awl/entity"
"github.com/anywherelan/awl/p2p"
"github.com/anywherelan/awl/vpn"
)

func init() {
Expand Down Expand Up @@ -310,7 +311,7 @@ func BenchmarkTunnelPackets(b *testing.B) {
peer1.tun.Outbound <- packet
atomic.AddInt64(&packetsSent, 1)
// to have packet_loss at reasonable level (but more than 0)
const sleepEvery = 80
const sleepEvery = 100
if i != 0 && i%sleepEvery == 0 {
time.Sleep(1 * time.Millisecond)
}
Expand Down
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type (
ListenAddresses []string `json:"listenAddresses"`
ReconnectionIntervalSec time.Duration `json:"reconnectionIntervalSec" swaggertype:"primitive,integer"`
AutoAcceptAuthRequests bool `json:"autoAcceptAuthRequests"`

UseDedicatedConnForEachStream bool `json:"useDedicatedConnForEachStream"`
ParallelSendingStreamsCount int `json:"parallelSendingStreamsCount"`
}
VPNConfig struct {
InterfaceName string `json:"interfaceName"`
Expand Down
5 changes: 3 additions & 2 deletions config/other.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ func setDefaults(conf *Config, bus awlevent.Bus) {
if conf.P2pNode.ReconnectionIntervalSec == 0 {
conf.P2pNode.ReconnectionIntervalSec = 10
}
if conf.P2pNode.ParallelSendingStreamsCount == 0 {
conf.P2pNode.ParallelSendingStreamsCount = 1
}

// Other
if conf.LoggerLevel == "" {
Expand All @@ -155,8 +158,6 @@ func setDefaults(conf *Config, bus awlevent.Bus) {
if isEmptyConfig {
conf.HttpListenOnAdminHost = true
}
// TODO: remove in next release
conf.HttpListenOnAdminHost = true

if conf.VPNConfig.IPNet == "" {
conf.VPNConfig.IPNet = defaultNetworkSubnet
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/milosgajdos/tenus v0.0.3
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.12.0
github.com/multiformats/go-multistream v0.5.0
github.com/olekukonko/tablewriter v0.0.5
github.com/quic-go/quic-go v0.39.4
github.com/stretchr/testify v1.8.4
Expand Down Expand Up @@ -106,7 +107,6 @@ require (
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-multistream v0.5.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/onsi/ginkgo/v2 v2.13.0 // indirect
github.com/opencontainers/runtime-spec v1.1.0 // indirect
Expand Down
61 changes: 61 additions & 0 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/rand"
"errors"
"fmt"
"io"
"net"
"sync"
"sync/atomic"
Expand All @@ -28,6 +29,7 @@ import (
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/multiformats/go-multiaddr"
msmux "github.com/multiformats/go-multistream"
"go.uber.org/multierr"
)

Expand Down Expand Up @@ -209,6 +211,34 @@ func (p *P2p) NewStream(ctx context.Context, id peer.ID, proto protocol.ID) (net
return p.host.NewStream(ctx, id, proto)
}

func (p *P2p) NewStreamWithDedicatedConn(ctx context.Context, id peer.ID, proto protocol.ID) (network.Stream, error) {
ctx = network.WithUseTransient(ctx, "awl")

// mostly copied from NewStream()
// github.com/libp2p/[email protected]/p2p/host/basic/basic_host.go:634
conn, err := p.host.Network().DialPeer(ctx, id)
if err != nil {
return nil, fmt.Errorf("failed to dial: %v", err)
}

stream, err := conn.NewStream(ctx)
if err != nil {
_ = conn.Close()
return nil, fmt.Errorf("failed to create new stream: %v", err)
}

err = stream.SetProtocol(proto)
if err != nil {
return nil, fmt.Errorf("failed to set protocol to stream: %v", err)
}
lzcon := msmux.NewMSSelect(stream, proto)

return &streamWrapper{
Stream: stream,
rw: lzcon,
}, nil
}

func (p *P2p) IsConnected(peerID peer.ID) bool {
return p.host.Network().Connectedness(peerID) == network.Connected
}
Expand Down Expand Up @@ -376,3 +406,34 @@ func DefaultListenAddrs() []multiaddr.Multiaddr {
multiaddr.StringCast(fmt.Sprintf("/ip6/::/udp/%d/quic-v1", defaultP2pPort)),
}
}

// copied from
// github.com/libp2p/[email protected]/p2p/host/basic/basic_host.go:1050
type streamWrapper struct {
network.Stream
rw io.ReadWriteCloser
}

func (s *streamWrapper) Read(b []byte) (int, error) {
return s.rw.Read(b)
}

func (s *streamWrapper) Write(b []byte) (int, error) {
return s.rw.Write(b)
}

func (s *streamWrapper) Close() error {
return s.rw.Close()
}

func (s *streamWrapper) CloseWrite() error {
// Flush the handshake before closing, but ignore the error. The other
// end may have closed their side for reading.
//
// If something is wrong with the stream, the user will get on error on
// read instead.
if flusher, ok := s.rw.(interface{ Flush() error }); ok {
_ = flusher.Flush()
}
return s.Stream.CloseWrite()
}
13 changes: 7 additions & 6 deletions protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func SendAuthResponse(stream io.Writer, response AuthPeerResponse) error {

func ReadUint64(stream io.Reader) (uint64, error) {
var data [8]byte
n, err := stream.Read(data[:])
n, err := io.ReadFull(stream, data[:])
if err != nil {
return 0, err
}
Expand All @@ -82,9 +82,10 @@ func ReadUint64(stream io.Reader) (uint64, error) {
return value, nil
}

func WriteUint64(stream io.Writer, number uint64) error {
var data [8]byte
binary.BigEndian.PutUint64(data[:], number)
_, err := stream.Write(data[:])
return err
func WritePacketToBuf(buf, packet []byte) []byte {
const lenBytesCount = 8
binary.BigEndian.PutUint64(buf, uint64(len(packet)))
n := copy(buf[lenBytesCount:], packet)

return buf[:lenBytesCount+n]
}
10 changes: 6 additions & 4 deletions service/auth_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
"sync"
"time"

"github.com/anywherelan/awl/awldns"
"github.com/anywherelan/awl/awlevent"
"github.com/anywherelan/awl/config"
"github.com/anywherelan/awl/protocol"
"github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"

"github.com/anywherelan/awl/awldns"
"github.com/anywherelan/awl/awlevent"
"github.com/anywherelan/awl/config"
"github.com/anywherelan/awl/protocol"
)

const (
Expand All @@ -25,6 +26,7 @@ const (
type P2p interface {
ConnectPeer(ctx context.Context, peerID peer.ID) error
NewStream(ctx context.Context, id peer.ID, proto libp2pProtocol.ID) (network.Stream, error)
NewStreamWithDedicatedConn(ctx context.Context, id peer.ID, proto libp2pProtocol.ID) (network.Stream, error)
SubscribeConnectionEvents(onConnected, onDisconnected func(network.Network, network.Conn))
ProtectPeer(id peer.ID)
}
Expand Down
34 changes: 23 additions & 11 deletions service/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
"sync"
"time"

"github.com/anywherelan/awl/config"
"github.com/anywherelan/awl/protocol"
"github.com/anywherelan/awl/vpn"
"github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/anywherelan/awl/config"
"github.com/anywherelan/awl/protocol"
"github.com/anywherelan/awl/vpn"
)

const (
Expand Down Expand Up @@ -175,10 +176,16 @@ func (t *Tunnel) makeTunnelStream(ctx context.Context, peerID peer.ID) (network.
return nil, err
}

stream, err := t.p2p.NewStream(ctx, peerID, protocol.TunnelPacketMethod)
newStreamFunc := t.p2p.NewStream
if t.conf.P2pNode.UseDedicatedConnForEachStream {
newStreamFunc = t.p2p.NewStreamWithDedicatedConn
}

stream, err := newStreamFunc(ctx, peerID, protocol.TunnelPacketMethod)
if err != nil {
return nil, err
}

return stream, nil
}

Expand All @@ -192,7 +199,10 @@ type VpnPeer struct {
// TODO: remove Tunnel from VpnPeer dependencies
func (vp *VpnPeer) Start(t *Tunnel) {
go vp.backgroundInboundHandler(t)
go vp.backgroundOutboundHandler(t)

for i := 0; i < t.conf.P2pNode.ParallelSendingStreamsCount; i++ {
go vp.backgroundOutboundHandler(t)
}
}

func (vp *VpnPeer) Close(t *Tunnel) {
Expand Down Expand Up @@ -224,12 +234,13 @@ func (vp *VpnPeer) backgroundOutboundHandler(t *Tunnel) {
return fmt.Errorf("make tunnel stream: %v", err)
}
}
// TODO: write packet len and packet data in one stream.Write - probably it's much more efficient
err = protocol.WriteUint64(stream, uint64(len(packet.Packet)))
if err != nil {
return err
}
_, err = stream.Write(packet.Packet)

tmpPacket := t.device.GetTempPacket()
defer t.device.PutTempPacket(tmpPacket)

protocolPacket := protocol.WritePacketToBuf(tmpPacket.Buffer[:], packet.Packet)
_, err = stream.Write(protocolPacket)

return err
}

Expand Down Expand Up @@ -280,6 +291,7 @@ func (vp *VpnPeer) backgroundInboundHandler(t *Tunnel) {
t.device.PutTempPacket(packet)
continue
}
// TODO: add batching
err := t.device.WritePacket(packet, vp.localIP)
if err != nil {
t.logger.Warnf("write packet to vpn: %v", err)
Expand Down
Loading