From b777797b4ad2416c2f4d34e2ba3fb32302be1e77 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Fri, 15 Jul 2022 01:32:34 +0200 Subject: [PATCH] Correctly handle discovery cleanup and re-add --- arduino/discovery/discovery.go | 5 +- .../discoverymanager/discoverymanager.go | 55 ++++++++++--------- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/arduino/discovery/discovery.go b/arduino/discovery/discovery.go index ba55e00fdff..53fcc734e27 100644 --- a/arduino/discovery/discovery.go +++ b/arduino/discovery/discovery.go @@ -368,10 +368,13 @@ func (disc *PluggableDiscovery) Stop() error { func (disc *PluggableDiscovery) stopSync() { if disc.eventChan != nil { + for _, port := range disc.cachedPorts { + disc.eventChan <- &Event{"remove", port, disc.GetID()} + } + disc.cachedPorts = map[string]*Port{} disc.eventChan <- &Event{"stop", nil, disc.GetID()} close(disc.eventChan) disc.eventChan = nil - disc.cachedPorts = map[string]*Port{} } } diff --git a/arduino/discovery/discoverymanager/discoverymanager.go b/arduino/discovery/discoverymanager/discoverymanager.go index e6d23f1fca3..e8252a5a259 100644 --- a/arduino/discovery/discoverymanager/discoverymanager.go +++ b/arduino/discovery/discoverymanager/discoverymanager.go @@ -18,6 +18,7 @@ package discoverymanager import ( "fmt" "sync" + "time" "github.com/arduino/arduino-cli/arduino/discovery" "github.com/arduino/arduino-cli/i18n" @@ -83,7 +84,12 @@ func (dm *DiscoveryManager) Start() { return } - go dm.feeder() + go func() { + // Feed all watchers with data coming from the discoveries + for ev := range dm.feed { + dm.feedEvent(ev) + } + }() var wg sync.WaitGroup for _, d := range dm.discoveries { @@ -136,13 +142,13 @@ func (dm *DiscoveryManager) Watch() (*PortWatcher, error) { dm.Start() watcher := &PortWatcher{ - feed: make(chan *discovery.Event), + feed: make(chan *discovery.Event, 10), } watcher.closeCB = func() { dm.watchersMutex.Lock() delete(dm.watchers, watcher) - dm.watchersMutex.Unlock() close(watcher.feed) + dm.watchersMutex.Unlock() } go func() { dm.watchersMutex.Lock() @@ -182,44 +188,43 @@ func (dm *DiscoveryManager) startDiscovery(d *discovery.PluggableDiscovery) (dis return nil } -func (dm *DiscoveryManager) feeder() { - // Feed all watchers with data coming from the discoveries - for ev := range dm.feed { - dm.watchersMutex.Lock() - for watcher := range dm.watchers { - select { - case watcher.feed <- ev: - // OK - default: - // If the watcher is not able to process event fast enough - // remove the watcher from the list of watchers - go watcher.Close() - } +func (dm *DiscoveryManager) feedEvent(ev *discovery.Event) { + dm.watchersMutex.Lock() + defer dm.watchersMutex.Unlock() + + if ev.Type == "stop" { + // Remove all the cached events for the terminating discovery + delete(dm.watchersCache, ev.DiscoveryID) + return + } + + // Send the event to all watchers + for watcher := range dm.watchers { + select { + case watcher.feed <- ev: + // OK + case <-time.After(time.Millisecond * 500): + // If the watcher is not able to process event fast enough + // remove the watcher from the list of watchers + logrus.Info("Watcher is not able to process events fast enough, removing it from the list of watchers") + delete(dm.watchers, watcher) } - dm.cacheEvent(ev) - dm.watchersMutex.Unlock() } -} -func (dm *DiscoveryManager) cacheEvent(ev *discovery.Event) { + // Cache the event for the discovery cache := dm.watchersCache[ev.DiscoveryID] if cache == nil { cache = map[string]*discovery.Event{} dm.watchersCache[ev.DiscoveryID] = cache } - eventID := ev.Port.Address + "|" + ev.Port.Protocol switch ev.Type { case "add": cache[eventID] = ev case "remove": delete(cache, eventID) - case "quit": - // Remove all the events for this discovery - delete(dm.watchersCache, ev.DiscoveryID) default: logrus.Errorf("Unhandled event from discovery: %s", ev.Type) - return } }