Skip to content

Commit

Permalink
check if we have a successful connection so we do not need to connect…
Browse files Browse the repository at this point in the history
… to other servers
  • Loading branch information
mohamed-essam committed Nov 27, 2024
1 parent 18216ba commit adf31ce
Showing 1 changed file with 32 additions and 22 deletions.
54 changes: 32 additions & 22 deletions relay/client/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,25 @@ func (sp *ServerPicker) PickServer(parentCtx context.Context) (*Client, error) {
concurrentLimiter := make(chan struct{}, maxConcurrentServers)

log.Debugf("pick server from list: %v", sp.ServerURLs.Load().([]string))
go sp.processConnResults(connResultChan, successChan)
for _, url := range sp.ServerURLs.Load().([]string) {
// todo check if we have a successful connection so we do not need to connect to other servers
concurrentLimiter <- struct{}{}
go func(url string) {
defer func() {
<-concurrentLimiter
}()
sp.startConnection(parentCtx, connResultChan, url)
}(url)
select {
case concurrentLimiter <- struct{}{}:
go func(url string) {
defer func() {
<-concurrentLimiter
}()
sp.startConnection(parentCtx, connResultChan, url)
}(url)
case cr, ok := <-successChan:
if !ok {
return nil, errors.New("failed to connect to any relay server: all attempts failed")
}
log.Infof("chosen home Relay server: %s with latency %s", cr.Url, cr.Latency)
return cr.RelayClient, nil
}
}

go sp.processConnResults(connResultChan, successChan)

select {
case cr, ok := <-successChan:
if !ok {
Expand Down Expand Up @@ -106,18 +112,8 @@ func (sp *ServerPicker) processConnResults(resultChan chan connResult, successCh
}
log.Infof("connected to Relay server: %s with latency %s", cr.Url, cr.Latency)

// Already connected to a lower latency server
if hasSuccess && cr.Latency > bestLatencyResult.Latency {
log.Infof("closing unnecessary Relay connection to: %s", cr.Url)
if err := cr.RelayClient.Close(); err != nil {
log.Errorf("failed to close connection to %s: %v", cr.Url, err)
}
continue
} else if hasSuccess { // Connected to a higher latency server in bestLatencyResult, disconnect from it
log.Infof("closing unnecessary Relay connection to: %s", bestLatencyResult.Url)
if err := bestLatencyResult.RelayClient.Close(); err != nil {
log.Errorf("failed to close connection to %s: %v", bestLatencyResult.Url, err)
}
if hasSuccess {
cr = lowestLatency(cr, bestLatencyResult)
}

// First successful connection, start a timer to return the result
Expand All @@ -138,3 +134,17 @@ func (sp *ServerPicker) processConnResults(resultChan chan connResult, successCh
}
close(successChan)
}

func lowestLatency(a, b connResult) connResult {
if a.Latency > b.Latency {
if err := b.RelayClient.Close(); err != nil {
log.Errorf("failed to close connection to %s: %v", b.Url, err)
}
return a
}

if err := a.RelayClient.Close(); err != nil {
log.Errorf("failed to close connection to %s: %v", a.Url, err)
}
return b
}

0 comments on commit adf31ce

Please sign in to comment.