Skip to content

Commit

Permalink
Discoveries are now closed and unregistered after failure (#1667)
Browse files Browse the repository at this point in the history
* Discoveries are now closed and unregistered after failure

* Add mutex to guard discoveries in DiscoveryManager
  • Loading branch information
silvanocerza authored Feb 18, 2022
1 parent dd48868 commit 26e1dc2
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 26 deletions.
17 changes: 4 additions & 13 deletions arduino/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,21 +374,12 @@ func (disc *PluggableDiscovery) Stop() error {
}

// Quit terminates the discovery. No more commands can be accepted by the discovery.
func (disc *PluggableDiscovery) Quit() error {
if err := disc.sendCommand("QUIT\n"); err != nil {
return err
}
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
return fmt.Errorf(tr("calling %[1]s: %[2]w"), "QUIT", err)
} else if msg.EventType != "quit" {
return errors.Errorf(tr("communication out of sync, expected '%[1]s', received '%[2]s'"), "quit", msg.EventType)
} else if msg.Error {
return errors.Errorf(tr("command failed: %s"), msg.Message)
} else if strings.ToUpper(msg.Message) != "OK" {
return errors.Errorf(tr("communication out of sync, expected '%[1]s', received '%[2]s'"), "OK", msg.Message)
func (disc *PluggableDiscovery) Quit() {
_ = disc.sendCommand("QUIT\n")
if _, err := disc.waitMessage(time.Second * 5); err != nil {
logrus.Errorf("Quitting discovery %s: %s", disc.id, err)
}
disc.killProcess()
return nil
}

// List executes an enumeration of the ports and returns a list of the available
Expand Down
4 changes: 1 addition & 3 deletions arduino/discovery/discovery_client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ out:
}

for _, disc := range discoveries {
if err := disc.Quit(); err != nil {
log.Fatal("Error stopping discovery:", err)
}
disc.Quit()
fmt.Println("Discovery QUITed")
for disc.State() == discovery.Alive {
time.Sleep(time.Millisecond)
Expand Down
47 changes: 43 additions & 4 deletions arduino/discovery/discoverymanager/discoverymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"github.com/arduino/arduino-cli/arduino/discovery"
"github.com/arduino/arduino-cli/i18n"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// DiscoveryManager is required to handle multiple pluggable-discovery that
// may be shared across platforms
type DiscoveryManager struct {
discoveries map[string]*discovery.PluggableDiscovery
discoveriesMutex sync.Mutex
discoveries map[string]*discovery.PluggableDiscovery
}

var tr = i18n.Tr
Expand All @@ -42,12 +44,16 @@ func New() *DiscoveryManager {
// Clear resets the DiscoveryManager to its initial state
func (dm *DiscoveryManager) Clear() {
dm.QuitAll()
dm.discoveriesMutex.Lock()
defer dm.discoveriesMutex.Unlock()
dm.discoveries = map[string]*discovery.PluggableDiscovery{}
}

// IDs returns the list of discoveries' ids in this DiscoveryManager
func (dm *DiscoveryManager) IDs() []string {
ids := []string{}
dm.discoveriesMutex.Lock()
defer dm.discoveriesMutex.Unlock()
for id := range dm.discoveries {
ids = append(ids, id)
}
Expand All @@ -57,19 +63,38 @@ func (dm *DiscoveryManager) IDs() []string {
// Add adds a discovery to the list of managed discoveries
func (dm *DiscoveryManager) Add(disc *discovery.PluggableDiscovery) error {
id := disc.GetID()
dm.discoveriesMutex.Lock()
defer dm.discoveriesMutex.Unlock()
if _, has := dm.discoveries[id]; has {
return errors.Errorf(tr("pluggable discovery already added: %s"), id)
}
dm.discoveries[id] = disc
return nil
}

// remove quits and deletes the discovery with specified id
// from the discoveries managed by this DiscoveryManager
func (dm *DiscoveryManager) remove(id string) {
dm.discoveriesMutex.Lock()
d := dm.discoveries[id]
delete(dm.discoveries, id)
dm.discoveriesMutex.Unlock()
d.Quit()
logrus.Infof("Closed and removed discovery %s", id)
}

// parallelize runs function f concurrently for each discovery.
// Returns a list of errors returned by each call of f.
func (dm *DiscoveryManager) parallelize(f func(d *discovery.PluggableDiscovery) error) []error {
var wg sync.WaitGroup
errChan := make(chan error)
dm.discoveriesMutex.Lock()
discoveries := []*discovery.PluggableDiscovery{}
for _, d := range dm.discoveries {
discoveries = append(discoveries, d)
}
dm.discoveriesMutex.Unlock()
for _, d := range discoveries {
wg.Add(1)
go func(d *discovery.PluggableDiscovery) {
defer wg.Done()
Expand Down Expand Up @@ -103,6 +128,7 @@ func (dm *DiscoveryManager) RunAll() []error {
}

if err := d.Run(); err != nil {
dm.remove(d.GetID())
return fmt.Errorf(tr("discovery %[1]s process not started: %[2]w"), d.GetID(), err)
}
return nil
Expand All @@ -119,6 +145,7 @@ func (dm *DiscoveryManager) StartAll() []error {
return nil
}
if err := d.Start(); err != nil {
dm.remove(d.GetID())
return fmt.Errorf(tr("starting discovery %[1]s: %[2]w"), d.GetID(), err)
}
return nil
Expand All @@ -139,6 +166,7 @@ func (dm *DiscoveryManager) StartSyncAll() (<-chan *discovery.Event, []error) {

eventCh, err := d.StartSync(5)
if err != nil {
dm.remove(d.GetID())
return fmt.Errorf(tr("start syncing discovery %[1]s: %[2]w"), d.GetID(), err)
}

Expand Down Expand Up @@ -170,6 +198,7 @@ func (dm *DiscoveryManager) StopAll() []error {
}

if err := d.Stop(); err != nil {
dm.remove(d.GetID())
return fmt.Errorf(tr("stopping discovery %[1]s: %[2]w"), d.GetID(), err)
}
return nil
Expand All @@ -185,9 +214,7 @@ func (dm *DiscoveryManager) QuitAll() []error {
return nil
}

if err := d.Quit(); err != nil {
return fmt.Errorf(tr("quitting discovery %[1]s: %[2]w"), d.GetID(), err)
}
d.Quit()
return nil
})
return errs
Expand All @@ -204,7 +231,13 @@ func (dm *DiscoveryManager) List() ([]*discovery.Port, []error) {
Port *discovery.Port
}
msgChan := make(chan listMsg)
dm.discoveriesMutex.Lock()
discoveries := []*discovery.PluggableDiscovery{}
for _, d := range dm.discoveries {
discoveries = append(discoveries, d)
}
dm.discoveriesMutex.Unlock()
for _, d := range discoveries {
wg.Add(1)
go func(d *discovery.PluggableDiscovery) {
defer wg.Done()
Expand Down Expand Up @@ -243,7 +276,13 @@ func (dm *DiscoveryManager) List() ([]*discovery.Port, []error) {
// ListCachedPorts return the current list of ports detected from all discoveries
func (dm *DiscoveryManager) ListCachedPorts() []*discovery.Port {
res := []*discovery.Port{}
dm.discoveriesMutex.Lock()
discoveries := []*discovery.PluggableDiscovery{}
for _, d := range dm.discoveries {
discoveries = append(discoveries, d)
}
dm.discoveriesMutex.Unlock()
for _, d := range discoveries {
if d.State() != discovery.Syncing {
// Discovery is not syncing
continue
Expand Down
11 changes: 5 additions & 6 deletions commands/board/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,14 @@ func Watch(instanceID int32, interrupt <-chan bool) (<-chan *rpc.BoardListWatchR
Error: boardsError,
}
case <-interrupt:
errs := dm.StopAll()
if len(errs) > 0 {
for _, err := range dm.StopAll() {
// Discoveries that return errors have their process
// closed and are removed from the list of discoveries
// in the manager
outChan <- &rpc.BoardListWatchResponse{
EventType: "error",
Error: tr("stopping discoveries: %s", errs),
Error: tr("stopping discoveries: %s", err),
}
// Don't close the channel if quitting all discoveries
// failed, otherwise some processes might be left running.
continue
}
return
}
Expand Down

0 comments on commit 26e1dc2

Please sign in to comment.