Unverified Commit 2931737f authored by Cristian Maglie's avatar Cristian Maglie Committed by GitHub

[skip-changelog] Added `discovery.Discovery` object to handle communication...

[skip-changelog] Added `discovery.Discovery` object to handle communication with pluggable discoveries (#1029)

* Pluggable Discovery handler: first implementation

* Added 'discovery_client' (for debugging discoveries)

* Added 'event' mode in PluggableDiscovery

* discovery_client now supports multiple discoveries

* Added ID to PluggableDiscovery

* Added PluggableDiscovery.String() implementation

* Fixed TestDiscoveryStdioHandling

* Fixed discovery test run on Windows

It really takes that long for messages to go back and forth.
I don't know if there is a simpler way to reduce stdio buffering time.
parent 18c4c407
// This file is part of arduino-cli.
//
// Copyright 2020 ARDUINO SA (http://www.arduino.cc/)
//
// This software is released under the GNU General Public License version 3,
// which covers the main part of arduino-cli.
// The terms of this license can be found at:
// https://www.gnu.org/licenses/gpl-3.0.en.html
//
// You can be released from the requirements of the above licenses by purchasing
// a commercial license. Buying such a license is mandatory if you want to
// modify or otherwise use the software for commercial activities involving the
// Arduino software without disclosing the source code of your own applications.
// To purchase a commercial license, send an email to license@arduino.cc.
package discovery
import (
"encoding/json"
"io"
"sync"
"time"
"github.com/arduino/arduino-cli/executils"
"github.com/arduino/go-properties-orderedmap"
"github.com/pkg/errors"
)
// PluggableDiscovery is a tool that detects communication ports to interact
// with the boards.
type PluggableDiscovery struct {
id string
args []string
process *executils.Process
outgoingCommandsPipe io.Writer
incomingMessagesChan <-chan *discoveryMessage
// All the following fields are guarded by statusMutex
statusMutex sync.Mutex
incomingMessagesError error
alive bool
eventsMode bool
eventChan chan<- *Event
cachedPorts map[string]*Port
}
type discoveryMessage struct {
EventType string `json:"eventType"`
Message string `json:"message"`
Ports []*Port `json:"ports"`
Port *Port `json:"port"`
}
// Port containts metadata about a port to connect to a board.
type Port struct {
Address string `json:"address"`
AddressLabel string `json:"label"`
Protocol string `json:"protocol"`
ProtocolLabel string `json:"protocolLabel"`
Properties *properties.Map `json:"prefs"`
IdentificationProperties *properties.Map `json:"identificationPrefs"`
}
func (p *Port) String() string {
if p == nil {
return "none"
}
return p.Address
}
// Event is a pluggable discovery event
type Event struct {
Type string
Port *Port
}
// New create and connect to the given pluggable discovery
func New(id string, args ...string) (*PluggableDiscovery, error) {
proc, err := executils.NewProcess(args...)
if err != nil {
return nil, err
}
stdout, err := proc.StdoutPipe()
if err != nil {
return nil, err
}
stdin, err := proc.StdinPipe()
if err != nil {
return nil, err
}
if err := proc.Start(); err != nil {
return nil, err
}
messageChan := make(chan *discoveryMessage)
disc := &PluggableDiscovery{
id: id,
process: proc,
incomingMessagesChan: messageChan,
outgoingCommandsPipe: stdin,
alive: true,
}
go disc.jsonDecodeLoop(stdout, messageChan)
return disc, nil
}
// GetID returns the identifier for this discovery
func (disc *PluggableDiscovery) GetID() string {
return disc.id
}
func (disc *PluggableDiscovery) String() string {
return disc.id
}
func (disc *PluggableDiscovery) jsonDecodeLoop(in io.Reader, outChan chan<- *discoveryMessage) {
decoder := json.NewDecoder(in)
closeAndReportError := func(err error) {
disc.statusMutex.Lock()
disc.alive = false
disc.incomingMessagesError = err
disc.statusMutex.Unlock()
close(outChan)
}
for {
var msg discoveryMessage
if err := decoder.Decode(&msg); err != nil {
closeAndReportError(err)
return
}
if msg.EventType == "add" {
if msg.Port == nil {
closeAndReportError(errors.New("invalid 'add' message: missing port"))
return
}
disc.statusMutex.Lock()
disc.cachedPorts[msg.Port.Address] = msg.Port
if disc.eventChan != nil {
disc.eventChan <- &Event{"add", msg.Port}
}
disc.statusMutex.Unlock()
} else if msg.EventType == "remove" {
if msg.Port == nil {
closeAndReportError(errors.New("invalid 'remove' message: missing port"))
return
}
disc.statusMutex.Lock()
delete(disc.cachedPorts, msg.Port.Address)
if disc.eventChan != nil {
disc.eventChan <- &Event{"remove", msg.Port}
}
disc.statusMutex.Unlock()
} else {
outChan <- &msg
}
}
}
// IsAlive return true if the discovery process is running and so is able to receive commands
// and produce events.
func (disc *PluggableDiscovery) IsAlive() bool {
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
return disc.alive
}
// IsEventMode return true if the discovery is in "events" mode
func (disc *PluggableDiscovery) IsEventMode() bool {
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
return disc.eventsMode
}
func (disc *PluggableDiscovery) waitMessage(timeout time.Duration) (*discoveryMessage, error) {
select {
case msg := <-disc.incomingMessagesChan:
if msg == nil {
// channel has been closed
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
return nil, disc.incomingMessagesError
}
return msg, nil
case <-time.After(timeout):
return nil, errors.New("timeout")
}
}
func (disc *PluggableDiscovery) sendCommand(command string) error {
if n, err := disc.outgoingCommandsPipe.Write([]byte(command)); err != nil {
return err
} else if n < len(command) {
return disc.sendCommand(command[n:])
} else {
return nil
}
}
// Start initializes and start the discovery internal subroutines. This command must be
// called before List or StartSync.
func (disc *PluggableDiscovery) Start() error {
if err := disc.sendCommand("START\n"); err != nil {
return err
}
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
return err
} else if msg.EventType != "start" {
return errors.Errorf("communication out of sync, expected 'start', received '%s'", msg.EventType)
} else if msg.Message != "OK" {
return errors.Errorf("command failed: %s", msg.Message)
}
return nil
}
// Stop stops the discovery internal subroutines and possibly free the internally
// used resources. This command should be called if the client wants to pause the
// discovery for a while.
func (disc *PluggableDiscovery) Stop() error {
if err := disc.sendCommand("STOP\n"); err != nil {
return err
}
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
return err
} else if msg.EventType != "stop" {
return errors.Errorf("communication out of sync, expected 'stop', received '%s'", msg.EventType)
} else if msg.Message != "OK" {
return errors.Errorf("command failed: %s", msg.Message)
}
return nil
}
// Quit terminates the discovery. No more commands can be accepted by the discovery.
func (disc *PluggableDiscovery) Quit() error {
if err := disc.sendCommand("QUIT\n"); err != nil {
return err
}
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
return err
} else if msg.EventType != "quit" {
return errors.Errorf("communication out of sync, expected 'quit', received '%s'", msg.EventType)
} else if msg.Message != "OK" {
return errors.Errorf("command failed: %s", msg.Message)
}
return nil
}
// List executes an enumeration of the ports and returns a list of the available
// ports at the moment of the call.
func (disc *PluggableDiscovery) List() ([]*Port, error) {
if err := disc.sendCommand("LIST\n"); err != nil {
return nil, err
}
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
return nil, err
} else if msg.EventType != "list" {
return nil, errors.Errorf("communication out of sync, expected 'list', received '%s'", msg.EventType)
} else {
return msg.Ports, nil
}
}
// EventChannel creates a channel used to receive events from the pluggable discovery.
// The event channel must be consumed as quickly as possible since it may block the
// discovery if it becomes full. The channel size is configurable.
func (disc *PluggableDiscovery) EventChannel(size int) <-chan *Event {
c := make(chan *Event, size)
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
disc.eventChan = c
return c
}
// StartSync puts the discovery in "events" mode: the discovery will send "add"
// and "remove" events each time a new port is detected or removed respectively.
// After calling StartSync an initial burst of "add" events may be generated to
// report all the ports available at the moment of the start.
func (disc *PluggableDiscovery) StartSync() error {
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
if disc.eventsMode {
return errors.New("already in events mode")
}
if err := disc.sendCommand("START_SYNC\n"); err != nil {
return err
}
// START_SYNC does not give any response
disc.eventsMode = true
disc.cachedPorts = map[string]*Port{}
return nil
}
// ListSync returns a list of the available ports. The list is a cache of all the
// add/remove events happened from the StartSync call and it will not consume any
// resource from the underliying discovery.
func (disc *PluggableDiscovery) ListSync() []*Port {
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
res := []*Port{}
for _, port := range disc.cachedPorts {
res = append(res, port)
}
return res
}
module github.com/arduino/arduino-cli/arduino/discovery/discovery_client
go 1.14
replace github.com/arduino/arduino-cli => ../../..
require (
github.com/arduino/arduino-cli v0.0.0-00010101000000-000000000000
github.com/gizak/termui/v3 v3.1.0
)
This diff is collapsed.
// This file is part of arduino-cli.
//
// Copyright 2020 ARDUINO SA (http://www.arduino.cc/)
//
// This software is released under the GNU General Public License version 3,
// which covers the main part of arduino-cli.
// The terms of this license can be found at:
// https://www.gnu.org/licenses/gpl-3.0.en.html
//
// You can be released from the requirements of the above licenses by purchasing
// a commercial license. Buying such a license is mandatory if you want to
// modify or otherwise use the software for commercial activities involving the
// Arduino software without disclosing the source code of your own applications.
// To purchase a commercial license, send an email to license@arduino.cc.
// discovery_client is a command line UI client to test pluggable discoveries.
package main
import (
"fmt"
"log"
"os"
"sort"
"time"
"github.com/arduino/arduino-cli/arduino/discovery"
ui "github.com/gizak/termui/v3"
"github.com/gizak/termui/v3/widgets"
)
func main() {
discoveries := []*discovery.PluggableDiscovery{}
discEvent := make(chan *discovery.Event)
for _, discCmd := range os.Args[1:] {
disc, err := discovery.New("", discCmd)
if err != nil {
log.Fatal("Error initializing discovery:", err)
}
if err := disc.Start(); err != nil {
log.Fatal("Error starting discovery:", err)
}
if err := disc.StartSync(); err != nil {
log.Fatal("Error starting discovery:", err)
}
go func() {
for msg := range disc.EventChannel(10) {
discEvent <- msg
}
}()
discoveries = append(discoveries, disc)
}
if err := ui.Init(); err != nil {
log.Fatalf("failed to initialize termui: %v", err)
}
defer ui.Close()
l := widgets.NewList()
l.Title = "List"
l.TextStyle = ui.NewStyle(ui.ColorYellow)
l.WrapText = false
w, h := ui.TerminalDimensions()
l.SetRect(0, 0, w, h)
updateList := func() {
rows := []string{}
rows = append(rows, "Available ports list:")
for _, disc := range discoveries {
for i, port := range disc.ListSync() {
rows = append(rows, fmt.Sprintf(" [%04d] Address: %s", i, port.AddressLabel))
rows = append(rows, fmt.Sprintf(" Protocol: %s", port.ProtocolLabel))
keys := port.Properties.Keys()
sort.Strings(keys)
for _, k := range keys {
rows = append(rows, fmt.Sprintf(" %s=%s", k, port.Properties.Get(k)))
}
}
}
l.Rows = rows
}
updateList()
ui.Render(l)
previousKey := ""
uiEvents := ui.PollEvents()
out:
for {
select {
case e := <-uiEvents:
switch e.ID {
case "<Resize>":
payload := e.Payload.(ui.Resize)
l.SetRect(0, 0, payload.Width, payload.Height)
ui.Clear()
case "q", "<C-c>":
break out
case "j", "<Down>":
l.ScrollDown()
case "k", "<Up>":
l.ScrollUp()
case "<C-d>":
l.ScrollHalfPageDown()
case "<C-u>":
l.ScrollHalfPageUp()
case "<C-f>":
l.ScrollPageDown()
case "<C-b>":
l.ScrollPageUp()
case "g":
if previousKey == "g" {
l.ScrollTop()
}
case "<Home>":
l.ScrollTop()
case "G", "<End>":
l.ScrollBottom()
}
if previousKey == "g" {
previousKey = ""
} else {
previousKey = e.ID
}
case <-discEvent:
updateList()
}
ui.Render(l)
}
for _, disc := range discoveries {
if err := disc.Quit(); err != nil {
log.Fatal("Error stopping discovery:", err)
}
fmt.Println("Discovery QUITed")
for disc.IsAlive() {
time.Sleep(time.Millisecond)
}
fmt.Println("Discovery correctly terminated")
}
}
// This file is part of arduino-cli.
//
// Copyright 2020 ARDUINO SA (http://www.arduino.cc/)
//
// This software is released under the GNU General Public License version 3,
// which covers the main part of arduino-cli.
// The terms of this license can be found at:
// https://www.gnu.org/licenses/gpl-3.0.en.html
//
// You can be released from the requirements of the above licenses by purchasing
// a commercial license. Buying such a license is mandatory if you want to
// modify or otherwise use the software for commercial activities involving the
// Arduino software without disclosing the source code of your own applications.
// To purchase a commercial license, send an email to license@arduino.cc.
package discovery
import (
"io"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestDiscoveryStdioHandling(t *testing.T) {
disc, err := New("test", "go", "run", "testdata/cat/main.go") // copy stdin to stdout
require.NoError(t, err)
_, err = disc.outgoingCommandsPipe.Write([]byte(`{ "eventType":`)) // send partial JSON
require.NoError(t, err)
msg, err := disc.waitMessage(time.Millisecond * 1000)
require.Error(t, err)
require.Nil(t, msg)
_, err = disc.outgoingCommandsPipe.Write([]byte(`"ev1" }{ `)) // complete previous json and start another one
require.NoError(t, err)
msg, err = disc.waitMessage(time.Millisecond * 1000)
require.NoError(t, err)
require.NotNil(t, msg)
require.Equal(t, "ev1", msg.EventType)
msg, err = disc.waitMessage(time.Millisecond * 1000)
require.Error(t, err)
require.Nil(t, msg)
_, err = disc.outgoingCommandsPipe.Write([]byte(`"eventType":"ev2" }`)) // complete previous json
require.NoError(t, err)
msg, err = disc.waitMessage(time.Millisecond * 1000)
require.NoError(t, err)
require.NotNil(t, msg)
require.Equal(t, "ev2", msg.EventType)
require.True(t, disc.IsAlive())
err = disc.outgoingCommandsPipe.(io.ReadCloser).Close()
require.NoError(t, err)
time.Sleep(time.Millisecond * 200)
require.False(t, disc.IsAlive())
}
// Echo stdin to stdout.
// This program is used for testing purposes, to make it available on all
// OS a tool equivalent to UNIX "cat".
package main
import (
"io"
"os"
)
func main() {
io.Copy(os.Stdout, os.Stdin)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment