Commit bb815cfe authored by Cristian Maglie's avatar Cristian Maglie

Inlining methods in ArduinoCoreServiceImpl (part 8: Monitor)

This change is quite challenging because it implements a bidirectional
streaming service. The gRPC implementation is slightly simpler, BTW the
command-line requires a bit of streams fiddling to get the same
behaviour as before because now:

 * the Monitor call do not return anymore a clean io.ReadWriteCloser.
 * the call to srv.Monitor is blocking until the port is closed or the
   context is canceled.
parent 917dcc5d
......@@ -17,15 +17,10 @@ package commands
import (
"context"
"errors"
"io"
"sync/atomic"
"github.com/arduino/arduino-cli/commands/cache"
"github.com/arduino/arduino-cli/commands/cmderrors"
"github.com/arduino/arduino-cli/commands/updatecheck"
rpc "github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1"
"github.com/sirupsen/logrus"
)
// NewArduinoCoreServer returns an implementation of the ArduinoCoreService gRPC service
......@@ -356,102 +351,6 @@ func (s *arduinoCoreServerImpl) EnumerateMonitorPortSettings(ctx context.Context
return EnumerateMonitorPortSettings(ctx, req)
}
// Monitor FIXMEDOC
func (s *arduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorServer) error {
syncSend := NewSynchronizedSend(stream.Send)
// The configuration must be sent on the first message
req, err := stream.Recv()
if err != nil {
return err
}
openReq := req.GetOpenRequest()
if openReq == nil {
return &cmderrors.InvalidInstanceError{}
}
portProxy, _, err := Monitor(stream.Context(), openReq)
if err != nil {
return err
}
// Send a message with Success set to true to notify the caller of the port being now active
_ = syncSend.Send(&rpc.MonitorResponse{Success: true})
cancelCtx, cancel := context.WithCancel(stream.Context())
gracefulCloseInitiated := &atomic.Bool{}
gracefuleCloseCtx, gracefulCloseCancel := context.WithCancel(context.Background())
// gRPC stream receiver (gRPC data -> monitor, config, close)
go func() {
defer cancel()
for {
msg, err := stream.Recv()
if errors.Is(err, io.EOF) {
return
}
if err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
return
}
if conf := msg.GetUpdatedConfiguration(); conf != nil {
for _, c := range conf.GetSettings() {
if err := portProxy.Config(c.GetSettingId(), c.GetValue()); err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
}
}
}
if closeMsg := msg.GetClose(); closeMsg {
gracefulCloseInitiated.Store(true)
if err := portProxy.Close(); err != nil {
logrus.WithError(err).Debug("Error closing monitor port")
}
gracefulCloseCancel()
}
tx := msg.GetTxData()
for len(tx) > 0 {
n, err := portProxy.Write(tx)
if errors.Is(err, io.EOF) {
return
}
if err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
return
}
tx = tx[n:]
}
}
}()
// gRPC stream sender (monitor -> gRPC)
go func() {
defer cancel() // unlock the receiver
buff := make([]byte, 4096)
for {
n, err := portProxy.Read(buff)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
break
}
if err := syncSend.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil {
break
}
}
}()
<-cancelCtx.Done()
if gracefulCloseInitiated.Load() {
// Port closing has been initiated in the receiver
<-gracefuleCloseCtx.Done()
} else {
portProxy.Close()
}
return nil
}
// CheckForArduinoCLIUpdates FIXMEDOC
func (s *arduinoCoreServerImpl) CheckForArduinoCLIUpdates(ctx context.Context, req *rpc.CheckForArduinoCLIUpdatesRequest) (*rpc.CheckForArduinoCLIUpdatesResponse, error) {
return updatecheck.CheckForArduinoCLIUpdates(ctx, req)
......
......@@ -17,8 +17,10 @@ package commands
import (
"context"
"errors"
"fmt"
"io"
"sync/atomic"
"github.com/arduino/arduino-cli/commands/cmderrors"
"github.com/arduino/arduino-cli/commands/internal/instances"
......@@ -27,87 +29,211 @@ import (
pluggableMonitor "github.com/arduino/arduino-cli/internal/arduino/monitor"
rpc "github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1"
"github.com/arduino/go-properties-orderedmap"
"github.com/djherbis/buffer"
"github.com/djherbis/nio/v3"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/metadata"
)
// portProxy is an io.ReadWriteCloser that maps into the monitor port of the board
type portProxy struct {
rw io.ReadWriter
changeSettingsCB func(setting, value string) error
closeCB func() error
type monitorPipeServer struct {
ctx context.Context
req atomic.Pointer[rpc.MonitorPortOpenRequest]
in *nio.PipeReader
out *nio.PipeWriter
}
func (p *portProxy) Read(buff []byte) (int, error) {
return p.rw.Read(buff)
func (s *monitorPipeServer) Send(resp *rpc.MonitorResponse) error {
if len(resp.GetRxData()) > 0 {
if _, err := s.out.Write(resp.GetRxData()); err != nil {
return err
}
}
return nil
}
func (p *portProxy) Write(buff []byte) (int, error) {
return p.rw.Write(buff)
func (s *monitorPipeServer) Recv() (r *rpc.MonitorRequest, e error) {
if conf := s.req.Swap(nil); conf != nil {
return &rpc.MonitorRequest{Message: &rpc.MonitorRequest_OpenRequest{OpenRequest: conf}}, nil
}
buff := make([]byte, 4096)
n, err := s.in.Read(buff)
if err != nil {
return nil, err
}
return &rpc.MonitorRequest{Message: &rpc.MonitorRequest_TxData{TxData: buff[:n]}}, nil
}
// Config sets the port configuration setting to the specified value
func (p *portProxy) Config(setting, value string) error {
return p.changeSettingsCB(setting, value)
func (s *monitorPipeServer) Context() context.Context {
return s.ctx
}
// Close the port
func (p *portProxy) Close() error {
return p.closeCB()
func (s *monitorPipeServer) RecvMsg(m any) error { return nil }
func (s *monitorPipeServer) SendHeader(metadata.MD) error { return nil }
func (s *monitorPipeServer) SendMsg(m any) error { return nil }
func (s *monitorPipeServer) SetHeader(metadata.MD) error { return nil }
func (s *monitorPipeServer) SetTrailer(metadata.MD) {}
type monitorPipeClient struct {
in *nio.PipeReader
out *nio.PipeWriter
close func()
}
// Monitor opens a communication port. It returns a PortProxy to communicate with the port and a PortDescriptor
// that describes the available configuration settings.
func Monitor(ctx context.Context, req *rpc.MonitorPortOpenRequest) (*portProxy, *pluggableMonitor.PortDescriptor, error) {
pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance())
if err != nil {
return nil, nil, err
}
defer release()
func (s *monitorPipeClient) Read(buff []byte) (n int, err error) {
return s.in.Read(buff)
}
m, boardSettings, err := findMonitorAndSettingsForProtocolAndBoard(pme, req.GetPort().GetProtocol(), req.GetFqbn())
func (s *monitorPipeClient) Write(buff []byte) (n int, err error) {
return s.out.Write(buff)
}
func (s *monitorPipeClient) Close() error {
s.in.Close()
s.out.Close()
s.close()
return nil
}
// MonitorServerToReadWriteCloser creates a monitor server that proxies the data to a ReadWriteCloser.
// The server is returned along with the ReadWriteCloser that can be used to send and receive data
// to the server. The MonitorPortOpenRequest is used to configure the monitor.
func MonitorServerToReadWriteCloser(ctx context.Context, req *rpc.MonitorPortOpenRequest) (rpc.ArduinoCoreService_MonitorServer, io.ReadWriteCloser) {
server := &monitorPipeServer{}
client := &monitorPipeClient{}
server.req.Store(req)
server.ctx, client.close = context.WithCancel(ctx)
client.in, server.out = nio.Pipe(buffer.New(32 * 1024))
server.in, client.out = nio.Pipe(buffer.New(32 * 1024))
return server, client
}
// Monitor opens a port monitor and streams data back and forth until the request is kept alive.
func (s *arduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorServer) error {
// The configuration must be sent on the first message
req, err := stream.Recv()
if err != nil {
return nil, nil, err
return err
}
if err := m.Run(); err != nil {
return nil, nil, &cmderrors.FailedMonitorError{Cause: err}
openReq := req.GetOpenRequest()
if openReq == nil {
return &cmderrors.InvalidInstanceError{}
}
descriptor, err := m.Describe()
pme, release, err := instances.GetPackageManagerExplorer(openReq.GetInstance())
if err != nil {
m.Quit()
return nil, nil, &cmderrors.FailedMonitorError{Cause: err}
return err
}
// Apply user-requested settings
if portConfig := req.GetPortConfiguration(); portConfig != nil {
defer release()
monitor, boardSettings, err := findMonitorAndSettingsForProtocolAndBoard(pme, openReq.GetPort().GetProtocol(), openReq.GetFqbn())
if err != nil {
return err
}
if err := monitor.Run(); err != nil {
return &cmderrors.FailedMonitorError{Cause: err}
}
if _, err := monitor.Describe(); err != nil {
monitor.Quit()
return &cmderrors.FailedMonitorError{Cause: err}
}
if portConfig := openReq.GetPortConfiguration(); portConfig != nil {
for _, setting := range portConfig.GetSettings() {
boardSettings.Remove(setting.GetSettingId()) // Remove board settings overridden by the user
if err := m.Configure(setting.GetSettingId(), setting.GetValue()); err != nil {
boardSettings.Remove(setting.GetSettingId())
if err := monitor.Configure(setting.GetSettingId(), setting.GetValue()); err != nil {
logrus.Errorf("Could not set configuration %s=%s: %s", setting.GetSettingId(), setting.GetValue(), err)
}
}
}
// Apply specific board settings
for setting, value := range boardSettings.AsMap() {
m.Configure(setting, value)
monitor.Configure(setting, value)
}
monitorIO, err := monitor.Open(openReq.GetPort().GetAddress(), openReq.GetPort().GetProtocol())
if err != nil {
monitor.Quit()
return &cmderrors.FailedMonitorError{Cause: err}
}
logrus.Infof("Port %s successfully opened", openReq.GetPort().GetAddress())
monitorClose := func() error {
monitor.Close()
return monitor.Quit()
}
monIO, err := m.Open(req.GetPort().GetAddress(), req.GetPort().GetProtocol())
// Send a message with Success set to true to notify the caller of the port being now active
syncSend := NewSynchronizedSend(stream.Send)
_ = syncSend.Send(&rpc.MonitorResponse{Success: true})
ctx, cancel := context.WithCancel(stream.Context())
gracefulCloseInitiated := &atomic.Bool{}
gracefuleCloseCtx, gracefulCloseCancel := context.WithCancel(context.Background())
// gRPC stream receiver (gRPC data -> monitor, config, close)
go func() {
defer cancel()
for {
msg, err := stream.Recv()
if errors.Is(err, io.EOF) {
return
}
if err != nil {
m.Quit()
return nil, nil, &cmderrors.FailedMonitorError{Cause: err}
}
logrus.Infof("Port %s successfully opened", req.GetPort().GetAddress())
return &portProxy{
rw: monIO,
changeSettingsCB: m.Configure,
closeCB: func() error {
m.Close()
return m.Quit()
},
}, descriptor, nil
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
return
}
if conf := msg.GetUpdatedConfiguration(); conf != nil {
for _, c := range conf.GetSettings() {
if err := monitor.Configure(c.GetSettingId(), c.GetValue()); err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
}
}
}
if closeMsg := msg.GetClose(); closeMsg {
gracefulCloseInitiated.Store(true)
if err := monitorClose(); err != nil {
logrus.WithError(err).Debug("Error closing monitor port")
}
gracefulCloseCancel()
}
tx := msg.GetTxData()
for len(tx) > 0 {
n, err := monitorIO.Write(tx)
if errors.Is(err, io.EOF) {
return
}
if err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
return
}
tx = tx[n:]
}
}
}()
// gRPC stream sender (monitor -> gRPC)
go func() {
defer cancel() // unlock the receiver
buff := make([]byte, 4096)
for {
n, err := monitorIO.Read(buff)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
break
}
if err := syncSend.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil {
break
}
}
}()
<-ctx.Done()
if gracefulCloseInitiated.Load() {
// Port closing has been initiated in the receiver
<-gracefuleCloseCtx.Done()
} else {
monitorClose()
}
return nil
}
func findMonitorAndSettingsForProtocolAndBoard(pme *packagemanager.Explorer, protocol, fqbn string) (*pluggableMonitor.PluggableMonitor, *properties.Map, error) {
......
......@@ -204,20 +204,6 @@ func runMonitorCmd(
}
}
}
portProxy, _, err := commands.Monitor(context.Background(), &rpc.MonitorPortOpenRequest{
Instance: inst,
Port: &rpc.Port{Address: portAddress, Protocol: portProtocol},
Fqbn: fqbn,
PortConfiguration: configuration,
})
if err != nil {
feedback.FatalError(err, feedback.ErrGeneric)
}
defer portProxy.Close()
if !quiet {
feedback.Print(tr("Connected to %s! Press CTRL-C to exit.", portAddress))
}
ttyIn, ttyOut, err := feedback.InteractiveStreams()
if err != nil {
......@@ -228,7 +214,7 @@ func runMonitorCmd(
ttyOut = newTimeStampWriter(ttyOut)
}
ctx, cancel := cleanup.InterruptableContext(context.Background())
ctx, cancel := cleanup.InterruptableContext(ctx)
if raw {
if feedback.IsInteractive() {
if err := feedback.SetRawModeStdin(); err != nil {
......@@ -246,6 +232,22 @@ func runMonitorCmd(
ttyIn = io.TeeReader(ttyIn, ctrlCDetector)
}
monitorServer, portProxy := commands.MonitorServerToReadWriteCloser(ctx, &rpc.MonitorPortOpenRequest{
Instance: inst,
Port: &rpc.Port{Address: portAddress, Protocol: portProtocol},
Fqbn: fqbn,
PortConfiguration: configuration,
})
go func() {
if !quiet {
feedback.Print(tr("Connecting to %s. Press CTRL-C to exit.", portAddress))
}
if err := srv.Monitor(monitorServer); err != nil {
feedback.FatalError(err, feedback.ErrGeneric)
}
portProxy.Close()
cancel()
}()
go func() {
_, err := io.Copy(ttyOut, portProxy)
if err != nil && !errors.Is(err, io.EOF) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment