Commit f65eff25 authored by Cristian Maglie's avatar Cristian Maglie

Inlining methods in ArduinoCoreServiceImpl (part 4: BoardListWatch)

The BoardListWatch RPC call has been converted into a method of the gRPC
server implementation.

This commit boasts an helper method to convert a gRPC streaming response
into a channel.
parent 0df21493
// This file is part of arduino-cli.
//
// Copyright 2024 ARDUINO SA (http://www.arduino.cc/)
//
// This software is released under the GNU General Public License version 3,
// which covers the main part of arduino-cli.
// The terms of this license can be found at:
// https://www.gnu.org/licenses/gpl-3.0.en.html
//
// You can be released from the requirements of the above licenses by purchasing
// a commercial license. Buying such a license is mandatory if you want to
// modify or otherwise use the software for commercial activities involving the
// Arduino software without disclosing the source code of your own applications.
// To purchase a commercial license, send an email to license@arduino.cc.
package commands
import (
"context"
"errors"
"sync"
"google.golang.org/grpc/metadata"
)
type streamingResponseProxyToChan[T any] struct {
ctx context.Context
respChan chan<- *T
respLock sync.Mutex
}
func streamResponseToChan[T any](ctx context.Context) (*streamingResponseProxyToChan[T], <-chan *T) {
respChan := make(chan *T, 1)
w := &streamingResponseProxyToChan[T]{
ctx: ctx,
respChan: respChan,
}
go func() {
<-ctx.Done()
w.respLock.Lock()
close(w.respChan)
w.respChan = nil
w.respLock.Unlock()
}()
return w, respChan
}
func (w *streamingResponseProxyToChan[T]) Send(resp *T) error {
w.respLock.Lock()
if w.respChan != nil {
w.respChan <- resp
}
w.respLock.Unlock()
return nil
}
func (w *streamingResponseProxyToChan[T]) Context() context.Context {
return w.ctx
}
func (w *streamingResponseProxyToChan[T]) RecvMsg(m any) error {
return errors.New("RecvMsg not implemented")
}
func (w *streamingResponseProxyToChan[T]) SendHeader(metadata.MD) error {
return errors.New("SendHeader not implemented")
}
func (w *streamingResponseProxyToChan[T]) SendMsg(m any) error {
return errors.New("SendMsg not implemented")
}
func (w *streamingResponseProxyToChan[T]) SetHeader(metadata.MD) error {
return errors.New("SetHeader not implemented")
}
func (w *streamingResponseProxyToChan[T]) SetTrailer(tr metadata.MD) {
}
...@@ -18,7 +18,6 @@ package commands ...@@ -18,7 +18,6 @@ package commands
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"io" "io"
"sync/atomic" "sync/atomic"
...@@ -44,37 +43,6 @@ type arduinoCoreServerImpl struct { ...@@ -44,37 +43,6 @@ type arduinoCoreServerImpl struct {
versionString string versionString string
} }
// BoardSearch exposes to the gRPC interface the board search command
func (s *arduinoCoreServerImpl) BoardSearch(ctx context.Context, req *rpc.BoardSearchRequest) (*rpc.BoardSearchResponse, error) {
return BoardSearch(ctx, req)
}
// BoardListWatch FIXMEDOC
func (s *arduinoCoreServerImpl) BoardListWatch(req *rpc.BoardListWatchRequest, stream rpc.ArduinoCoreService_BoardListWatchServer) error {
syncSend := NewSynchronizedSend(stream.Send)
if req.GetInstance() == nil {
err := fmt.Errorf(tr("no instance specified"))
syncSend.Send(&rpc.BoardListWatchResponse{
EventType: "error",
Error: err.Error(),
})
return err
}
eventsChan, err := BoardListWatch(stream.Context(), req)
if err != nil {
return err
}
for event := range eventsChan {
if err := syncSend.Send(event); err != nil {
logrus.Infof("sending board watch message: %v", err)
}
}
return nil
}
// Destroy FIXMEDOC // Destroy FIXMEDOC
func (s *arduinoCoreServerImpl) Destroy(ctx context.Context, req *rpc.DestroyRequest) (*rpc.DestroyResponse, error) { func (s *arduinoCoreServerImpl) Destroy(ctx context.Context, req *rpc.DestroyRequest) (*rpc.DestroyResponse, error) {
return Destroy(ctx, req) return Destroy(ctx, req)
......
...@@ -262,29 +262,43 @@ func hasMatchingBoard(b *rpc.DetectedPort, fqbnFilter *cores.FQBN) bool { ...@@ -262,29 +262,43 @@ func hasMatchingBoard(b *rpc.DetectedPort, fqbnFilter *cores.FQBN) bool {
return false return false
} }
// BoardListWatch returns a channel that receives boards connection and disconnection events. // BoardListWatchProxyToChan return a stream, to be used in BoardListWatch method,
func BoardListWatch(ctx context.Context, req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse, error) { // that proxies all the responses to a channel.
func BoardListWatchProxyToChan(ctx context.Context) (rpc.ArduinoCoreService_BoardListWatchServer, <-chan *rpc.BoardListWatchResponse) {
return streamResponseToChan[rpc.BoardListWatchResponse](ctx)
}
// BoardListWatch FIXMEDOC
func (s *arduinoCoreServerImpl) BoardListWatch(req *rpc.BoardListWatchRequest, stream rpc.ArduinoCoreService_BoardListWatchServer) error {
syncSend := NewSynchronizedSend(stream.Send)
if req.GetInstance() == nil {
err := fmt.Errorf(tr("no instance specified"))
syncSend.Send(&rpc.BoardListWatchResponse{
EventType: "error",
Error: err.Error(),
})
return err
}
pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance()) pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance())
if err != nil { if err != nil {
return nil, err return err
} }
defer release() defer release()
dm := pme.DiscoveryManager() dm := pme.DiscoveryManager()
watcher, err := dm.Watch() watcher, err := dm.Watch()
if err != nil { if err != nil {
return nil, err return err
} }
go func() { go func() {
<-ctx.Done() <-stream.Context().Done()
logrus.Trace("closed watch") logrus.Trace("closed watch")
watcher.Close() watcher.Close()
}() }()
outChan := make(chan *rpc.BoardListWatchResponse)
go func() { go func() {
defer close(outChan)
for event := range watcher.Feed() { for event := range watcher.Feed() {
port := &rpc.DetectedPort{ port := &rpc.DetectedPort{
Port: rpc.DiscoveryPortToRPC(event.Port), Port: rpc.DiscoveryPortToRPC(event.Port),
...@@ -298,13 +312,13 @@ func BoardListWatch(ctx context.Context, req *rpc.BoardListWatchRequest) (<-chan ...@@ -298,13 +312,13 @@ func BoardListWatch(ctx context.Context, req *rpc.BoardListWatchRequest) (<-chan
} }
port.MatchingBoards = boards port.MatchingBoards = boards
} }
outChan <- &rpc.BoardListWatchResponse{ stream.Send(&rpc.BoardListWatchResponse{
EventType: event.Type, EventType: event.Type,
Port: port, Port: port,
Error: boardsError, Error: boardsError,
} })
} }
}() }()
return outChan, nil return nil
} }
...@@ -29,7 +29,7 @@ import ( ...@@ -29,7 +29,7 @@ import (
// Boards are searched in all platforms, including those in the index that are not yet // Boards are searched in all platforms, including those in the index that are not yet
// installed. Note that platforms that are not installed don't include boards' FQBNs. // installed. Note that platforms that are not installed don't include boards' FQBNs.
// If no search argument is used all boards are returned. // If no search argument is used all boards are returned.
func BoardSearch(ctx context.Context, req *rpc.BoardSearchRequest) (*rpc.BoardSearchResponse, error) { func (s *arduinoCoreServerImpl) BoardSearch(ctx context.Context, req *rpc.BoardSearchRequest) (*rpc.BoardSearchResponse, error) {
pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance()) pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance())
if err != nil { if err != nil {
return nil, err return nil, err
......
...@@ -85,7 +85,7 @@ func CalculateFQBNAndPort(portArgs *Port, fqbnArg *Fqbn, instance *rpc.Instance, ...@@ -85,7 +85,7 @@ func CalculateFQBNAndPort(portArgs *Port, fqbnArg *Fqbn, instance *rpc.Instance,
return fqbn, port return fqbn, port
} }
port, err := portArgs.GetPort(instance, defaultAddress, defaultProtocol) port, err := portArgs.GetPort(instance, srv, defaultAddress, defaultProtocol)
if err != nil { if err != nil {
feedback.Fatal(tr("Error getting port metadata: %v", err), feedback.ErrGeneric) feedback.Fatal(tr("Error getting port metadata: %v", err), feedback.ErrGeneric)
} }
......
...@@ -56,12 +56,12 @@ func (p *Port) AddToCommand(cmd *cobra.Command, srv rpc.ArduinoCoreServiceServer ...@@ -56,12 +56,12 @@ func (p *Port) AddToCommand(cmd *cobra.Command, srv rpc.ArduinoCoreServiceServer
// This method allows will bypass the discoveries if: // This method allows will bypass the discoveries if:
// - a nil instance is passed: in this case the plain port and protocol arguments are returned (even if empty) // - a nil instance is passed: in this case the plain port and protocol arguments are returned (even if empty)
// - a protocol is specified: in this case the discoveries are not needed to autodetect the protocol. // - a protocol is specified: in this case the discoveries are not needed to autodetect the protocol.
func (p *Port) GetPortAddressAndProtocol(instance *rpc.Instance, defaultAddress, defaultProtocol string) (string, string, error) { func (p *Port) GetPortAddressAndProtocol(instance *rpc.Instance, srv rpc.ArduinoCoreServiceServer, defaultAddress, defaultProtocol string) (string, string, error) {
if p.protocol != "" || instance == nil { if p.protocol != "" || instance == nil {
return p.address, p.protocol, nil return p.address, p.protocol, nil
} }
port, err := p.GetPort(instance, defaultAddress, defaultProtocol) port, err := p.GetPort(instance, srv, defaultAddress, defaultProtocol)
if err != nil { if err != nil {
return "", "", err return "", "", err
} }
...@@ -70,8 +70,7 @@ func (p *Port) GetPortAddressAndProtocol(instance *rpc.Instance, defaultAddress, ...@@ -70,8 +70,7 @@ func (p *Port) GetPortAddressAndProtocol(instance *rpc.Instance, defaultAddress,
// GetPort returns the Port obtained by parsing command line arguments. // GetPort returns the Port obtained by parsing command line arguments.
// The extra metadata for the ports is obtained using the pluggable discoveries. // The extra metadata for the ports is obtained using the pluggable discoveries.
func (p *Port) GetPort(instance *rpc.Instance, defaultAddress, defaultProtocol string) (*rpc.Port, error) { func (p *Port) GetPort(instance *rpc.Instance, srv rpc.ArduinoCoreServiceServer, defaultAddress, defaultProtocol string) (*rpc.Port, error) {
address := p.address address := p.address
protocol := p.protocol protocol := p.protocol
if address == "" && (defaultAddress != "" || defaultProtocol != "") { if address == "" && (defaultAddress != "" || defaultProtocol != "") {
...@@ -91,7 +90,10 @@ func (p *Port) GetPort(instance *rpc.Instance, defaultAddress, defaultProtocol s ...@@ -91,7 +90,10 @@ func (p *Port) GetPort(instance *rpc.Instance, defaultAddress, defaultProtocol s
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
watcher, err := commands.BoardListWatch(ctx, &rpc.BoardListWatchRequest{Instance: instance})
stream, watcher := commands.BoardListWatchProxyToChan(ctx)
err := srv.BoardListWatch(&rpc.BoardListWatchRequest{Instance: instance}, stream)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -45,7 +45,7 @@ func initAttachCommand(srv rpc.ArduinoCoreServiceServer) *cobra.Command { ...@@ -45,7 +45,7 @@ func initAttachCommand(srv rpc.ArduinoCoreServiceServer) *cobra.Command {
if len(args) > 0 { if len(args) > 0 {
sketchPath = args[0] sketchPath = args[0]
} }
runAttachCommand(sketchPath, &port, fqbn.String(), &programmer) runAttachCommand(srv, sketchPath, &port, fqbn.String(), &programmer)
}, },
} }
fqbn.AddToCommand(attachCommand, srv) fqbn.AddToCommand(attachCommand, srv)
...@@ -55,10 +55,10 @@ func initAttachCommand(srv rpc.ArduinoCoreServiceServer) *cobra.Command { ...@@ -55,10 +55,10 @@ func initAttachCommand(srv rpc.ArduinoCoreServiceServer) *cobra.Command {
return attachCommand return attachCommand
} }
func runAttachCommand(path string, port *arguments.Port, fqbn string, programmer *arguments.Programmer) { func runAttachCommand(srv rpc.ArduinoCoreServiceServer, path string, port *arguments.Port, fqbn string, programmer *arguments.Programmer) {
sketchPath := arguments.InitSketchPath(path) sketchPath := arguments.InitSketchPath(path)
portAddress, portProtocol, _ := port.GetPortAddressAndProtocol(nil, "", "") portAddress, portProtocol, _ := port.GetPortAddressAndProtocol(nil, srv, "", "")
newDefaults, err := commands.SetSketchDefaults(context.Background(), &rpc.SetSketchDefaultsRequest{ newDefaults, err := commands.SetSketchDefaults(context.Background(), &rpc.SetSketchDefaultsRequest{
SketchPath: sketchPath.String(), SketchPath: sketchPath.String(),
DefaultFqbn: fqbn, DefaultFqbn: fqbn,
......
...@@ -39,7 +39,7 @@ func NewCommand(srv rpc.ArduinoCoreServiceServer) *cobra.Command { ...@@ -39,7 +39,7 @@ func NewCommand(srv rpc.ArduinoCoreServiceServer) *cobra.Command {
boardCommand.AddCommand(initDetailsCommand(srv)) boardCommand.AddCommand(initDetailsCommand(srv))
boardCommand.AddCommand(initListCommand(srv)) boardCommand.AddCommand(initListCommand(srv))
boardCommand.AddCommand(initListAllCommand(srv)) boardCommand.AddCommand(initListAllCommand(srv))
boardCommand.AddCommand(initSearchCommand()) boardCommand.AddCommand(initSearchCommand(srv))
return boardCommand return boardCommand
} }
...@@ -63,7 +63,7 @@ func runListCommand(srv rpc.ArduinoCoreServiceServer, watch bool, timeout int64, ...@@ -63,7 +63,7 @@ func runListCommand(srv rpc.ArduinoCoreServiceServer, watch bool, timeout int64,
logrus.Info("Executing `arduino-cli board list`") logrus.Info("Executing `arduino-cli board list`")
if watch { if watch {
watchList(inst) watchList(inst, srv)
return return
} }
...@@ -88,8 +88,9 @@ func runListCommand(srv rpc.ArduinoCoreServiceServer, watch bool, timeout int64, ...@@ -88,8 +88,9 @@ func runListCommand(srv rpc.ArduinoCoreServiceServer, watch bool, timeout int64,
feedback.PrintResult(listResult{result.NewDetectedPorts(ports)}) feedback.PrintResult(listResult{result.NewDetectedPorts(ports)})
} }
func watchList(inst *rpc.Instance) { func watchList(inst *rpc.Instance, srv rpc.ArduinoCoreServiceServer) {
eventsChan, err := commands.BoardListWatch(context.Background(), &rpc.BoardListWatchRequest{Instance: inst}) stream, eventsChan := commands.BoardListWatchProxyToChan(context.Background())
err := srv.BoardListWatch(&rpc.BoardListWatchRequest{Instance: inst}, stream)
if err != nil { if err != nil {
feedback.Fatal(tr("Error detecting boards: %v", err), feedback.ErrNetwork) feedback.Fatal(tr("Error detecting boards: %v", err), feedback.ErrNetwork)
} }
......
...@@ -22,7 +22,6 @@ import ( ...@@ -22,7 +22,6 @@ import (
"sort" "sort"
"strings" "strings"
"github.com/arduino/arduino-cli/commands"
"github.com/arduino/arduino-cli/internal/cli/feedback" "github.com/arduino/arduino-cli/internal/cli/feedback"
"github.com/arduino/arduino-cli/internal/cli/feedback/result" "github.com/arduino/arduino-cli/internal/cli/feedback/result"
"github.com/arduino/arduino-cli/internal/cli/feedback/table" "github.com/arduino/arduino-cli/internal/cli/feedback/table"
...@@ -32,7 +31,7 @@ import ( ...@@ -32,7 +31,7 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
func initSearchCommand() *cobra.Command { func initSearchCommand(srv rpc.ArduinoCoreServiceServer) *cobra.Command {
var searchCommand = &cobra.Command{ var searchCommand = &cobra.Command{
Use: fmt.Sprintf("search [%s]", tr("boardname")), Use: fmt.Sprintf("search [%s]", tr("boardname")),
Short: tr("Search for a board in the Boards Manager."), Short: tr("Search for a board in the Boards Manager."),
...@@ -41,18 +40,20 @@ func initSearchCommand() *cobra.Command { ...@@ -41,18 +40,20 @@ func initSearchCommand() *cobra.Command {
" " + os.Args[0] + " board search\n" + " " + os.Args[0] + " board search\n" +
" " + os.Args[0] + " board search zero", " " + os.Args[0] + " board search zero",
Args: cobra.ArbitraryArgs, Args: cobra.ArbitraryArgs,
Run: runSearchCommand, Run: func(cmd *cobra.Command, args []string) {
runSearchCommand(srv, args)
},
} }
searchCommand.Flags().BoolVarP(&showHiddenBoard, "show-hidden", "a", false, tr("Show also boards marked as 'hidden' in the platform")) searchCommand.Flags().BoolVarP(&showHiddenBoard, "show-hidden", "a", false, tr("Show also boards marked as 'hidden' in the platform"))
return searchCommand return searchCommand
} }
func runSearchCommand(cmd *cobra.Command, args []string) { func runSearchCommand(srv rpc.ArduinoCoreServiceServer, args []string) {
inst := instance.CreateAndInit() inst := instance.CreateAndInit()
logrus.Info("Executing `arduino-cli board search`") logrus.Info("Executing `arduino-cli board search`")
res, err := commands.BoardSearch(context.Background(), &rpc.BoardSearchRequest{ res, err := srv.BoardSearch(context.Background(), &rpc.BoardSearchRequest{
Instance: inst, Instance: inst,
SearchArgs: strings.Join(args, " "), SearchArgs: strings.Join(args, " "),
IncludeHiddenBoards: showHiddenBoard, IncludeHiddenBoards: showHiddenBoard,
......
...@@ -71,7 +71,7 @@ func runBootloaderCommand(srv rpc.ArduinoCoreServiceServer) { ...@@ -71,7 +71,7 @@ func runBootloaderCommand(srv rpc.ArduinoCoreServiceServer) {
logrus.Info("Executing `arduino-cli burn-bootloader`") logrus.Info("Executing `arduino-cli burn-bootloader`")
// We don't need a Sketch to upload a board's bootloader // We don't need a Sketch to upload a board's bootloader
discoveryPort, err := port.GetPort(instance, "", "") discoveryPort, err := port.GetPort(instance, srv, "", "")
if err != nil { if err != nil {
feedback.Fatal(tr("Error during Upload: %v", err), feedback.ErrGeneric) feedback.Fatal(tr("Error during Upload: %v", err), feedback.ErrGeneric)
} }
......
...@@ -55,7 +55,7 @@ func runDebugCheckCommand(srv rpc.ArduinoCoreServiceServer, portArgs *arguments. ...@@ -55,7 +55,7 @@ func runDebugCheckCommand(srv rpc.ArduinoCoreServiceServer, portArgs *arguments.
instance := instance.CreateAndInit() instance := instance.CreateAndInit()
logrus.Info("Executing `arduino-cli debug`") logrus.Info("Executing `arduino-cli debug`")
port, err := portArgs.GetPort(instance, "", "") port, err := portArgs.GetPort(instance, srv, "", "")
if err != nil { if err != nil {
feedback.FatalError(err, feedback.ErrBadArgument) feedback.FatalError(err, feedback.ErrBadArgument)
} }
......
...@@ -140,7 +140,7 @@ func runMonitorCmd( ...@@ -140,7 +140,7 @@ func runMonitorCmd(
fqbn, _ = portArgs.DetectFQBN(inst, srv) fqbn, _ = portArgs.DetectFQBN(inst, srv)
} }
portAddress, portProtocol, err := portArgs.GetPortAddressAndProtocol(inst, defaultPort, defaultProtocol) portAddress, portProtocol, err := portArgs.GetPortAddressAndProtocol(inst, srv, defaultPort, defaultProtocol)
if err != nil { if err != nil {
feedback.FatalError(err, feedback.ErrGeneric) feedback.FatalError(err, feedback.ErrGeneric)
} }
......
...@@ -63,15 +63,15 @@ func TestArduinoCliDaemon(t *testing.T) { ...@@ -63,15 +63,15 @@ func TestArduinoCliDaemon(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
watcherCanceldCh := make(chan struct{}) watcherCanceldCh := make(chan struct{})
go func() { go func() {
defer close(watcherCanceldCh)
for { for {
msg, err := watcher.Recv() msg, err := watcher.Recv()
if errors.Is(err, io.EOF) { if errors.Is(err, io.EOF) {
fmt.Println("Watcher EOF") fmt.Println("Got EOF from watcher")
return return
} }
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
fmt.Println("Watcher canceled") fmt.Println("Got Canceled error from watcher")
watcherCanceldCh <- struct{}{}
return return
} }
require.NoError(t, err, "BoardListWatch grpc call returned an error") require.NoError(t, err, "BoardListWatch grpc call returned an error")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment