diff --git a/arduino/cores/packagemanager/package_manager_test.go b/arduino/cores/packagemanager/package_manager_test.go index ec12a6070b9..9cb1509dca0 100644 --- a/arduino/cores/packagemanager/package_manager_test.go +++ b/arduino/cores/packagemanager/package_manager_test.go @@ -329,16 +329,14 @@ func TestPackageManagerClear(t *testing.T) { packageManager := packagemanager.NewPackageManager(customHardware, customHardware, customHardware, customHardware, "test") packageManager.LoadHardwareFromDirectory(customHardware) - // Creates another PackageManager but don't load the hardware - emptyPackageManager := packagemanager.NewPackageManager(customHardware, customHardware, customHardware, customHardware, "test") + // Check that the hardware is loaded + require.NotEmpty(t, packageManager.Packages) - // Verifies they're not equal - require.NotEqual(t, packageManager, emptyPackageManager) - - // Clear the first PackageManager that contains loaded hardware + // Clear the package manager packageManager.Clear() - // Verifies both PackageManagers are now equal - require.Equal(t, packageManager, emptyPackageManager) + + // Check that the hardware is cleared + require.Empty(t, packageManager.Packages) } func TestFindToolsRequiredFromPlatformRelease(t *testing.T) { diff --git a/arduino/discovery/discovery.go b/arduino/discovery/discovery.go index 40067cc3e43..9def18596e7 100644 --- a/arduino/discovery/discovery.go +++ b/arduino/discovery/discovery.go @@ -57,7 +57,6 @@ type PluggableDiscovery struct { incomingMessagesError error state int eventChan chan<- *Event - cachedPorts map[string]*Port } type discoveryMessage struct { @@ -121,8 +120,9 @@ func (p *Port) String() string { // Event is a pluggable discovery event type Event struct { - Type string - Port *Port + Type string + Port *Port + DiscoveryID string } // New create and connect to the given pluggable discovery @@ -131,7 +131,6 @@ func New(id string, args ...string) *PluggableDiscovery { id: id, processArgs: args, state: Dead, - cachedPorts: map[string]*Port{}, } } @@ -176,9 +175,8 @@ func (disc *PluggableDiscovery) jsonDecodeLoop(in io.Reader, outChan chan<- *dis return } disc.statusMutex.Lock() - disc.cachedPorts[msg.Port.Address+"|"+msg.Port.Protocol] = msg.Port if disc.eventChan != nil { - disc.eventChan <- &Event{"add", msg.Port} + disc.eventChan <- &Event{"add", msg.Port, disc.GetID()} } disc.statusMutex.Unlock() } else if msg.EventType == "remove" { @@ -187,9 +185,8 @@ func (disc *PluggableDiscovery) jsonDecodeLoop(in io.Reader, outChan chan<- *dis return } disc.statusMutex.Lock() - delete(disc.cachedPorts, msg.Port.Address+"|"+msg.Port.Protocol) if disc.eventChan != nil { - disc.eventChan <- &Event{"remove", msg.Port} + disc.eventChan <- &Event{"remove", msg.Port, disc.GetID()} } disc.statusMutex.Unlock() } else { @@ -276,10 +273,7 @@ func (disc *PluggableDiscovery) killProcess() error { } disc.statusMutex.Lock() defer disc.statusMutex.Unlock() - if disc.eventChan != nil { - close(disc.eventChan) - disc.eventChan = nil - } + disc.stopSync() disc.state = Dead logrus.Infof("killed discovery %s process", disc.id) return nil @@ -366,13 +360,17 @@ func (disc *PluggableDiscovery) Stop() error { } disc.statusMutex.Lock() defer disc.statusMutex.Unlock() - disc.cachedPorts = map[string]*Port{} + disc.stopSync() + disc.state = Idling + return nil +} + +func (disc *PluggableDiscovery) stopSync() { if disc.eventChan != nil { + disc.eventChan <- &Event{"stop", nil, disc.GetID()} close(disc.eventChan) disc.eventChan = nil } - disc.state = Idling - return nil } // Quit terminates the discovery. No more commands can be accepted by the discovery. @@ -409,6 +407,9 @@ func (disc *PluggableDiscovery) List() ([]*Port, error) { // The event channel must be consumed as quickly as possible since it may block the // discovery if it becomes full. The channel size is configurable. func (disc *PluggableDiscovery) StartSync(size int) (<-chan *Event, error) { + disc.statusMutex.Lock() + defer disc.statusMutex.Unlock() + if err := disc.sendCommand("START_SYNC\n"); err != nil { return nil, err } @@ -423,29 +424,10 @@ func (disc *PluggableDiscovery) StartSync(size int) (<-chan *Event, error) { return nil, errors.Errorf(tr("communication out of sync, expected '%[1]s', received '%[2]s'"), "OK", msg.Message) } - disc.statusMutex.Lock() - defer disc.statusMutex.Unlock() disc.state = Syncing - disc.cachedPorts = map[string]*Port{} - if disc.eventChan != nil { - // In case there is already an existing event channel in use we close it - // before creating a new one. - close(disc.eventChan) - } + // In case there is already an existing event channel in use we close it before creating a new one. + disc.stopSync() c := make(chan *Event, size) disc.eventChan = c return c, nil } - -// ListCachedPorts returns a list of the available ports. The list is a cache of all the -// add/remove events happened from the StartSync call and it will not consume any -// resource from the underliying discovery. -func (disc *PluggableDiscovery) ListCachedPorts() []*Port { - disc.statusMutex.Lock() - defer disc.statusMutex.Unlock() - res := []*Port{} - for _, port := range disc.cachedPorts { - res = append(res, port) - } - return res -} diff --git a/arduino/discovery/discovery_client/go.mod b/arduino/discovery/discovery_client/go.mod index 5fcd8c8e4da..45d45abf132 100644 --- a/arduino/discovery/discovery_client/go.mod +++ b/arduino/discovery/discovery_client/go.mod @@ -7,6 +7,7 @@ replace github.com/arduino/arduino-cli => ../../.. require ( github.com/arduino/arduino-cli v0.0.0-00010101000000-000000000000 github.com/gizak/termui/v3 v3.1.0 + github.com/sirupsen/logrus v1.4.2 ) require ( @@ -20,7 +21,6 @@ require ( github.com/nsf/termbox-go v0.0.0-20190121233118-02980233997d // indirect github.com/pkg/errors v0.9.1 // indirect github.com/rivo/uniseg v0.2.0 // indirect - github.com/sirupsen/logrus v1.4.2 // indirect golang.org/x/net v0.0.0-20210505024714-0287a6fb4125 // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/text v0.3.6 // indirect diff --git a/arduino/discovery/discovery_client/main.go b/arduino/discovery/discovery_client/main.go index 2f033a604a3..b77fe6e7f68 100644 --- a/arduino/discovery/discovery_client/main.go +++ b/arduino/discovery/discovery_client/main.go @@ -21,36 +21,28 @@ import ( "log" "os" "sort" - "time" "github.com/arduino/arduino-cli/arduino/discovery" + "github.com/arduino/arduino-cli/arduino/discovery/discoverymanager" ui "github.com/gizak/termui/v3" "github.com/gizak/termui/v3/widgets" + "github.com/sirupsen/logrus" ) func main() { - discoveries := []*discovery.PluggableDiscovery{} - discEvent := make(chan *discovery.Event) + logrus.SetLevel(logrus.ErrorLevel) + dm := discoverymanager.New() for _, discCmd := range os.Args[1:] { - disc := discovery.New("", discCmd) - if err := disc.Run(); err != nil { - log.Fatal("Error starting discovery:", err) - } - if err := disc.Start(); err != nil { - log.Fatal("Error starting discovery:", err) - } - eventChan, err := disc.StartSync(10) - if err != nil { - log.Fatal("Error starting discovery:", err) - } - go func() { - for msg := range eventChan { - discEvent <- msg - } - }() - discoveries = append(discoveries, disc) + disc := discovery.New(discCmd, discCmd) + dm.Add(disc) } + dm.Start() + activePorts := map[string]*discovery.Port{} + watcher, err := dm.Watch() + if err != nil { + log.Fatalf("failed to start discoveries: %v", err) + } if err := ui.Init(); err != nil { log.Fatalf("failed to initialize termui: %v", err) } @@ -66,15 +58,20 @@ func main() { updateList := func() { rows := []string{} rows = append(rows, "Available ports list:") - for _, disc := range discoveries { - for i, port := range disc.ListCachedPorts() { - rows = append(rows, fmt.Sprintf(" [%04d] Address: %s", i, port.AddressLabel)) - rows = append(rows, fmt.Sprintf(" Protocol: %s", port.ProtocolLabel)) - keys := port.Properties.Keys() - sort.Strings(keys) - for _, k := range keys { - rows = append(rows, fmt.Sprintf(" %s=%s", k, port.Properties.Get(k))) - } + + ids := sort.StringSlice{} + for id := range activePorts { + ids = append(ids, id) + } + ids.Sort() + for _, id := range ids { + port := activePorts[id] + rows = append(rows, fmt.Sprintf("> Address: %s", port.AddressLabel)) + rows = append(rows, fmt.Sprintf(" Protocol: %s", port.ProtocolLabel)) + keys := port.Properties.Keys() + sort.Strings(keys) + for _, k := range keys { + rows = append(rows, fmt.Sprintf(" %s=%s", k, port.Properties.Get(k))) } } l.Rows = rows @@ -123,20 +120,16 @@ out: previousKey = e.ID } - case <-discEvent: + case ev := <-watcher.Feed(): + if ev.Type == "add" { + activePorts[ev.Port.Address+"|"+ev.Port.Protocol] = ev.Port + } + if ev.Type == "remove" { + delete(activePorts, ev.Port.Address+"|"+ev.Port.Protocol) + } updateList() } ui.Render(l) } - - for _, disc := range discoveries { - disc.Quit() - fmt.Println("Discovery QUITed") - for disc.State() == discovery.Alive { - time.Sleep(time.Millisecond) - } - fmt.Println("Discovery correctly terminated") - } - } diff --git a/arduino/discovery/discoverymanager/discoverymanager.go b/arduino/discovery/discoverymanager/discoverymanager.go index b18fbf46bbd..5cf3f2d3aa1 100644 --- a/arduino/discovery/discoverymanager/discoverymanager.go +++ b/arduino/discovery/discoverymanager/discoverymanager.go @@ -18,6 +18,7 @@ package discoverymanager import ( "fmt" "sync" + "time" "github.com/arduino/arduino-cli/arduino/discovery" "github.com/arduino/arduino-cli/i18n" @@ -25,11 +26,20 @@ import ( "github.com/sirupsen/logrus" ) -// DiscoveryManager is required to handle multiple pluggable-discovery that -// may be shared across platforms +// DiscoveryManager manages the many-to-many communication between all pluggable +// discoveries and all watchers. Each PluggableDiscovery, once started, will +// produce a sequence of "events". These events will be broadcasted to all +// listening Watcher. +// The DiscoveryManager will not start the discoveries until the Start method +// is called. type DiscoveryManager struct { - discoveriesMutex sync.Mutex - discoveries map[string]*discovery.PluggableDiscovery + discoveriesMutex sync.Mutex + discoveries map[string]*discovery.PluggableDiscovery // all registered PluggableDiscovery + discoveriesRunning bool // set to true once discoveries are started + feed chan *discovery.Event // all events will pass through this channel + watchersMutex sync.Mutex + watchers map[*PortWatcher]bool // all registered Watcher + watchersCache map[string]map[string]*discovery.Event // this is a cache of all active ports } var tr = i18n.Tr @@ -37,15 +47,24 @@ var tr = i18n.Tr // New creates a new DiscoveryManager func New() *DiscoveryManager { return &DiscoveryManager{ - discoveries: map[string]*discovery.PluggableDiscovery{}, + discoveries: map[string]*discovery.PluggableDiscovery{}, + watchers: map[*PortWatcher]bool{}, + feed: make(chan *discovery.Event, 50), + watchersCache: map[string]map[string]*discovery.Event{}, } } // Clear resets the DiscoveryManager to its initial state func (dm *DiscoveryManager) Clear() { - dm.QuitAll() dm.discoveriesMutex.Lock() defer dm.discoveriesMutex.Unlock() + + if dm.discoveriesRunning { + for _, d := range dm.discoveries { + d.Quit() + logrus.Infof("Closed and removed discovery %s", d.GetID()) + } + } dm.discoveries = map[string]*discovery.PluggableDiscovery{} } @@ -60,234 +79,200 @@ func (dm *DiscoveryManager) IDs() []string { return ids } -// Add adds a discovery to the list of managed discoveries -func (dm *DiscoveryManager) Add(disc *discovery.PluggableDiscovery) error { - id := disc.GetID() +// Start starts all the discoveries in this DiscoveryManager. +// If the discoveries are already running, this function does nothing. +func (dm *DiscoveryManager) Start() []error { dm.discoveriesMutex.Lock() defer dm.discoveriesMutex.Unlock() - if _, has := dm.discoveries[id]; has { - return errors.Errorf(tr("pluggable discovery already added: %s"), id) + if dm.discoveriesRunning { + return nil } - dm.discoveries[id] = disc - return nil -} -// remove quits and deletes the discovery with specified id -// from the discoveries managed by this DiscoveryManager -func (dm *DiscoveryManager) remove(id string) { - dm.discoveriesMutex.Lock() - d := dm.discoveries[id] - delete(dm.discoveries, id) - dm.discoveriesMutex.Unlock() - d.Quit() - logrus.Infof("Closed and removed discovery %s", id) -} + go func() { + // Send all events coming from the feed channel to all active watchers + for ev := range dm.feed { + dm.feedEvent(ev) + } + }() + + errs := []error{} + var errsLock sync.Mutex -// parallelize runs function f concurrently for each discovery. -// Returns a list of errors returned by each call of f. -func (dm *DiscoveryManager) parallelize(f func(d *discovery.PluggableDiscovery) error) []error { var wg sync.WaitGroup - errChan := make(chan error) - dm.discoveriesMutex.Lock() - discoveries := []*discovery.PluggableDiscovery{} for _, d := range dm.discoveries { - discoveries = append(discoveries, d) - } - dm.discoveriesMutex.Unlock() - for _, d := range discoveries { wg.Add(1) go func(d *discovery.PluggableDiscovery) { - defer wg.Done() - if err := f(d); err != nil { - errChan <- err + if err := dm.startDiscovery(d); err != nil { + errsLock.Lock() + errs = append(errs, err) + errsLock.Unlock() } + wg.Done() }(d) } + wg.Wait() + dm.discoveriesRunning = true - // Wait in a goroutine to collect eventual errors running a discovery. - // When all goroutines that are calling discoveries are done close the errors chan. - go func() { - wg.Wait() - close(errChan) - }() - - errs := []error{} - for err := range errChan { - errs = append(errs, err) - } return errs } -// RunAll the discoveries for this DiscoveryManager, -// returns an error for each discovery failing to run -func (dm *DiscoveryManager) RunAll() []error { - return dm.parallelize(func(d *discovery.PluggableDiscovery) error { - if d.State() != discovery.Dead { - // This discovery is already alive, nothing to do - return nil - } +// Add adds a discovery to the list of managed discoveries +func (dm *DiscoveryManager) Add(d *discovery.PluggableDiscovery) error { + dm.discoveriesMutex.Lock() + defer dm.discoveriesMutex.Unlock() - if err := d.Run(); err != nil { - dm.remove(d.GetID()) - return fmt.Errorf(tr("discovery %[1]s process not started: %[2]w"), d.GetID(), err) - } - return nil - }) + id := d.GetID() + if _, has := dm.discoveries[id]; has { + return errors.Errorf(tr("pluggable discovery already added: %s"), id) + } + dm.discoveries[id] = d + + if dm.discoveriesRunning { + dm.startDiscovery(d) + } + return nil } -// StartAll the discoveries for this DiscoveryManager, -// returns an error for each discovery failing to start -func (dm *DiscoveryManager) StartAll() []error { - return dm.parallelize(func(d *discovery.PluggableDiscovery) error { - state := d.State() - if state != discovery.Idling { - // Already started - return nil - } - if err := d.Start(); err != nil { - dm.remove(d.GetID()) - return fmt.Errorf(tr("starting discovery %[1]s: %[2]w"), d.GetID(), err) - } - return nil - }) +// PortWatcher is a watcher for all discovery events (port connection/disconnection) +type PortWatcher struct { + closeCB func() + feed chan *discovery.Event } -// StartSyncAll the discoveries for this DiscoveryManager, -// returns an error for each discovery failing to start syncing -func (dm *DiscoveryManager) StartSyncAll() (<-chan *discovery.Event, []error) { - eventSink := make(chan *discovery.Event, 5) - var wg sync.WaitGroup - errs := dm.parallelize(func(d *discovery.PluggableDiscovery) error { - state := d.State() - if state != discovery.Idling || state == discovery.Syncing { - // Already syncing - return nil - } +// Feed returns the feed of events coming from the discoveries +func (pw *PortWatcher) Feed() <-chan *discovery.Event { + return pw.feed +} - eventCh, err := d.StartSync(5) - if err != nil { - dm.remove(d.GetID()) - return fmt.Errorf(tr("start syncing discovery %[1]s: %[2]w"), d.GetID(), err) - } +// Close closes the PortWatcher +func (pw *PortWatcher) Close() { + pw.closeCB() +} - wg.Add(1) - go func() { - for ev := range eventCh { - eventSink <- ev - } - wg.Done() - }() - return nil - }) +// Watch starts a watcher for all discovery events (port connection/disconnection). +// The watcher must be closed when it is no longer needed with the Close method. +func (dm *DiscoveryManager) Watch() (*PortWatcher, error) { + dm.Start() + + watcher := &PortWatcher{ + feed: make(chan *discovery.Event, 10), + } + watcher.closeCB = func() { + dm.watchersMutex.Lock() + defer dm.watchersMutex.Unlock() + delete(dm.watchers, watcher) + close(watcher.feed) + } go func() { - wg.Wait() - eventSink <- &discovery.Event{Type: "quit"} - close(eventSink) + dm.watchersMutex.Lock() + // When a watcher is started, send all the current active ports first... + for _, cache := range dm.watchersCache { + for _, ev := range cache { + watcher.feed <- ev + } + } + // ...and after that add the watcher to the list of watchers receiving events + dm.watchers[watcher] = true + dm.watchersMutex.Unlock() }() - return eventSink, errs + return watcher, nil } -// StopAll the discoveries for this DiscoveryManager, -// returns an error for each discovery failing to stop -func (dm *DiscoveryManager) StopAll() []error { - return dm.parallelize(func(d *discovery.PluggableDiscovery) error { - state := d.State() - if state != discovery.Syncing && state != discovery.Running { - // Not running nor syncing, nothing to stop - return nil +func (dm *DiscoveryManager) startDiscovery(d *discovery.PluggableDiscovery) (discErr error) { + defer func() { + // If this function returns an error log it + if discErr != nil { + logrus.Errorf("Discovery %s failed to run: %s", d.GetID(), discErr) } + }() - if err := d.Stop(); err != nil { - dm.remove(d.GetID()) - return fmt.Errorf(tr("stopping discovery %[1]s: %[2]w"), d.GetID(), err) - } - return nil - }) -} + if err := d.Run(); err != nil { + return fmt.Errorf(tr("discovery %[1]s process not started: %[2]w"), d.GetID(), err) + } + eventCh, err := d.StartSync(5) + if err != nil { + return fmt.Errorf("%s: %s", tr("starting discovery %s", d.GetID()), err) + } -// QuitAll quits all the discoveries managed by this DiscoveryManager. -// Returns an error for each discovery that fails quitting -func (dm *DiscoveryManager) QuitAll() []error { - errs := dm.parallelize(func(d *discovery.PluggableDiscovery) error { - if d.State() == discovery.Dead { - // Stop! Stop! It's already dead! - return nil + go func() { + // Transfer all incoming events from this discovery to the feed channel + for ev := range eventCh { + dm.feed <- ev } - - d.Quit() - return nil - }) - return errs + }() + return nil } -// List returns a list of available ports detected from all discoveries -// and a list of errors for those discoveries that returned one. -func (dm *DiscoveryManager) List() ([]*discovery.Port, []error) { - var wg sync.WaitGroup - // Use this struct to avoid the need of two separate - // channels for ports and errors. - type listMsg struct { - Err error - Port *discovery.Port - } - msgChan := make(chan listMsg) - dm.discoveriesMutex.Lock() - discoveries := []*discovery.PluggableDiscovery{} - for _, d := range dm.discoveries { - discoveries = append(discoveries, d) - } - dm.discoveriesMutex.Unlock() - for _, d := range discoveries { - wg.Add(1) - go func(d *discovery.PluggableDiscovery) { - defer wg.Done() - if d.State() != discovery.Running { - // Discovery is not running, it won't return anything - return - } - ports, err := d.List() - if err != nil { - msgChan <- listMsg{Err: fmt.Errorf(tr("listing ports from discovery %[1]s: %[2]w"), d.GetID(), err)} +func (dm *DiscoveryManager) feedEvent(ev *discovery.Event) { + dm.watchersMutex.Lock() + defer dm.watchersMutex.Unlock() + + sendToAllWatchers := func(ev *discovery.Event) { + // Send the event to all watchers + for watcher := range dm.watchers { + select { + case watcher.feed <- ev: + // OK + case <-time.After(time.Millisecond * 500): + // If the watcher is not able to process event fast enough + // remove the watcher from the list of watchers + logrus.Error("Watcher is not able to process events fast enough, removing it from the list of watchers") + delete(dm.watchers, watcher) } - for _, p := range ports { - msgChan <- listMsg{Port: p} + } + } + + if ev.Type == "stop" { + // Send remove events for all the cached ports of the terminating discovery + cache := dm.watchersCache[ev.DiscoveryID] + for _, addEv := range cache { + removeEv := &discovery.Event{ + Type: "remove", + Port: &discovery.Port{ + Address: addEv.Port.Address, + AddressLabel: addEv.Port.AddressLabel, + Protocol: addEv.Port.Protocol, + ProtocolLabel: addEv.Port.ProtocolLabel}, + DiscoveryID: addEv.DiscoveryID, } - }(d) + sendToAllWatchers(removeEv) + } + + // Remove the cache for the terminating discovery + delete(dm.watchersCache, ev.DiscoveryID) + return } - go func() { - // Close the channel only after all goroutines are done - wg.Wait() - close(msgChan) - }() + sendToAllWatchers(ev) - ports := []*discovery.Port{} - errs := []error{} - for msg := range msgChan { - if msg.Err != nil { - errs = append(errs, msg.Err) - } else { - ports = append(ports, msg.Port) - } + // Cache the event for the discovery + cache := dm.watchersCache[ev.DiscoveryID] + if cache == nil { + cache = map[string]*discovery.Event{} + dm.watchersCache[ev.DiscoveryID] = cache + } + eventID := ev.Port.Address + "|" + ev.Port.Protocol + switch ev.Type { + case "add": + cache[eventID] = ev + case "remove": + delete(cache, eventID) + default: + logrus.Errorf("Unhandled event from discovery: %s", ev.Type) } - return ports, errs } -// ListCachedPorts return the current list of ports detected from all discoveries -func (dm *DiscoveryManager) ListCachedPorts() []*discovery.Port { +// List return the current list of ports detected from all discoveries +func (dm *DiscoveryManager) List() []*discovery.Port { + dm.Start() + res := []*discovery.Port{} - dm.discoveriesMutex.Lock() - discoveries := []*discovery.PluggableDiscovery{} - for _, d := range dm.discoveries { - discoveries = append(discoveries, d) - } - dm.discoveriesMutex.Unlock() - for _, d := range discoveries { - if d.State() != discovery.Syncing { - // Discovery is not syncing - continue + dm.watchersMutex.Lock() + defer dm.watchersMutex.Unlock() + for _, cache := range dm.watchersCache { + for _, ev := range cache { + res = append(res, ev.Port) } - res = append(res, d.ListCachedPorts()...) } return res } diff --git a/cli/arguments/completion.go b/cli/arguments/completion.go index 34674e8e57b..32ab99f407b 100644 --- a/cli/arguments/completion.go +++ b/cli/arguments/completion.go @@ -178,7 +178,7 @@ func GetInstallableLibs() []string { func GetConnectedBoards() []string { inst := instance.CreateAndInit() - list, _ := board.List(&rpc.BoardListRequest{ + list, _, _ := board.List(&rpc.BoardListRequest{ Instance: inst, }) var res []string diff --git a/cli/arguments/port.go b/cli/arguments/port.go index 12e7e28f5c7..1e054e713b4 100644 --- a/cli/arguments/port.go +++ b/cli/arguments/port.go @@ -106,31 +106,16 @@ func (p *Port) GetPort(instance *rpc.Instance, sk *sketch.Sketch) (*discovery.Po return nil, errors.New("invalid instance") } dm := pm.DiscoveryManager() - if errs := dm.RunAll(); len(errs) == len(dm.IDs()) { - // All discoveries failed to run, we can't do anything - return nil, fmt.Errorf("%v", errs) - } else if len(errs) > 0 { - // If only some discoveries failed to run just tell the user and go on - for _, err := range errs { - feedback.Error(err) - } - } - eventChan, errs := dm.StartSyncAll() - if len(errs) > 0 { - return nil, fmt.Errorf("%v", errs) + watcher, err := dm.Watch() + if err != nil { + return nil, err } - - defer func() { - // Quit all discoveries at the end. - if errs := dm.QuitAll(); len(errs) > 0 { - logrus.Errorf("quitting discoveries when getting port metadata: %v", errs) - } - }() + defer watcher.Close() deadline := time.After(p.timeout.Get()) for { select { - case portEvent := <-eventChan: + case portEvent := <-watcher.Feed(): if portEvent.Type != "add" { continue } @@ -161,7 +146,7 @@ func (p *Port) GetSearchTimeout() time.Duration { // discovered Port object together with the FQBN. If the port does not match // exactly 1 board, func (p *Port) DetectFQBN(inst *rpc.Instance) (string, *rpc.Port) { - detectedPorts, err := board.List(&rpc.BoardListRequest{ + detectedPorts, _, err := board.List(&rpc.BoardListRequest{ Instance: inst, Timeout: p.timeout.Get().Milliseconds(), }) diff --git a/cli/board/list.go b/cli/board/list.go index 83f4e4418cd..fe82973847a 100644 --- a/cli/board/list.go +++ b/cli/board/list.go @@ -64,22 +64,26 @@ func runListCommand(cmd *cobra.Command, args []string) { os.Exit(0) } - ports, err := board.List(&rpc.BoardListRequest{ + ports, discvoeryErrors, err := board.List(&rpc.BoardListRequest{ Instance: inst, Timeout: timeoutArg.Get().Milliseconds(), }) if err != nil { feedback.Errorf(tr("Error detecting boards: %v"), err) } + for _, err := range discvoeryErrors { + feedback.Errorf(tr("Error starting discovery: %v"), err) + } feedback.PrintResult(result{ports}) } func watchList(cmd *cobra.Command, inst *rpc.Instance) { - eventsChan, err := board.Watch(inst.Id, nil) + eventsChan, closeCB, err := board.Watch(inst.Id) if err != nil { feedback.Errorf(tr("Error detecting boards: %v"), err) os.Exit(errorcodes.ErrNetwork) } + defer closeCB() // This is done to avoid printing the header each time a new event is received if feedback.GetFormat() == feedback.Text { diff --git a/commands/board/list.go b/commands/board/list.go index c2498889c10..fe9b4afeb09 100644 --- a/commands/board/list.go +++ b/commands/board/list.go @@ -16,6 +16,7 @@ package board import ( + "context" "encoding/json" "fmt" "io/ioutil" @@ -176,32 +177,21 @@ func identify(pm *packagemanager.PackageManager, port *discovery.Port) ([]*rpc.B // List returns a list of boards found by the loaded discoveries. // In case of errors partial results from discoveries that didn't fail // are returned. -func List(req *rpc.BoardListRequest) (r []*rpc.DetectedPort, e error) { +func List(req *rpc.BoardListRequest) (r []*rpc.DetectedPort, discoveryStartErrors []error, e error) { pm := commands.GetPackageManager(req.GetInstance().Id) if pm == nil { - return nil, &arduino.InvalidInstanceError{} + return nil, nil, &arduino.InvalidInstanceError{} } dm := pm.DiscoveryManager() - if errs := dm.RunAll(); len(errs) > 0 { - return nil, &arduino.UnavailableError{Message: tr("Error starting board discoveries"), Cause: fmt.Errorf("%v", errs)} - } - if errs := dm.StartAll(); len(errs) > 0 { - return nil, &arduino.UnavailableError{Message: tr("Error starting board discoveries"), Cause: fmt.Errorf("%v", errs)} - } - defer func() { - if errs := dm.StopAll(); len(errs) > 0 { - logrus.Error(errs) - } - }() + discoveryStartErrors = dm.Start() time.Sleep(time.Duration(req.GetTimeout()) * time.Millisecond) retVal := []*rpc.DetectedPort{} - ports, errs := pm.DiscoveryManager().List() - for _, port := range ports { + for _, port := range dm.List() { boards, err := identify(pm, port) if err != nil { - return nil, err + return nil, discoveryStartErrors, err } // boards slice can be empty at this point if neither the cores nor the @@ -212,92 +202,49 @@ func List(req *rpc.BoardListRequest) (r []*rpc.DetectedPort, e error) { } retVal = append(retVal, b) } - if len(errs) > 0 { - return retVal, &arduino.UnavailableError{Message: tr("Error getting board list"), Cause: fmt.Errorf("%v", errs)} - } - return retVal, nil + return retVal, discoveryStartErrors, nil } // Watch returns a channel that receives boards connection and disconnection events. -// The discovery process can be interrupted by sending a message to the interrupt channel. -func Watch(instanceID int32, interrupt <-chan bool) (<-chan *rpc.BoardListWatchResponse, error) { +// It also returns a callback function that must be used to stop and dispose the watch. +func Watch(instanceID int32) (<-chan *rpc.BoardListWatchResponse, func(), error) { pm := commands.GetPackageManager(instanceID) dm := pm.DiscoveryManager() - runErrs := dm.RunAll() - if len(runErrs) == len(dm.IDs()) { - // All discoveries failed to run, we can't do anything - return nil, &arduino.UnavailableError{Message: tr("Error starting board discoveries"), Cause: fmt.Errorf("%v", runErrs)} + watcher, err := dm.Watch() + if err != nil { + return nil, nil, err } - eventsChan, errs := dm.StartSyncAll() - if len(runErrs) > 0 { - errs = append(runErrs, errs...) - } + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-ctx.Done() + watcher.Close() + }() outChan := make(chan *rpc.BoardListWatchResponse) - go func() { defer close(outChan) - for _, err := range errs { - outChan <- &rpc.BoardListWatchResponse{ - EventType: "error", - Error: err.Error(), + for event := range watcher.Feed() { + port := &rpc.DetectedPort{ + Port: event.Port.ToRPC(), } - } - for { - select { - case event := <-eventsChan: - if event.Type == "quit" { - // The discovery manager has closed its event channel because it's - // quitting all the discovery processes that are running, this - // means that the events channel we're listening from won't receive any - // more events. - // Handling this case is necessary when the board watcher is running and - // the instance being used is reinitialized since that quits all the - // discovery processes and reset the discovery manager. That would leave - // this goroutine listening forever on a "dead" channel and might even - // cause panics. - // This message avoid all this issues. - // It will be the client's task restarting the board watcher if necessary, - // this host won't attempt restarting it. - outChan <- &rpc.BoardListWatchResponse{ - EventType: event.Type, - } - return - } - - port := &rpc.DetectedPort{ - Port: event.Port.ToRPC(), - } - boardsError := "" - if event.Type == "add" { - boards, err := identify(pm, event.Port) - if err != nil { - boardsError = err.Error() - } - port.MatchingBoards = boards - } - outChan <- &rpc.BoardListWatchResponse{ - EventType: event.Type, - Port: port, - Error: boardsError, - } - case <-interrupt: - for _, err := range dm.StopAll() { - // Discoveries that return errors have their process - // closed and are removed from the list of discoveries - // in the manager - outChan <- &rpc.BoardListWatchResponse{ - EventType: "error", - Error: tr("stopping discoveries: %s", err), - } + boardsError := "" + if event.Type == "add" { + boards, err := identify(pm, event.Port) + if err != nil { + boardsError = err.Error() } - return + port.MatchingBoards = boards + } + outChan <- &rpc.BoardListWatchResponse{ + EventType: event.Type, + Port: port, + Error: boardsError, } } }() - return outChan, nil + return outChan, cancel, nil } diff --git a/commands/daemon/daemon.go b/commands/daemon/daemon.go index a82656bbe70..96ec5daed6e 100644 --- a/commands/daemon/daemon.go +++ b/commands/daemon/daemon.go @@ -36,9 +36,7 @@ import ( "github.com/arduino/arduino-cli/i18n" rpc "github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1" "github.com/sirupsen/logrus" - "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" ) // ArduinoCoreServerImpl FIXMEDOC @@ -69,7 +67,7 @@ func (s *ArduinoCoreServerImpl) BoardDetails(ctx context.Context, req *rpc.Board // BoardList FIXMEDOC func (s *ArduinoCoreServerImpl) BoardList(ctx context.Context, req *rpc.BoardListRequest) (*rpc.BoardListResponse, error) { - ports, err := board.List(req) + ports, _, err := board.List(req) if err != nil { return nil, convertErrorToRPCStatus(err) } @@ -109,42 +107,35 @@ func (s *ArduinoCoreServerImpl) BoardListWatch(stream rpc.ArduinoCoreService_Boa return err } - interrupt := make(chan bool, 1) + eventsChan, closeWatcher, err := board.Watch(msg.Instance.Id) + if err != nil { + return convertErrorToRPCStatus(err) + } + go func() { - defer close(interrupt) + defer closeWatcher() for { msg, err := stream.Recv() // Handle client closing the stream and eventual errors if err == io.EOF { logrus.Info("boards watcher stream closed") - interrupt <- true - return - } else if st, ok := status.FromError(err); ok && st.Code() == codes.Canceled { - logrus.Info("boards watcher interrupted by host") return - } else if err != nil { + } + if err != nil { logrus.Infof("interrupting boards watcher: %v", err) - interrupt <- true return } // Message received, does the client want to interrupt? if msg != nil && msg.Interrupt { logrus.Info("boards watcher interrupted by client") - interrupt <- msg.Interrupt return } } }() - eventsChan, err := board.Watch(msg.Instance.Id, interrupt) - if err != nil { - return convertErrorToRPCStatus(err) - } - for event := range eventsChan { - err = stream.Send(event) - if err != nil { + if err := stream.Send(event); err != nil { logrus.Infof("sending board watch message: %v", err) } }