From 52c1f0565e9e183bdc6934c39ec4a97d3fd3be9e Mon Sep 17 00:00:00 2001 From: zhiyi Date: Fri, 17 Nov 2023 17:27:19 +0800 Subject: [PATCH] webrtc and quic improvements --- Makefile | 4 +- client/api/conn.go | 22 ++- client/client.go | 304 ++++++++++++++++--------------- client/config.go | 25 ++- client/tcpforward.go | 2 +- client/webrtc/peerconnection.cpp | 2 + client/webrtc/peerconnection.go | 8 +- conn/quicConn.go | 85 ++++----- dep/patch/openssl.patch | 12 ++ server/conn.go | 61 ++++--- 10 files changed, 280 insertions(+), 245 deletions(-) create mode 100644 dep/patch/openssl.patch diff --git a/Makefile b/Makefile index 9c8b8167..aedac6aa 100644 --- a/Makefile +++ b/Makefile @@ -115,6 +115,7 @@ update_submodule: git config --global --add safe.directory /go/src/github.com/isrc-cas/gt/dep/_msquic/submodules/openssl3/tlslite-ng git config --global --add safe.directory /go/src/github.com/isrc-cas/gt/dep/_msquic/submodules/openssl3/wycheproof $(UPDATE_SUBMODULE_COMMAND) + @git apply dep/patch/* && echo "patches applied" || echo "no patch applied" docker_create_image: update_submodule docker images | grep -cim1 -E "^gtbuild\s+?v1" || docker build -t gtbuild:v1 . @@ -260,9 +261,6 @@ check_msquic_dependencies: compile_msquic: check_msquic_dependencies update_submodule mkdir -p ./dep/_msquic/$(TARGET) - sed 's|\(^ *msquic_lib\)$$|\1 ALL|g' ./dep/_msquic/src/bin/CMakeLists.txt > ./dep/_msquic/src/bin/CMakeLists.txt.tmp - cat ./dep/_msquic/src/bin/CMakeLists.txt.tmp > ./dep/_msquic/src/bin/CMakeLists.txt - rm ./dep/_msquic/src/bin/CMakeLists.txt.tmp cmake -B./dep/_msquic/$(TARGET) -S./dep/_msquic -DQUIC_BUILD_SHARED=OFF -DCMAKE_TARGET_ARCHITECTURE=$(TARGET_CPU) make -C./dep/_msquic/$(TARGET) -j$(shell nproc) @renameSymbols=$$(objdump -t ./dep/_msquic/$(TARGET)/bin/Release/libmsquic.a | awk -v RS= '/_YB80VJ/{next}1' | grep -E 'g +(F|O) ' | grep -Evi ' (ms){0,1}quic' | awk '{print " --redefine-sym " $$NF "=" $$NF "_YB80VJ"}') && \ diff --git a/client/api/conn.go b/client/api/conn.go index bf1a1b14..11460025 100644 --- a/client/api/conn.go +++ b/client/api/conn.go @@ -74,15 +74,25 @@ func (c *Conn) Read(b []byte) (n int, err error) { func (c *Conn) Write(b []byte) (n int, err error) { c.mtx.Lock() defer c.mtx.Unlock() + binary.BigEndian.PutUint32(c.buf[0:], c.id) binary.BigEndian.PutUint16(c.buf[4:], predef.Data) - binary.BigEndian.PutUint32(c.buf[6:], uint32(len(b))) - nw, err := c.writer.Write(c.buf[:10]) - if err != nil { - return + + var i int + for { + j := copy(c.buf[10:], b[i:]) + i += j + binary.BigEndian.PutUint32(c.buf[6:], uint32(j)) + var nw int + nw, err = c.writer.Write(c.buf[:10+j]) + if err != nil { + return + } + n += nw + if len(b[i:]) == 0 { + break + } } - n, err = c.writer.Write(b) - n += nw return } diff --git a/client/client.go b/client/client.go index 201ea4c5..09ffbc85 100644 --- a/client/client.go +++ b/client/client.go @@ -192,96 +192,102 @@ func sig(sig syscall.Signal) (err error) { } type dialer struct { - host string - stun string - tlsConfig *tls.Config - dialFn func() (conn net.Conn, err error) + tcp string + tls string + quic string + bbr bool + preferQuic bool + stuns []string + tlsConfig *tls.Config } -func (d *dialer) init(c *Client, remote string, stun string) (err error) { - var u *url.URL - u, err = url.Parse(remote) - if err != nil { - err = fmt.Errorf("remote url (-remote option) '%s' is invalid, cause %s", remote, err.Error()) - return +func (d *dialer) isReady() (ready bool) { + if len(d.tls) > 0 { + return true } - d.stun = stun - c.Logger.Info().Str("remote", remote).Str("stun", stun).Msg("remote url") - switch u.Scheme { - case "tls": - if len(u.Port()) < 1 { - u.Host = net.JoinHostPort(u.Host, "443") - } - tlsConfig := &tls.Config{} - if len(c.Config().RemoteCert) > 0 { - var cf []byte - cf, err = os.ReadFile(c.Config().RemoteCert) - if err != nil { - err = fmt.Errorf("failed to read remote cert file (-remoteCert option) '%s', cause %s", c.Config().RemoteCert, err.Error()) - return + if len(d.quic) > 0 { + return true + } + if len(d.tcp) > 0 { + return true + } + + return false +} + +func (d *dialer) init(c *Client, remotes []string, stuns []string) (err error) { + d.stuns = stuns + for _, remote := range remotes { + var u *url.URL + u, err = url.Parse(remote) + if err != nil { + err = fmt.Errorf("remote url (-remote option) '%s' is invalid, cause %s", remote, err.Error()) + return + } + c.Logger.Info().Str("remote", remote).Strs("stuns", stuns).Msg("remote url") + switch u.Scheme { + case "tls": + if len(u.Port()) < 1 { + u.Host = net.JoinHostPort(u.Host, "443") } - roots := x509.NewCertPool() - ok := roots.AppendCertsFromPEM(cf) - if !ok { - err = fmt.Errorf("failed to parse remote cert file (-remoteCert option) '%s'", c.Config().RemoteCert) - return + tlsConfig := &tls.Config{} + if len(c.Config().RemoteCert) > 0 { + var cf []byte + cf, err = os.ReadFile(c.Config().RemoteCert) + if err != nil { + err = fmt.Errorf("failed to read remote cert file (-remoteCert option) '%s', cause %s", c.Config().RemoteCert, err.Error()) + return + } + roots := x509.NewCertPool() + ok := roots.AppendCertsFromPEM(cf) + if !ok { + err = fmt.Errorf("failed to parse remote cert file (-remoteCert option) '%s'", c.Config().RemoteCert) + return + } + tlsConfig.RootCAs = roots } - tlsConfig.RootCAs = roots - } - if c.Config().RemoteCertInsecure { - tlsConfig.InsecureSkipVerify = true - } - d.host = u.Host - d.tlsConfig = tlsConfig - d.dialFn = d.tlsDial - case "tcp": - if len(u.Port()) < 1 { - u.Host = net.JoinHostPort(u.Host, "80") - } - d.host = u.Host - d.dialFn = d.dial - case "quic": - if len(u.Port()) < 1 { - u.Host = net.JoinHostPort(u.Host, "443") - } - tlsConfig := &tls.Config{} - if len(c.Config().RemoteCert) > 0 { - var cf []byte - cf, err = os.ReadFile(c.Config().RemoteCert) - if err != nil { - err = fmt.Errorf("failed to read remote cert file (-remoteCert option) '%s', cause %s", c.Config().RemoteCert, err.Error()) - return + if c.Config().RemoteCertInsecure { + tlsConfig.InsecureSkipVerify = true } - roots := x509.NewCertPool() - ok := roots.AppendCertsFromPEM(cf) - if !ok { - err = fmt.Errorf("failed to parse remote cert file (-remoteCert option) '%s'", c.Config().RemoteCert) - return + d.tls = u.Host + d.tlsConfig = tlsConfig + case "tcp": + if len(u.Port()) < 1 { + u.Host = net.JoinHostPort(u.Host, "80") } - tlsConfig.RootCAs = roots - } - if c.Config().RemoteCertInsecure { - tlsConfig.InsecureSkipVerify = true - } - d.host = u.Host - d.tlsConfig = tlsConfig - //quic-go只有Cubic一种拥塞控制算法 - //msquic默认使用bbr作为拥塞控制算法 - if c.Config().OpenBBR { - d.dialFn = d.msquicDial - } else { - d.dialFn = d.quicDial + d.tcp = u.Host + case "quic": + if len(u.Port()) < 1 { + u.Host = net.JoinHostPort(u.Host, "443") + } + tlsConfig := &tls.Config{} + if len(c.Config().RemoteCert) > 0 { + var cf []byte + cf, err = os.ReadFile(c.Config().RemoteCert) + if err != nil { + err = fmt.Errorf("failed to read remote cert file (-remoteCert option) '%s', cause %s", c.Config().RemoteCert, err.Error()) + return + } + roots := x509.NewCertPool() + ok := roots.AppendCertsFromPEM(cf) + if !ok { + err = fmt.Errorf("failed to parse remote cert file (-remoteCert option) '%s'", c.Config().RemoteCert) + return + } + tlsConfig.RootCAs = roots + } + if c.Config().RemoteCertInsecure { + tlsConfig.InsecureSkipVerify = true + } + d.quic = u.Host + d.tlsConfig = tlsConfig + default: + err = fmt.Errorf("remote url (-remote option) '%s' is invalid", remote) } - default: - err = fmt.Errorf("remote url (-remote option) '%s' is invalid", remote) } return } -func (d *dialer) initWithRemote(c *Client) (err error) { - return d.init(c, c.Config().Remote[c.chosenRemoteLabel], c.Config().RemoteSTUN) -} - func (d *dialer) initWithRemoteAPI(c *Client) (err error) { req, err := http.NewRequest("GET", c.Config().RemoteAPI, nil) if err != nil { @@ -317,24 +323,74 @@ func (d *dialer) initWithRemoteAPI(c *Client) (err error) { return } } - err = d.init(c, addr, stunAddr) + err = d.init(c, []string{addr}, []string{stunAddr}) return } -func (d *dialer) dial() (conn net.Conn, err error) { - return net.Dial("tcp", d.host) +func (d *dialer) tcpDial() (conn net.Conn, err error) { + return net.Dial("tcp", d.tcp) } func (d *dialer) tlsDial() (conn net.Conn, err error) { - return tls.Dial("tcp", d.host, d.tlsConfig) + return tls.Dial("tcp", d.tls, d.tlsConfig) } func (d *dialer) quicDial() (conn net.Conn, err error) { - return connection.QuicDial(d.host, d.tlsConfig) + return connection.QuicDial(d.quic, d.tlsConfig) } func (d *dialer) msquicDial() (conn net.Conn, err error) { - return msquic.MsquicDial(d.host, d.tlsConfig) + return msquic.MsquicDial(d.quic, d.tlsConfig) +} + +func (d *dialer) dial() (conn net.Conn, err error) { + if !d.preferQuic && len(d.tls) > 0 { + return d.tlsDial() + } + if len(d.quic) > 0 { + if d.bbr { + return d.msquicDial() + } + return d.quicDial() + } + if len(d.tcp) > 0 { + return d.tcpDial() + } + + return nil, errors.New("no dialer available") +} + +func (c *Client) processRemotes() (result []dialer, err error) { + m := make(map[string][]string) + for index, remote := range c.Config().Remote { + if !strings.Contains(remote, "://") { + remote = "tcp://" + remote + c.Config().Remote[index] = remote + } + var u *url.URL + u, err = url.Parse(remote) + if err != nil { + return + } + hostname := u.Hostname() + value, ok := m[hostname] + if ok { + value = append(value, remote) + m[hostname] = value + } else { + m[hostname] = []string{remote} + } + } + result = make([]dialer, 0, len(m)) + for _, remotes := range m { + var d dialer + err = d.init(c, remotes, c.Config().RemoteSTUN) + if err != nil { + return + } + result = append(result, d) + } + return } // Start runs the client agent. @@ -383,61 +439,9 @@ func (c *Client) Start() (err error) { return } - var dialer dialer + var ds []dialer if len(c.Config().Remote) > 0 { - var hasQuic bool - var enterSwitch bool - for index := range c.Config().Remote { - if !strings.Contains(c.Config().Remote[index], "://") { - c.Config().Remote[index] = "tcp://" + c.Config().Remote[index] - } - } - if len(c.Config().Remote) >= 2 { - enterSwitch = true - for index, remote := range c.Config().Remote { - var u *url.URL - u, err = url.Parse(remote) - if err != nil { - err = fmt.Errorf("remote url (-remote option) '%s' is invalid, cause %s", remote, err.Error()) - return - } - if u.Scheme == "quic" { - c.Logger.Info().Str("remote", remote).Msg("waiting...intelligent switch are sending probes to get network conditions...") - hasQuic = true - if len(u.Port()) < 1 { - u.Host = net.JoinHostPort(u.Host, "443") - } - var avgRtt, pktLoss float64 - avgRtt, pktLoss, err = connection.GetQuicProbesResults(u.Host) - if err != nil { - c.Logger.Error().Err(err).Msg("can not use QUIC connection to detect network conditions") - return err - } - - c.Logger.Info().Float64("averageRTT", avgRtt).Float64("lossRate", pktLoss).Msg("QUIC probes get network conditions with") - var networkCondition = []float64{0, 0, 0, 0, avgRtt, pktLoss, 0, 0, 0, 0} - result := connection.PredictWithRttAndLoss(networkCondition) - if result[1] > result[0] { - c.chosenRemoteLabel = index - } else { - if index == 0 { - c.chosenRemoteLabel = 1 - } else { - c.chosenRemoteLabel = 0 - } - } - } - } - if !hasQuic { - c.chosenRemoteLabel = 0 - } - } else { - c.chosenRemoteLabel = 0 - } - if enterSwitch { - c.Logger.Info().Str("remote", c.Config().Remote[c.chosenRemoteLabel]).Msg("intelligent switch strategy finally choose to establish with") - } - err = dialer.initWithRemote(c) + ds, err = c.processRemotes() if err != nil { return } @@ -448,20 +452,22 @@ func (c *Client) Start() (err error) { err = fmt.Errorf("remote api url (-remoteAPI option) '%s' must begin with http:// or https://", c.Config().RemoteAPI) return } - for len(dialer.host) == 0 { + var dialer dialer + for len(ds) == 0 { if atomic.LoadUint32(&c.closing) == 1 { err = errors.New("client is closing") return } err = dialer.initWithRemoteAPI(c) if err == nil { + ds = append(ds, dialer) break } c.Logger.Error().Err(err).Msg("failed to query server address") time.Sleep(c.Config().ReconnectDelay.Duration) } } - if len(dialer.host) == 0 { + if len(ds) == 0 { err = errors.New("option -remote or -remoteAPI must be specified") return } @@ -488,9 +494,13 @@ func (c *Client) Start() (err error) { conf4Log.Password = "******" conf4Log.SigningKey = "******" c.Logger.Info().Msg(spew.Sdump(conf4Log)) - for i := uint(1); i <= c.Config().RemoteConnections; i++ { - go c.connectLoop(dialer, i) - c.waitTunnelsShutdown.Add(1) + connID := uint(0) + for _, dialer := range ds { + for i := uint(1); i <= c.Config().RemoteConnections; i++ { + connID += 1 + go c.connectLoop(dialer, connID) + c.waitTunnelsShutdown.Add(1) + } } c.apiServer.Start() @@ -505,7 +515,9 @@ func (c *Client) Start() (err error) { return } c.Logger.Info().Str("addr", c.tcpForwardListener.Addr().String()).Msg("Listening TCP forward") - go c.tcpForwardStart(dialer) + for _, dialer := range ds { + go c.tcpForwardStart(dialer) + } } return @@ -599,12 +611,12 @@ func (c *Client) initConn(d dialer, connID uint) (result *conn, err error) { c.initConnMtx.Lock() defer c.initConnMtx.Unlock() - conn, err := d.dialFn() + conn, err := d.dial() if err != nil { return } result = newConn(conn, c) - result.stuns = append(result.stuns, d.stun) + result.stuns = append(result.stuns, d.stuns...) result.Logger = c.Logger.With().Uint("connID", connID).Logger() err = result.init() if err != nil { diff --git a/client/config.go b/client/config.go index bb613db1..610e8c10 100644 --- a/client/config.go +++ b/client/config.go @@ -37,18 +37,18 @@ type Config struct { // Options is the config options for a client. type Options struct { - Config string `arg:"config" yaml:"-" json:"-" usage:"The config file path to load"` - ID string `yaml:"id,omitempty" json:",omitempty" usage:"The unique id used to connect to server. Now it's the prefix of the domain."` - Secret string `yaml:"secret,omitempty" json:",omitempty" usage:"The secret used to verify the id"` - ReconnectDelay config.Duration `yaml:"reconnectDelay,omitempty" json:",omitempty" usage:"The delay before reconnect. Supports values like '30s', '5m'"` - Remote config.Slice[string] `yaml:"remote,omitempty" json:",omitempty" usage:"The remote server url. Supports tcp:// and tls:// and quic://, default tcp://"` - RemoteSTUN string `yaml:"remoteSTUN,omitempty" json:",omitempty" usage:"The remote STUN server address"` - RemoteAPI string `yaml:"remoteAPI,omitempty" json:",omitempty" usage:"The API to get remote server url"` - RemoteCert string `yaml:"remoteCert,omitempty" json:",omitempty" usage:"The path to remote cert"` - RemoteCertInsecure bool `yaml:"remoteCertInsecure,omitempty" json:",omitempty" usage:"Accept self-signed SSL certs from remote"` - RemoteConnections uint `yaml:"remoteConnections,omitempty" json:",omitempty" usage:"The max number of server connections in the pool. Valid value is 1 to 10"` - RemoteIdleConnections uint `yaml:"remoteIdleConnections,omitempty" json:",omitempty" usage:"The number of idle server connections kept in the pool"` - RemoteTimeout config.Duration `yaml:"remoteTimeout,omitempty" json:",omitempty" usage:"The timeout of remote connections. Supports values like '30s', '5m'"` + Config string `arg:"config" yaml:"-" json:"-" usage:"The config file path to load"` + ID string `yaml:"id,omitempty" json:",omitempty" usage:"The unique id used to connect to server. Now it's the prefix of the domain."` + Secret string `yaml:"secret,omitempty" json:",omitempty" usage:"The secret used to verify the id"` + ReconnectDelay config.Duration `yaml:"reconnectDelay,omitempty" json:",omitempty" usage:"The delay before reconnect. Supports values like '30s', '5m'"` + Remote config.Slice[string] `yaml:"remote,omitempty" json:",omitempty" usage:"The remote server url. Supports tcp:// and tls:// and quic://, default tcp://"` + RemoteSTUN config.Slice[string] `yaml:"remoteSTUN,omitempty" json:",omitempty" usage:"The remote STUN server address"` + RemoteAPI string `yaml:"remoteAPI,omitempty" json:",omitempty" usage:"The API to get remote server url"` + RemoteCert string `yaml:"remoteCert,omitempty" json:",omitempty" usage:"The path to remote cert"` + RemoteCertInsecure bool `yaml:"remoteCertInsecure,omitempty" json:",omitempty" usage:"Accept self-signed SSL certs from remote"` + RemoteConnections uint `yaml:"remoteConnections,omitempty" json:",omitempty" usage:"The max number of server connections in the pool. Valid value is 1 to 10"` + RemoteIdleConnections uint `yaml:"remoteIdleConnections,omitempty" json:",omitempty" usage:"The number of idle server connections kept in the pool"` + RemoteTimeout config.Duration `yaml:"remoteTimeout,omitempty" json:",omitempty" usage:"The timeout of remote connections. Supports values like '30s', '5m'"` HostPrefix config.PositionSlice[string] `yaml:"-" json:"-" arg:"hostPrefix" usage:"The server will recognize this host prefix and forward data to local"` RemoteTCPPort config.PositionSlice[uint16] `yaml:"-" json:"-" arg:"remoteTCPPort" usage:"The TCP port that the remote server will open"` @@ -92,7 +92,6 @@ type Options struct { Signal string `arg:"s" yaml:"-" json:"-" usage:"Send signal to client processes. Supports values: reload, restart, stop, kill"` OpenBBR bool `yaml:"bbr" usage:"Use bbr as congestion control algorithm (through msquic) when GT use QUIC connection. Default algorithm is Cubic (through quic-go)."` - } // if you enable web service, it will set 'Config' if not specified diff --git a/client/tcpforward.go b/client/tcpforward.go index 4e4fb4fe..e6b9af07 100644 --- a/client/tcpforward.go +++ b/client/tcpforward.go @@ -230,7 +230,7 @@ func (c *Client) createPeerConnection(dialer dialer) (peerConnection *webrtc.Pee dataChannelUnused.Close() // 发送 offer - conn, err := dialer.dialFn() + conn, err := dialer.dial() if err != nil { return } diff --git a/client/webrtc/peerconnection.cpp b/client/webrtc/peerconnection.cpp index 944892bd..f92022f2 100644 --- a/client/webrtc/peerconnection.cpp +++ b/client/webrtc/peerconnection.cpp @@ -349,6 +349,8 @@ char *NewPeerConnection(void **peerConnectionOutside, char **iceServers, int ice } void DeletePeerConnection(void *peerConnectionOutside) { + if (peerConnectionOutside == nullptr) + return; auto peerConnectionObserver = (::PeerConnectionObserver *)peerConnectionOutside; peerConnectionObserver->Delete(); } diff --git a/client/webrtc/peerconnection.go b/client/webrtc/peerconnection.go index 505b7cca..9a3a56c2 100644 --- a/client/webrtc/peerconnection.go +++ b/client/webrtc/peerconnection.go @@ -538,6 +538,10 @@ func (p *PeerConnection) CreateDataChannel(label string, negotiated bool, config } func (p *PeerConnection) Close() { - C.DeletePeerConnection(p.peerConnection) - pointer.Unref(p.pointerID) + if p.peerConnection != nil { + C.DeletePeerConnection(p.peerConnection) + } + if p.pointerID != nil { + pointer.Unref(p.pointerID) + } } diff --git a/conn/quicConn.go b/conn/quicConn.go index 73afd3c2..5f6da522 100644 --- a/conn/quicConn.go +++ b/conn/quicConn.go @@ -7,18 +7,16 @@ import ( "crypto/rand" "crypto/tls" "crypto/x509" + "encoding/binary" "encoding/pem" "math/big" "net" - "sync/atomic" "time" "github.com/isrc-cas/gt/predef" "github.com/quic-go/quic-go" ) -const probePacketLostTimeOutMs = 5 - type QuicConnection struct { quic.Connection quic.Stream @@ -97,66 +95,57 @@ func GenerateTLSConfig() *tls.Config { } } -func GetQuicProbesResults(addr string) (avgRtt float64, pktLoss float64, err error) { - totalNum := 100 - var totalSuccessNum int64 = 0 - var totalDelay int64 = 0 - var buf []byte - probeCloseError := &quic.ApplicationError{ - Remote: false, - ErrorCode: 0x42, - ErrorMessage: "close QUIC probe connection", - } - tlsConfig := &tls.Config{} - tlsConfig.InsecureSkipVerify = true +const ProbeTimes = 10 +func GetQuicProbesResults(addr string, timeout time.Duration) (avgRtt float64, pktLoss float64, err error) { + tlsConfig := &tls.Config{ + InsecureSkipVerify: true, + } conn, err := QuicDial(addr, tlsConfig) if err != nil { return } - sendBuffer := []byte{predef.MagicNumber, 0x02} + defer func() { + _ = conn.Close() + }() + sendBuffer := []byte{predef.MagicNumber, 0x02, 0x00} _, err = conn.Write(sendBuffer) if err != nil { return } - for i := 0; i < totalNum; i++ { - go func() { - err = conn.(*QuicConnection).SendDatagram([]byte(time.Now().Format("2006-01-02 15:04:05.000000000"))) - if err != nil { - return - } - }() - } + var totalSuccessNum int64 + var totalDelay int64 + var buf []byte + for i := 0; i < ProbeTimes; i++ { + bs := [9]byte{} + bs[0] = byte(i) + now := time.Now().UnixMilli() + binary.BigEndian.PutUint64(bs[1:], uint64(now)) + err = conn.(*QuicConnection).SendDatagram(bs[:]) + if err != nil { + return + } - for { - timer := time.AfterFunc(probePacketLostTimeOutMs*time.Second, func() { - err = conn.(*QuicConnection).CloseWithError(0x42, "close QUIC probe connection") - if err != nil { - return - } - }) - buf, err = conn.(*QuicConnection).ReceiveDatagram(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + buf, err = conn.(*QuicConnection).ReceiveDatagram(ctx) + cancel() if err != nil { - // QUIC的stream关闭时会返回io.EOF,但是QUIC的不可靠数据包Datagram是在connection层面进行发送的 - // 因此需要通过quic.ApplicationError判断QUIC connection是否由于应用程序主动关闭 - if err.Error() == probeCloseError.Error() { - err = nil + return + } + if len(buf) >= 9 { + no := buf[0] + u := binary.BigEndian.Uint64(buf[1:]) + interval := time.Now().Sub(time.UnixMilli(int64(u))).Milliseconds() + totalSuccessNum += 1 + totalDelay += interval + if no == ProbeTimes-1 { break - } else { - return } } - if buf != nil { - sendTine, _ := time.ParseInLocation("2006-01-02 15:04:05.000000000", string(buf), time.Local) - interval := time.Now().Sub(sendTine).Microseconds() - atomic.AddInt64(&totalSuccessNum, 1) - atomic.AddInt64(&totalDelay, interval) - } - timer.Stop() } - - avgRtt = float64(atomic.LoadInt64(&totalDelay)) / (float64(1000 * totalNum)) - pktLoss = 1 - float64(atomic.LoadInt64(&totalSuccessNum))/float64(totalNum) + avgRtt = float64(totalDelay) / (float64(ProbeTimes)) + pktLoss = 1 - float64(totalSuccessNum)/float64(ProbeTimes) + _, err = conn.Write([]byte{0xFF}) return } diff --git a/dep/patch/openssl.patch b/dep/patch/openssl.patch new file mode 100644 index 00000000..607e4e9a --- /dev/null +++ b/dep/patch/openssl.patch @@ -0,0 +1,12 @@ +diff --git a/dep/_msquic/submodules/CMakeLists.txt b/dep/_msquic/submodules/CMakeLists.txt +--- a/dep/_msquic/submodules/CMakeLists.txt ++++ b/dep/_msquic/submodules/CMakeLists.txt +@@ -241,6 +241,8 @@ + set(OPENSSL_CONFIG_CMD ${CMAKE_CURRENT_SOURCE_DIR}/${QUIC_OPENSSL}/Configure linux-armv4) + endif() + list(APPEND OPENSSL_CONFIG_FLAGS -latomic) ++ elseif (CMAKE_TARGET_ARCHITECTURE STREQUAL x64) ++ set(OPENSSL_CONFIG_CMD ${CMAKE_CURRENT_SOURCE_DIR}/${QUIC_OPENSSL}/Configure linux-x86_64) + else() + set(OPENSSL_CONFIG_CMD ${CMAKE_CURRENT_SOURCE_DIR}/${QUIC_OPENSSL}/config + CC=${CMAKE_C_COMPILER} CXX=${CMAKE_CXX_COMPILER}) diff --git a/server/conn.go b/server/conn.go index a7ee3d26..027b1fcd 100644 --- a/server/conn.go +++ b/server/conn.go @@ -35,7 +35,6 @@ import ( connection "github.com/isrc-cas/gt/conn" "github.com/isrc-cas/gt/pool" "github.com/isrc-cas/gt/predef" - "github.com/quic-go/quic-go" ) var ( @@ -165,38 +164,48 @@ func (c *conn) handle(handleFunc func() bool) { handled = c.handleTunnelLoop(remoteIP) return case 0x02: - var buf []byte - probeCloseError := &quic.ApplicationError{ - Remote: true, - ErrorCode: 0x42, - ErrorMessage: "close QUIC probe connection", - } - for { - timer := time.AfterFunc(3*time.Second, func() { - c.Logger.Info().Msg("closing QUIC probe connection") - }) - if buf, err = c.Connection.Conn.(*connection.QuicConnection).ReceiveDatagram(context.Background()); err != nil { - if err.Error() == probeCloseError.Error() { - break - } else { - c.Logger.Warn().Err(err).Msg("failed to use QUIC probe connection to receive message") - return - } - } - err = c.Connection.Conn.(*connection.QuicConnection).SendDatagram(buf) - if err != nil { - c.Logger.Warn().Err(err).Msg("failed to use QUIC probe connection to send message") - return - } - timer.Stop() + _, err = reader.Discard(2) + if err != nil { + c.Logger.Warn().Err(err).Msg("failed to discard version field") + return } - handled = true + handled = c.handleProbe(reader) return } } handled = handleFunc() } +func (c *conn) handleProbe(r *bufio.Reader) (handled bool) { + op, err := r.ReadByte() + if err != nil { + c.Logger.Warn().Err(err).Msg("handleProbe read op error") + return + } + switch op { + case 0x00: + for i := 0; i < connection.ProbeTimes; i++ { + ctx, cancel := context.WithTimeout(context.Background(), c.server.config.Timeout.Duration) + buf, err := c.Connection.Conn.(*connection.QuicConnection).ReceiveDatagram(ctx) + cancel() + if err != nil { + c.Logger.Warn().Err(err).Msg("handleProbe ReceiveDatagram error") + return + } + err = c.Connection.Conn.(*connection.QuicConnection).SendDatagram(buf) + if err != nil { + c.Logger.Warn().Err(err).Msg("handleProbe SendDatagram error") + return + } + no := buf[0] + if no == connection.ProbeTimes-1 { + break + } + } + } + return true +} + func (c *conn) handleSNI() (handled bool) { var err error var host []byte