Unverified commit 9c334ed8, authored by Cristian Maglie, committed by GitHub

Fix gRPC `BoardList*` methods concurrency issues (#1804)

* Improved streaming of pluggable-discoveries events (WIP)

Now the DiscoveryManager is able to start the discoveries and add/remove
them in a thread-safe way. Also, the watchers may connect and disconnect
seamlessly at any time, and the incoming events from the discovery are
broadcast correctly to each active watcher.

This refactoring dramatically simplifies the DiscoveryManager design.

* Added discovery id in discovery.Event struct

* Cache active ports and transmit them when a new watcher connects

* Correctly handle discovery cleanup

* Fixed wrong test

* Correctly handle discovery cleanup and re-add

* Added some doc comments in the source code

* Move Unlock under defer

* Factored subroutine into a function

it will be useful in the next commits.

* Do not cache ports in the DiscoveryClient

There is already a cache in the DiscoveryManager, so there is no need to
duplicate it.

* Discovery: eventChan must be protected by mutex when doing START_SYNC

otherwise the discovery may send some events before the eventChan is
set up (and those events will be lost)

* Increased error level for logging watchers that lag

* Updated discovery_client to the latest API

* Report discovery start errors

* Update arduino/discovery/discovery_client/main.go
Co-authored-by: Umberto Baldi <34278123+umbynos@users.noreply.github.com>
Co-authored-by: Umberto Baldi <34278123+umbynos@users.noreply.github.com>
parent 312cfdb9
......@@ -329,16 +329,14 @@ func TestPackageManagerClear(t *testing.T) {
packageManager := packagemanager.NewPackageManager(customHardware, customHardware, customHardware, customHardware, "test")
packageManager.LoadHardwareFromDirectory(customHardware)
// Creates another PackageManager but don't load the hardware
emptyPackageManager := packagemanager.NewPackageManager(customHardware, customHardware, customHardware, customHardware, "test")
// Check that the hardware is loaded
require.NotEmpty(t, packageManager.Packages)
// Verifies they're not equal
require.NotEqual(t, packageManager, emptyPackageManager)
// Clear the first PackageManager that contains loaded hardware
// Clear the package manager
packageManager.Clear()
// Verifies both PackageManagers are now equal
require.Equal(t, packageManager, emptyPackageManager)
// Check that the hardware is cleared
require.Empty(t, packageManager.Packages)
}
func TestFindToolsRequiredFromPlatformRelease(t *testing.T) {
......
......@@ -57,7 +57,6 @@ type PluggableDiscovery struct {
incomingMessagesError error
state int
eventChan chan<- *Event
cachedPorts map[string]*Port
}
type discoveryMessage struct {
......@@ -121,8 +120,9 @@ func (p *Port) String() string {
// Event is a pluggable discovery event
type Event struct {
Type string
Port *Port
Type string
Port *Port
DiscoveryID string
}
// New create and connect to the given pluggable discovery
......@@ -131,7 +131,6 @@ func New(id string, args ...string) *PluggableDiscovery {
id: id,
processArgs: args,
state: Dead,
cachedPorts: map[string]*Port{},
}
}
......@@ -176,9 +175,8 @@ func (disc *PluggableDiscovery) jsonDecodeLoop(in io.Reader, outChan chan<- *dis
return
}
disc.statusMutex.Lock()
disc.cachedPorts[msg.Port.Address+"|"+msg.Port.Protocol] = msg.Port
if disc.eventChan != nil {
disc.eventChan <- &Event{"add", msg.Port}
disc.eventChan <- &Event{"add", msg.Port, disc.GetID()}
}
disc.statusMutex.Unlock()
} else if msg.EventType == "remove" {
......@@ -187,9 +185,8 @@ func (disc *PluggableDiscovery) jsonDecodeLoop(in io.Reader, outChan chan<- *dis
return
}
disc.statusMutex.Lock()
delete(disc.cachedPorts, msg.Port.Address+"|"+msg.Port.Protocol)
if disc.eventChan != nil {
disc.eventChan <- &Event{"remove", msg.Port}
disc.eventChan <- &Event{"remove", msg.Port, disc.GetID()}
}
disc.statusMutex.Unlock()
} else {
......@@ -276,10 +273,7 @@ func (disc *PluggableDiscovery) killProcess() error {
}
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
if disc.eventChan != nil {
close(disc.eventChan)
disc.eventChan = nil
}
disc.stopSync()
disc.state = Dead
logrus.Infof("killed discovery %s process", disc.id)
return nil
......@@ -366,13 +360,17 @@ func (disc *PluggableDiscovery) Stop() error {
}
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
disc.cachedPorts = map[string]*Port{}
disc.stopSync()
disc.state = Idling
return nil
}
func (disc *PluggableDiscovery) stopSync() {
if disc.eventChan != nil {
disc.eventChan <- &Event{"stop", nil, disc.GetID()}
close(disc.eventChan)
disc.eventChan = nil
}
disc.state = Idling
return nil
}
// Quit terminates the discovery. No more commands can be accepted by the discovery.
......@@ -409,6 +407,9 @@ func (disc *PluggableDiscovery) List() ([]*Port, error) {
// The event channel must be consumed as quickly as possible since it may block the
// discovery if it becomes full. The channel size is configurable.
func (disc *PluggableDiscovery) StartSync(size int) (<-chan *Event, error) {
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
if err := disc.sendCommand("START_SYNC\n"); err != nil {
return nil, err
}
......@@ -423,29 +424,10 @@ func (disc *PluggableDiscovery) StartSync(size int) (<-chan *Event, error) {
return nil, errors.Errorf(tr("communication out of sync, expected '%[1]s', received '%[2]s'"), "OK", msg.Message)
}
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
disc.state = Syncing
disc.cachedPorts = map[string]*Port{}
if disc.eventChan != nil {
// In case there is already an existing event channel in use we close it
// before creating a new one.
close(disc.eventChan)
}
// In case there is already an existing event channel in use we close it before creating a new one.
disc.stopSync()
c := make(chan *Event, size)
disc.eventChan = c
return c, nil
}
// ListCachedPorts returns the ports currently held in the discovery's port cache.
// The cache reflects the add/remove events received since the last StartSync call,
// so building the list does not consume any resource from the underlying discovery.
//
// The result is a newly allocated slice assembled while holding statusMutex. Its
// order is unspecified because it is produced by iterating over a map, and it is
// empty (never nil) when the cache holds no ports.
func (disc *PluggableDiscovery) ListCachedPorts() []*Port {
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
// Start from an empty, non-nil slice so an empty cache yields [] rather than nil.
res := []*Port{}
for _, port := range disc.cachedPorts {
res = append(res, port)
}
return res
}
......@@ -7,6 +7,7 @@ replace github.com/arduino/arduino-cli => ../../..
require (
github.com/arduino/arduino-cli v0.0.0-00010101000000-000000000000
github.com/gizak/termui/v3 v3.1.0
github.com/sirupsen/logrus v1.4.2
)
require (
......@@ -20,7 +21,6 @@ require (
github.com/nsf/termbox-go v0.0.0-20190121233118-02980233997d // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/sirupsen/logrus v1.4.2 // indirect
golang.org/x/net v0.0.0-20210505024714-0287a6fb4125 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/text v0.3.6 // indirect
......
......@@ -21,36 +21,28 @@ import (
"log"
"os"
"sort"
"time"
"github.com/arduino/arduino-cli/arduino/discovery"
"github.com/arduino/arduino-cli/arduino/discovery/discoverymanager"
ui "github.com/gizak/termui/v3"
"github.com/gizak/termui/v3/widgets"
"github.com/sirupsen/logrus"
)
func main() {
discoveries := []*discovery.PluggableDiscovery{}
discEvent := make(chan *discovery.Event)
logrus.SetLevel(logrus.ErrorLevel)
dm := discoverymanager.New()
for _, discCmd := range os.Args[1:] {
disc := discovery.New("", discCmd)
if err := disc.Run(); err != nil {
log.Fatal("Error starting discovery:", err)
}
if err := disc.Start(); err != nil {
log.Fatal("Error starting discovery:", err)
}
eventChan, err := disc.StartSync(10)
if err != nil {
log.Fatal("Error starting discovery:", err)
}
go func() {
for msg := range eventChan {
discEvent <- msg
}
}()
discoveries = append(discoveries, disc)
disc := discovery.New(discCmd, discCmd)
dm.Add(disc)
}
dm.Start()
activePorts := map[string]*discovery.Port{}
watcher, err := dm.Watch()
if err != nil {
log.Fatalf("failed to start discoveries: %v", err)
}
if err := ui.Init(); err != nil {
log.Fatalf("failed to initialize termui: %v", err)
}
......@@ -66,15 +58,20 @@ func main() {
updateList := func() {
rows := []string{}
rows = append(rows, "Available ports list:")
for _, disc := range discoveries {
for i, port := range disc.ListCachedPorts() {
rows = append(rows, fmt.Sprintf(" [%04d] Address: %s", i, port.AddressLabel))
rows = append(rows, fmt.Sprintf(" Protocol: %s", port.ProtocolLabel))
keys := port.Properties.Keys()
sort.Strings(keys)
for _, k := range keys {
rows = append(rows, fmt.Sprintf(" %s=%s", k, port.Properties.Get(k)))
}
ids := sort.StringSlice{}
for id := range activePorts {
ids = append(ids, id)
}
ids.Sort()
for _, id := range ids {
port := activePorts[id]
rows = append(rows, fmt.Sprintf("> Address: %s", port.AddressLabel))
rows = append(rows, fmt.Sprintf(" Protocol: %s", port.ProtocolLabel))
keys := port.Properties.Keys()
sort.Strings(keys)
for _, k := range keys {
rows = append(rows, fmt.Sprintf(" %s=%s", k, port.Properties.Get(k)))
}
}
l.Rows = rows
......@@ -123,20 +120,16 @@ out:
previousKey = e.ID
}
case <-discEvent:
case ev := <-watcher.Feed():
if ev.Type == "add" {
activePorts[ev.Port.Address+"|"+ev.Port.Protocol] = ev.Port
}
if ev.Type == "remove" {
delete(activePorts, ev.Port.Address+"|"+ev.Port.Protocol)
}
updateList()
}
ui.Render(l)
}
for _, disc := range discoveries {
disc.Quit()
fmt.Println("Discovery QUITed")
for disc.State() == discovery.Alive {
time.Sleep(time.Millisecond)
}
fmt.Println("Discovery correctly terminated")
}
}
......@@ -178,7 +178,7 @@ func GetInstallableLibs() []string {
func GetConnectedBoards() []string {
inst := instance.CreateAndInit()
list, _ := board.List(&rpc.BoardListRequest{
list, _, _ := board.List(&rpc.BoardListRequest{
Instance: inst,
})
var res []string
......
......@@ -106,31 +106,16 @@ func (p *Port) GetPort(instance *rpc.Instance, sk *sketch.Sketch) (*discovery.Po
return nil, errors.New("invalid instance")
}
dm := pm.DiscoveryManager()
if errs := dm.RunAll(); len(errs) == len(dm.IDs()) {
// All discoveries failed to run, we can't do anything
return nil, fmt.Errorf("%v", errs)
} else if len(errs) > 0 {
// If only some discoveries failed to run just tell the user and go on
for _, err := range errs {
feedback.Error(err)
}
}
eventChan, errs := dm.StartSyncAll()
if len(errs) > 0 {
return nil, fmt.Errorf("%v", errs)
watcher, err := dm.Watch()
if err != nil {
return nil, err
}
defer func() {
// Quit all discoveries at the end.
if errs := dm.QuitAll(); len(errs) > 0 {
logrus.Errorf("quitting discoveries when getting port metadata: %v", errs)
}
}()
defer watcher.Close()
deadline := time.After(p.timeout.Get())
for {
select {
case portEvent := <-eventChan:
case portEvent := <-watcher.Feed():
if portEvent.Type != "add" {
continue
}
......@@ -161,7 +146,7 @@ func (p *Port) GetSearchTimeout() time.Duration {
// discovered Port object together with the FQBN. If the port does not match
// exactly 1 board,
func (p *Port) DetectFQBN(inst *rpc.Instance) (string, *rpc.Port) {
detectedPorts, err := board.List(&rpc.BoardListRequest{
detectedPorts, _, err := board.List(&rpc.BoardListRequest{
Instance: inst,
Timeout: p.timeout.Get().Milliseconds(),
})
......
......@@ -64,22 +64,26 @@ func runListCommand(cmd *cobra.Command, args []string) {
os.Exit(0)
}
ports, err := board.List(&rpc.BoardListRequest{
ports, discvoeryErrors, err := board.List(&rpc.BoardListRequest{
Instance: inst,
Timeout: timeoutArg.Get().Milliseconds(),
})
if err != nil {
feedback.Errorf(tr("Error detecting boards: %v"), err)
}
for _, err := range discvoeryErrors {
feedback.Errorf(tr("Error starting discovery: %v"), err)
}
feedback.PrintResult(result{ports})
}
func watchList(cmd *cobra.Command, inst *rpc.Instance) {
eventsChan, err := board.Watch(inst.Id, nil)
eventsChan, closeCB, err := board.Watch(inst.Id)
if err != nil {
feedback.Errorf(tr("Error detecting boards: %v"), err)
os.Exit(errorcodes.ErrNetwork)
}
defer closeCB()
// This is done to avoid printing the header each time a new event is received
if feedback.GetFormat() == feedback.Text {
......
......@@ -16,6 +16,7 @@
package board
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
......@@ -176,32 +177,21 @@ func identify(pm *packagemanager.PackageManager, port *discovery.Port) ([]*rpc.B
// List returns a list of boards found by the loaded discoveries.
// In case of errors partial results from discoveries that didn't fail
// are returned.
func List(req *rpc.BoardListRequest) (r []*rpc.DetectedPort, e error) {
func List(req *rpc.BoardListRequest) (r []*rpc.DetectedPort, discoveryStartErrors []error, e error) {
pm := commands.GetPackageManager(req.GetInstance().Id)
if pm == nil {
return nil, &arduino.InvalidInstanceError{}
return nil, nil, &arduino.InvalidInstanceError{}
}
dm := pm.DiscoveryManager()
if errs := dm.RunAll(); len(errs) > 0 {
return nil, &arduino.UnavailableError{Message: tr("Error starting board discoveries"), Cause: fmt.Errorf("%v", errs)}
}
if errs := dm.StartAll(); len(errs) > 0 {
return nil, &arduino.UnavailableError{Message: tr("Error starting board discoveries"), Cause: fmt.Errorf("%v", errs)}
}
defer func() {
if errs := dm.StopAll(); len(errs) > 0 {
logrus.Error(errs)
}
}()
discoveryStartErrors = dm.Start()
time.Sleep(time.Duration(req.GetTimeout()) * time.Millisecond)
retVal := []*rpc.DetectedPort{}
ports, errs := pm.DiscoveryManager().List()
for _, port := range ports {
for _, port := range dm.List() {
boards, err := identify(pm, port)
if err != nil {
return nil, err
return nil, discoveryStartErrors, err
}
// boards slice can be empty at this point if neither the cores nor the
......@@ -212,92 +202,49 @@ func List(req *rpc.BoardListRequest) (r []*rpc.DetectedPort, e error) {
}
retVal = append(retVal, b)
}
if len(errs) > 0 {
return retVal, &arduino.UnavailableError{Message: tr("Error getting board list"), Cause: fmt.Errorf("%v", errs)}
}
return retVal, nil
return retVal, discoveryStartErrors, nil
}
// Watch returns a channel that receives boards connection and disconnection events.
// The discovery process can be interrupted by sending a message to the interrupt channel.
func Watch(instanceID int32, interrupt <-chan bool) (<-chan *rpc.BoardListWatchResponse, error) {
// It also returns a callback function that must be used to stop and dispose the watch.
func Watch(instanceID int32) (<-chan *rpc.BoardListWatchResponse, func(), error) {
pm := commands.GetPackageManager(instanceID)
dm := pm.DiscoveryManager()
runErrs := dm.RunAll()
if len(runErrs) == len(dm.IDs()) {
// All discoveries failed to run, we can't do anything
return nil, &arduino.UnavailableError{Message: tr("Error starting board discoveries"), Cause: fmt.Errorf("%v", runErrs)}
watcher, err := dm.Watch()
if err != nil {
return nil, nil, err
}
eventsChan, errs := dm.StartSyncAll()
if len(runErrs) > 0 {
errs = append(runErrs, errs...)
}
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-ctx.Done()
watcher.Close()
}()
outChan := make(chan *rpc.BoardListWatchResponse)
go func() {
defer close(outChan)
for _, err := range errs {
outChan <- &rpc.BoardListWatchResponse{
EventType: "error",
Error: err.Error(),
for event := range watcher.Feed() {
port := &rpc.DetectedPort{
Port: event.Port.ToRPC(),
}
}
for {
select {
case event := <-eventsChan:
if event.Type == "quit" {
// The discovery manager has closed its event channel because it's
// quitting all the discovery processes that are running, this
// means that the events channel we're listening from won't receive any
// more events.
// Handling this case is necessary when the board watcher is running and
// the instance being used is reinitialized since that quits all the
// discovery processes and reset the discovery manager. That would leave
// this goroutine listening forever on a "dead" channel and might even
// cause panics.
// This message avoids all these issues.
// It will be the client's task restarting the board watcher if necessary,
// this host won't attempt restarting it.
outChan <- &rpc.BoardListWatchResponse{
EventType: event.Type,
}
return
}
port := &rpc.DetectedPort{
Port: event.Port.ToRPC(),
}
boardsError := ""
if event.Type == "add" {
boards, err := identify(pm, event.Port)
if err != nil {
boardsError = err.Error()
}
port.MatchingBoards = boards
}
outChan <- &rpc.BoardListWatchResponse{
EventType: event.Type,
Port: port,
Error: boardsError,
}
case <-interrupt:
for _, err := range dm.StopAll() {
// Discoveries that return errors have their process
// closed and are removed from the list of discoveries
// in the manager
outChan <- &rpc.BoardListWatchResponse{
EventType: "error",
Error: tr("stopping discoveries: %s", err),
}
boardsError := ""
if event.Type == "add" {
boards, err := identify(pm, event.Port)
if err != nil {
boardsError = err.Error()
}
return
port.MatchingBoards = boards
}
outChan <- &rpc.BoardListWatchResponse{
EventType: event.Type,
Port: port,
Error: boardsError,
}
}
}()
return outChan, nil
return outChan, cancel, nil
}
......@@ -36,9 +36,7 @@ import (
"github.com/arduino/arduino-cli/i18n"
rpc "github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// ArduinoCoreServerImpl FIXMEDOC
......@@ -69,7 +67,7 @@ func (s *ArduinoCoreServerImpl) BoardDetails(ctx context.Context, req *rpc.Board
// BoardList FIXMEDOC
func (s *ArduinoCoreServerImpl) BoardList(ctx context.Context, req *rpc.BoardListRequest) (*rpc.BoardListResponse, error) {
ports, err := board.List(req)
ports, _, err := board.List(req)
if err != nil {
return nil, convertErrorToRPCStatus(err)
}
......@@ -109,42 +107,35 @@ func (s *ArduinoCoreServerImpl) BoardListWatch(stream rpc.ArduinoCoreService_Boa
return err
}
interrupt := make(chan bool, 1)
eventsChan, closeWatcher, err := board.Watch(msg.Instance.Id)
if err != nil {
return convertErrorToRPCStatus(err)
}
go func() {
defer close(interrupt)
defer closeWatcher()
for {
msg, err := stream.Recv()
// Handle client closing the stream and eventual errors
if err == io.EOF {
logrus.Info("boards watcher stream closed")
interrupt <- true
return
} else if st, ok := status.FromError(err); ok && st.Code() == codes.Canceled {
logrus.Info("boards watcher interrupted by host")
return
} else if err != nil {
}
if err != nil {
logrus.Infof("interrupting boards watcher: %v", err)
interrupt <- true
return
}
// Message received, does the client want to interrupt?
if msg != nil && msg.Interrupt {
logrus.Info("boards watcher interrupted by client")
interrupt <- msg.Interrupt
return
}
}
}()
eventsChan, err := board.Watch(msg.Instance.Id, interrupt)
if err != nil {
return convertErrorToRPCStatus(err)
}
for event := range eventsChan {
err = stream.Send(event)
if err != nil {
if err := stream.Send(event); err != nil {
logrus.Infof("sending board watch message: %v", err)
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment