Skip to content

Commit

Permalink
Correctly handle discovery cleanup and re-add
Browse files Browse the repository at this point in the history
  • Loading branch information
cmaglie committed Jul 14, 2022
1 parent e2ebc8c commit b777797
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 26 deletions.
5 changes: 4 additions & 1 deletion arduino/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,13 @@ func (disc *PluggableDiscovery) Stop() error {

func (disc *PluggableDiscovery) stopSync() {
if disc.eventChan != nil {
for _, port := range disc.cachedPorts {
disc.eventChan <- &Event{"remove", port, disc.GetID()}
}
disc.cachedPorts = map[string]*Port{}
disc.eventChan <- &Event{"stop", nil, disc.GetID()}
close(disc.eventChan)
disc.eventChan = nil
disc.cachedPorts = map[string]*Port{}
}
}

Expand Down
55 changes: 30 additions & 25 deletions arduino/discovery/discoverymanager/discoverymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package discoverymanager
import (
"fmt"
"sync"
"time"

"github.com/arduino/arduino-cli/arduino/discovery"
"github.com/arduino/arduino-cli/i18n"
Expand Down Expand Up @@ -83,7 +84,12 @@ func (dm *DiscoveryManager) Start() {
return
}

go dm.feeder()
go func() {
// Feed all watchers with data coming from the discoveries
for ev := range dm.feed {
dm.feedEvent(ev)
}
}()

var wg sync.WaitGroup
for _, d := range dm.discoveries {
Expand Down Expand Up @@ -136,13 +142,13 @@ func (dm *DiscoveryManager) Watch() (*PortWatcher, error) {
dm.Start()

watcher := &PortWatcher{
feed: make(chan *discovery.Event),
feed: make(chan *discovery.Event, 10),
}
watcher.closeCB = func() {
dm.watchersMutex.Lock()
delete(dm.watchers, watcher)
dm.watchersMutex.Unlock()
close(watcher.feed)
dm.watchersMutex.Unlock()
}
go func() {
dm.watchersMutex.Lock()
Expand Down Expand Up @@ -182,44 +188,43 @@ func (dm *DiscoveryManager) startDiscovery(d *discovery.PluggableDiscovery) (dis
return nil
}

func (dm *DiscoveryManager) feeder() {
// Feed all watchers with data coming from the discoveries
for ev := range dm.feed {
dm.watchersMutex.Lock()
for watcher := range dm.watchers {
select {
case watcher.feed <- ev:
// OK
default:
// If the watcher is not able to process event fast enough
// remove the watcher from the list of watchers
go watcher.Close()
}
func (dm *DiscoveryManager) feedEvent(ev *discovery.Event) {
dm.watchersMutex.Lock()
defer dm.watchersMutex.Unlock()

if ev.Type == "stop" {
// Remove all the cached events for the terminating discovery
delete(dm.watchersCache, ev.DiscoveryID)
return
}

// Send the event to all watchers
for watcher := range dm.watchers {
select {
case watcher.feed <- ev:
// OK
case <-time.After(time.Millisecond * 500):
// If the watcher is not able to process event fast enough
// remove the watcher from the list of watchers
logrus.Info("Watcher is not able to process events fast enough, removing it from the list of watchers")
delete(dm.watchers, watcher)
}
dm.cacheEvent(ev)
dm.watchersMutex.Unlock()
}
}

func (dm *DiscoveryManager) cacheEvent(ev *discovery.Event) {
// Cache the event for the discovery
cache := dm.watchersCache[ev.DiscoveryID]
if cache == nil {
cache = map[string]*discovery.Event{}
dm.watchersCache[ev.DiscoveryID] = cache
}

eventID := ev.Port.Address + "|" + ev.Port.Protocol
switch ev.Type {
case "add":
cache[eventID] = ev
case "remove":
delete(cache, eventID)
case "quit":
// Remove all the events for this discovery
delete(dm.watchersCache, ev.DiscoveryID)
default:
logrus.Errorf("Unhandled event from discovery: %s", ev.Type)
return
}
}

Expand Down

0 comments on commit b777797

Please sign in to comment.