Skip to content

Commit

Permalink
Fix gRPC BoardList* methods concurrency issues (#1804)
Browse files Browse the repository at this point in the history
* Improved streaming of pluggable-discoveries events (WIP)

Now the DiscoveryManager is able to start the discoveries and add/remove
them in a thread-safe way. Also the watchers may connect and disconnect
seamlessly at any time, the incoming events from the discovery are
broadcasted correctly to each active watcher.

This refactoring dramatically simplifies the DiscoveryManager design.

* Added discovery id in discovery.Event struct

* Cache active ports and transmit them when a new watcher connects

* Correctly handle discovery cleanup

* Fixed wrong test

* Correctly handle discovery cleanup and re-add

* Added some doc comments in the source code

* Move Unlock under defer

* Factored subrotuine into a function

it will be useful in the next commits.

* Do not cache ports in the DiscoveryClient

there is already a cache in the DiscoveryManager there is no need to
duplicate it.

* Discovery: eventChan must be protected by mutex when doing START_SYNC

otherwise the discovery may send some events before the eventChan is
setup (and those events will be lost)

* Increased error level for logging watchers that lags

* Updated discvoery_client to the latest API

* Report discovery start errors

* Update arduino/discovery/discovery_client/main.go

Co-authored-by: Umberto Baldi <[email protected]>

Co-authored-by: Umberto Baldi <[email protected]>
  • Loading branch information
cmaglie and umbynos authored Aug 10, 2022
1 parent 312cfdb commit 9c334ed
Show file tree
Hide file tree
Showing 10 changed files with 289 additions and 404 deletions.
14 changes: 6 additions & 8 deletions arduino/cores/packagemanager/package_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,16 +329,14 @@ func TestPackageManagerClear(t *testing.T) {
packageManager := packagemanager.NewPackageManager(customHardware, customHardware, customHardware, customHardware, "test")
packageManager.LoadHardwareFromDirectory(customHardware)

// Creates another PackageManager but don't load the hardware
emptyPackageManager := packagemanager.NewPackageManager(customHardware, customHardware, customHardware, customHardware, "test")
// Check that the hardware is loaded
require.NotEmpty(t, packageManager.Packages)

// Verifies they're not equal
require.NotEqual(t, packageManager, emptyPackageManager)

// Clear the first PackageManager that contains loaded hardware
// Clear the package manager
packageManager.Clear()
// Verifies both PackageManagers are now equal
require.Equal(t, packageManager, emptyPackageManager)

// Check that the hardware is cleared
require.Empty(t, packageManager.Packages)
}

func TestFindToolsRequiredFromPlatformRelease(t *testing.T) {
Expand Down
54 changes: 18 additions & 36 deletions arduino/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ type PluggableDiscovery struct {
incomingMessagesError error
state int
eventChan chan<- *Event
cachedPorts map[string]*Port
}

type discoveryMessage struct {
Expand Down Expand Up @@ -121,8 +120,9 @@ func (p *Port) String() string {

// Event is a pluggable discovery event
type Event struct {
Type string
Port *Port
Type string
Port *Port
DiscoveryID string
}

// New create and connect to the given pluggable discovery
Expand All @@ -131,7 +131,6 @@ func New(id string, args ...string) *PluggableDiscovery {
id: id,
processArgs: args,
state: Dead,
cachedPorts: map[string]*Port{},
}
}

Expand Down Expand Up @@ -176,9 +175,8 @@ func (disc *PluggableDiscovery) jsonDecodeLoop(in io.Reader, outChan chan<- *dis
return
}
disc.statusMutex.Lock()
disc.cachedPorts[msg.Port.Address+"|"+msg.Port.Protocol] = msg.Port
if disc.eventChan != nil {
disc.eventChan <- &Event{"add", msg.Port}
disc.eventChan <- &Event{"add", msg.Port, disc.GetID()}
}
disc.statusMutex.Unlock()
} else if msg.EventType == "remove" {
Expand All @@ -187,9 +185,8 @@ func (disc *PluggableDiscovery) jsonDecodeLoop(in io.Reader, outChan chan<- *dis
return
}
disc.statusMutex.Lock()
delete(disc.cachedPorts, msg.Port.Address+"|"+msg.Port.Protocol)
if disc.eventChan != nil {
disc.eventChan <- &Event{"remove", msg.Port}
disc.eventChan <- &Event{"remove", msg.Port, disc.GetID()}
}
disc.statusMutex.Unlock()
} else {
Expand Down Expand Up @@ -276,10 +273,7 @@ func (disc *PluggableDiscovery) killProcess() error {
}
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
if disc.eventChan != nil {
close(disc.eventChan)
disc.eventChan = nil
}
disc.stopSync()
disc.state = Dead
logrus.Infof("killed discovery %s process", disc.id)
return nil
Expand Down Expand Up @@ -366,13 +360,17 @@ func (disc *PluggableDiscovery) Stop() error {
}
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
disc.cachedPorts = map[string]*Port{}
disc.stopSync()
disc.state = Idling
return nil
}

func (disc *PluggableDiscovery) stopSync() {
if disc.eventChan != nil {
disc.eventChan <- &Event{"stop", nil, disc.GetID()}
close(disc.eventChan)
disc.eventChan = nil
}
disc.state = Idling
return nil
}

// Quit terminates the discovery. No more commands can be accepted by the discovery.
Expand Down Expand Up @@ -409,6 +407,9 @@ func (disc *PluggableDiscovery) List() ([]*Port, error) {
// The event channel must be consumed as quickly as possible since it may block the
// discovery if it becomes full. The channel size is configurable.
func (disc *PluggableDiscovery) StartSync(size int) (<-chan *Event, error) {
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()

if err := disc.sendCommand("START_SYNC\n"); err != nil {
return nil, err
}
Expand All @@ -423,29 +424,10 @@ func (disc *PluggableDiscovery) StartSync(size int) (<-chan *Event, error) {
return nil, errors.Errorf(tr("communication out of sync, expected '%[1]s', received '%[2]s'"), "OK", msg.Message)
}

disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
disc.state = Syncing
disc.cachedPorts = map[string]*Port{}
if disc.eventChan != nil {
// In case there is already an existing event channel in use we close it
// before creating a new one.
close(disc.eventChan)
}
// In case there is already an existing event channel in use we close it before creating a new one.
disc.stopSync()
c := make(chan *Event, size)
disc.eventChan = c
return c, nil
}

// ListCachedPorts returns a list of the available ports. The list is a cache of all the
// add/remove events happened from the StartSync call and it will not consume any
// resource from the underliying discovery.
func (disc *PluggableDiscovery) ListCachedPorts() []*Port {
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
res := []*Port{}
for _, port := range disc.cachedPorts {
res = append(res, port)
}
return res
}
2 changes: 1 addition & 1 deletion arduino/discovery/discovery_client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ replace github.com/arduino/arduino-cli => ../../..
require (
github.com/arduino/arduino-cli v0.0.0-00010101000000-000000000000
github.com/gizak/termui/v3 v3.1.0
github.com/sirupsen/logrus v1.4.2
)

require (
Expand All @@ -20,7 +21,6 @@ require (
github.com/nsf/termbox-go v0.0.0-20190121233118-02980233997d // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/sirupsen/logrus v1.4.2 // indirect
golang.org/x/net v0.0.0-20210505024714-0287a6fb4125 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/text v0.3.6 // indirect
Expand Down
73 changes: 33 additions & 40 deletions arduino/discovery/discovery_client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,28 @@ import (
"log"
"os"
"sort"
"time"

"github.com/arduino/arduino-cli/arduino/discovery"
"github.com/arduino/arduino-cli/arduino/discovery/discoverymanager"
ui "github.com/gizak/termui/v3"
"github.com/gizak/termui/v3/widgets"
"github.com/sirupsen/logrus"
)

func main() {
discoveries := []*discovery.PluggableDiscovery{}
discEvent := make(chan *discovery.Event)
logrus.SetLevel(logrus.ErrorLevel)
dm := discoverymanager.New()
for _, discCmd := range os.Args[1:] {
disc := discovery.New("", discCmd)
if err := disc.Run(); err != nil {
log.Fatal("Error starting discovery:", err)
}
if err := disc.Start(); err != nil {
log.Fatal("Error starting discovery:", err)
}
eventChan, err := disc.StartSync(10)
if err != nil {
log.Fatal("Error starting discovery:", err)
}
go func() {
for msg := range eventChan {
discEvent <- msg
}
}()
discoveries = append(discoveries, disc)
disc := discovery.New(discCmd, discCmd)
dm.Add(disc)
}
dm.Start()

activePorts := map[string]*discovery.Port{}
watcher, err := dm.Watch()
if err != nil {
log.Fatalf("failed to start discoveries: %v", err)
}
if err := ui.Init(); err != nil {
log.Fatalf("failed to initialize termui: %v", err)
}
Expand All @@ -66,15 +58,20 @@ func main() {
updateList := func() {
rows := []string{}
rows = append(rows, "Available ports list:")
for _, disc := range discoveries {
for i, port := range disc.ListCachedPorts() {
rows = append(rows, fmt.Sprintf(" [%04d] Address: %s", i, port.AddressLabel))
rows = append(rows, fmt.Sprintf(" Protocol: %s", port.ProtocolLabel))
keys := port.Properties.Keys()
sort.Strings(keys)
for _, k := range keys {
rows = append(rows, fmt.Sprintf(" %s=%s", k, port.Properties.Get(k)))
}

ids := sort.StringSlice{}
for id := range activePorts {
ids = append(ids, id)
}
ids.Sort()
for _, id := range ids {
port := activePorts[id]
rows = append(rows, fmt.Sprintf("> Address: %s", port.AddressLabel))
rows = append(rows, fmt.Sprintf(" Protocol: %s", port.ProtocolLabel))
keys := port.Properties.Keys()
sort.Strings(keys)
for _, k := range keys {
rows = append(rows, fmt.Sprintf(" %s=%s", k, port.Properties.Get(k)))
}
}
l.Rows = rows
Expand Down Expand Up @@ -123,20 +120,16 @@ out:
previousKey = e.ID
}

case <-discEvent:
case ev := <-watcher.Feed():
if ev.Type == "add" {
activePorts[ev.Port.Address+"|"+ev.Port.Protocol] = ev.Port
}
if ev.Type == "remove" {
delete(activePorts, ev.Port.Address+"|"+ev.Port.Protocol)
}
updateList()
}

ui.Render(l)
}

for _, disc := range discoveries {
disc.Quit()
fmt.Println("Discovery QUITed")
for disc.State() == discovery.Alive {
time.Sleep(time.Millisecond)
}
fmt.Println("Discovery correctly terminated")
}

}
Loading

0 comments on commit 9c334ed

Please sign in to comment.