Commit 0442319a authored by Cristian Maglie's avatar Cristian Maglie

Inlining methods in ArduinoCoreServiceImpl (part 14: Upload, UploadUsingProgrammer, BurnBootloader)

parent 738ad7c8
...@@ -42,89 +42,11 @@ func (s *arduinoCoreServerImpl) Version(ctx context.Context, req *rpc.VersionReq ...@@ -42,89 +42,11 @@ func (s *arduinoCoreServerImpl) Version(ctx context.Context, req *rpc.VersionReq
return &rpc.VersionResponse{Version: s.versionString}, nil return &rpc.VersionResponse{Version: s.versionString}, nil
} }
// Upload FIXMEDOC
func (s *arduinoCoreServerImpl) Upload(req *rpc.UploadRequest, stream rpc.ArduinoCoreService_UploadServer) error {
syncSend := NewSynchronizedSend(stream.Send)
outStream := feedStreamTo(func(data []byte) {
syncSend.Send(&rpc.UploadResponse{
Message: &rpc.UploadResponse_OutStream{OutStream: data},
})
})
errStream := feedStreamTo(func(data []byte) {
syncSend.Send(&rpc.UploadResponse{
Message: &rpc.UploadResponse_ErrStream{ErrStream: data},
})
})
res, err := Upload(stream.Context(), req, outStream, errStream)
outStream.Close()
errStream.Close()
if res != nil {
syncSend.Send(&rpc.UploadResponse{
Message: &rpc.UploadResponse_Result{
Result: res,
},
})
}
return err
}
// UploadUsingProgrammer FIXMEDOC
func (s *arduinoCoreServerImpl) UploadUsingProgrammer(req *rpc.UploadUsingProgrammerRequest, stream rpc.ArduinoCoreService_UploadUsingProgrammerServer) error {
syncSend := NewSynchronizedSend(stream.Send)
outStream := feedStreamTo(func(data []byte) {
syncSend.Send(&rpc.UploadUsingProgrammerResponse{
Message: &rpc.UploadUsingProgrammerResponse_OutStream{
OutStream: data,
},
})
})
errStream := feedStreamTo(func(data []byte) {
syncSend.Send(&rpc.UploadUsingProgrammerResponse{
Message: &rpc.UploadUsingProgrammerResponse_ErrStream{
ErrStream: data,
},
})
})
err := UploadUsingProgrammer(stream.Context(), req, outStream, errStream)
outStream.Close()
errStream.Close()
if err != nil {
return err
}
return nil
}
// SupportedUserFields FIXMEDOC // SupportedUserFields FIXMEDOC
func (s *arduinoCoreServerImpl) SupportedUserFields(ctx context.Context, req *rpc.SupportedUserFieldsRequest) (*rpc.SupportedUserFieldsResponse, error) { func (s *arduinoCoreServerImpl) SupportedUserFields(ctx context.Context, req *rpc.SupportedUserFieldsRequest) (*rpc.SupportedUserFieldsResponse, error) {
return SupportedUserFields(ctx, req) return SupportedUserFields(ctx, req)
} }
// BurnBootloader FIXMEDOC
func (s *arduinoCoreServerImpl) BurnBootloader(req *rpc.BurnBootloaderRequest, stream rpc.ArduinoCoreService_BurnBootloaderServer) error {
syncSend := NewSynchronizedSend(stream.Send)
outStream := feedStreamTo(func(data []byte) {
syncSend.Send(&rpc.BurnBootloaderResponse{
Message: &rpc.BurnBootloaderResponse_OutStream{
OutStream: data,
},
})
})
errStream := feedStreamTo(func(data []byte) {
syncSend.Send(&rpc.BurnBootloaderResponse{
Message: &rpc.BurnBootloaderResponse_ErrStream{
ErrStream: data,
},
})
})
resp, err := BurnBootloader(stream.Context(), req, outStream, errStream)
outStream.Close()
errStream.Close()
if err != nil {
return err
}
return syncSend.Send(resp)
}
// ListProgrammersAvailableForUpload FIXMEDOC // ListProgrammersAvailableForUpload FIXMEDOC
func (s *arduinoCoreServerImpl) ListProgrammersAvailableForUpload(ctx context.Context, req *rpc.ListProgrammersAvailableForUploadRequest) (*rpc.ListProgrammersAvailableForUploadResponse, error) { func (s *arduinoCoreServerImpl) ListProgrammersAvailableForUpload(ctx context.Context, req *rpc.ListProgrammersAvailableForUploadRequest) (*rpc.ListProgrammersAvailableForUploadResponse, error) {
return ListProgrammersAvailableForUpload(ctx, req) return ListProgrammersAvailableForUpload(ctx, req)
......
...@@ -120,8 +120,33 @@ func getUserFields(toolID string, platformRelease *cores.PlatformRelease) []*rpc ...@@ -120,8 +120,33 @@ func getUserFields(toolID string, platformRelease *cores.PlatformRelease) []*rpc
return userFields return userFields
} }
// Upload FIXMEDOC // UploadToServerStreams return a server stream that forwards the output and error streams to the provided writers.
func Upload(ctx context.Context, req *rpc.UploadRequest, outStream io.Writer, errStream io.Writer) (*rpc.UploadResult, error) { // It also returns a function that can be used to retrieve the result of the upload.
func UploadToServerStreams(ctx context.Context, outStream io.Writer, errStream io.Writer) (rpc.ArduinoCoreService_UploadServer, func() *rpc.UploadResult) {
var result *rpc.UploadResult
stream := streamResponseToCallback(ctx, func(resp *rpc.UploadResponse) error {
if errData := resp.GetErrStream(); len(errData) > 0 {
_, err := errStream.Write(errData)
return err
}
if outData := resp.GetOutStream(); len(outData) > 0 {
_, err := outStream.Write(outData)
return err
}
if res := resp.GetResult(); res != nil {
result = res
}
return nil
})
return stream, func() *rpc.UploadResult {
return result
}
}
// Upload performs the upload of a sketch to a board.
func (s *arduinoCoreServerImpl) Upload(req *rpc.UploadRequest, stream rpc.ArduinoCoreService_UploadServer) error {
syncSend := NewSynchronizedSend(stream.Send)
logrus.Tracef("Upload %s on %s started", req.GetSketchPath(), req.GetFqbn()) logrus.Tracef("Upload %s on %s started", req.GetSketchPath(), req.GetFqbn())
// TODO: make a generic function to extract sketch from request // TODO: make a generic function to extract sketch from request
...@@ -129,12 +154,12 @@ func Upload(ctx context.Context, req *rpc.UploadRequest, outStream io.Writer, er ...@@ -129,12 +154,12 @@ func Upload(ctx context.Context, req *rpc.UploadRequest, outStream io.Writer, er
sketchPath := paths.New(req.GetSketchPath()) sketchPath := paths.New(req.GetSketchPath())
sk, err := sketch.New(sketchPath) sk, err := sketch.New(sketchPath)
if err != nil && req.GetImportDir() == "" && req.GetImportFile() == "" { if err != nil && req.GetImportDir() == "" && req.GetImportFile() == "" {
return nil, &cmderrors.CantOpenSketchError{Cause: err} return &cmderrors.CantOpenSketchError{Cause: err}
} }
pme, pmeRelease, err := instances.GetPackageManagerExplorer(req.GetInstance()) pme, pmeRelease, err := instances.GetPackageManagerExplorer(req.GetInstance())
if err != nil { if err != nil {
return nil, err return err
} }
defer pmeRelease() defer pmeRelease()
...@@ -151,6 +176,20 @@ func Upload(ctx context.Context, req *rpc.UploadRequest, outStream io.Writer, er ...@@ -151,6 +176,20 @@ func Upload(ctx context.Context, req *rpc.UploadRequest, outStream io.Writer, er
programmer = sk.GetDefaultProgrammer() programmer = sk.GetDefaultProgrammer()
} }
outStream := feedStreamTo(func(data []byte) {
syncSend.Send(&rpc.UploadResponse{
Message: &rpc.UploadResponse_OutStream{OutStream: data},
})
})
defer outStream.Close()
errStream := feedStreamTo(func(data []byte) {
syncSend.Send(&rpc.UploadResponse{
Message: &rpc.UploadResponse_ErrStream{ErrStream: data},
})
})
defer errStream.Close()
// TODO: inject context
// ctx := stream.Context()
updatedPort, err := runProgramAction( updatedPort, err := runProgramAction(
pme, pme,
sk, sk,
...@@ -168,22 +207,45 @@ func Upload(ctx context.Context, req *rpc.UploadRequest, outStream io.Writer, er ...@@ -168,22 +207,45 @@ func Upload(ctx context.Context, req *rpc.UploadRequest, outStream io.Writer, er
req.GetUserFields(), req.GetUserFields(),
) )
if err != nil { if err != nil {
return nil, err return err
} }
return syncSend.Send(&rpc.UploadResponse{
return &rpc.UploadResult{ Message: &rpc.UploadResponse_Result{
Result: &rpc.UploadResult{
UpdatedUploadPort: updatedPort, UpdatedUploadPort: updatedPort,
}, nil },
},
})
} }
// UploadUsingProgrammer FIXMEDOC // UploadUsingProgrammer FIXMEDOC
func UploadUsingProgrammer(ctx context.Context, req *rpc.UploadUsingProgrammerRequest, outStream io.Writer, errStream io.Writer) error { func (s *arduinoCoreServerImpl) UploadUsingProgrammer(req *rpc.UploadUsingProgrammerRequest, stream rpc.ArduinoCoreService_UploadUsingProgrammerServer) error {
syncSend := NewSynchronizedSend(stream.Send)
streamAdapter := streamResponseToCallback(stream.Context(), func(resp *rpc.UploadResponse) error {
if errData := resp.GetErrStream(); len(errData) > 0 {
syncSend.Send(&rpc.UploadUsingProgrammerResponse{
Message: &rpc.UploadUsingProgrammerResponse_ErrStream{
ErrStream: errData,
},
})
}
if outData := resp.GetOutStream(); len(outData) > 0 {
syncSend.Send(&rpc.UploadUsingProgrammerResponse{
Message: &rpc.UploadUsingProgrammerResponse_OutStream{
OutStream: outData,
},
})
}
// resp.GetResult() is ignored
return nil
})
logrus.Tracef("Upload using programmer %s on %s started", req.GetSketchPath(), req.GetFqbn()) logrus.Tracef("Upload using programmer %s on %s started", req.GetSketchPath(), req.GetFqbn())
if req.GetProgrammer() == "" { if req.GetProgrammer() == "" {
return &cmderrors.MissingProgrammerError{} return &cmderrors.MissingProgrammerError{}
} }
_, err := Upload(ctx, &rpc.UploadRequest{ return s.Upload(&rpc.UploadRequest{
Instance: req.GetInstance(), Instance: req.GetInstance(),
SketchPath: req.GetSketchPath(), SketchPath: req.GetSketchPath(),
ImportFile: req.GetImportFile(), ImportFile: req.GetImportFile(),
...@@ -194,8 +256,7 @@ func UploadUsingProgrammer(ctx context.Context, req *rpc.UploadUsingProgrammerRe ...@@ -194,8 +256,7 @@ func UploadUsingProgrammer(ctx context.Context, req *rpc.UploadUsingProgrammerRe
Verbose: req.GetVerbose(), Verbose: req.GetVerbose(),
Verify: req.GetVerify(), Verify: req.GetVerify(),
UserFields: req.GetUserFields(), UserFields: req.GetUserFields(),
}, outStream, errStream) }, streamAdapter)
return err
} }
func runProgramAction(pme *packagemanager.Explorer, func runProgramAction(pme *packagemanager.Explorer,
......
...@@ -24,8 +24,42 @@ import ( ...@@ -24,8 +24,42 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// BurnBootloader FIXMEDOC // BurnBootloaderToServerStreams return a server stream that forwards the output and error streams to the provided io.Writers
func BurnBootloader(ctx context.Context, req *rpc.BurnBootloaderRequest, outStream io.Writer, errStream io.Writer) (*rpc.BurnBootloaderResponse, error) { func BurnBootloaderToServerStreams(ctx context.Context, outStrem, errStream io.Writer) rpc.ArduinoCoreService_BurnBootloaderServer {
stream := streamResponseToCallback(ctx, func(resp *rpc.BurnBootloaderResponse) error {
if outData := resp.GetOutStream(); len(outData) > 0 {
_, err := outStrem.Write(outData)
return err
}
if errData := resp.GetErrStream(); len(errData) > 0 {
_, err := errStream.Write(errData)
return err
}
return nil
})
return stream
}
// BurnBootloader performs the burn bootloader action
func (s *arduinoCoreServerImpl) BurnBootloader(req *rpc.BurnBootloaderRequest, stream rpc.ArduinoCoreService_BurnBootloaderServer) error {
syncSend := NewSynchronizedSend(stream.Send)
outStream := feedStreamTo(func(data []byte) {
syncSend.Send(&rpc.BurnBootloaderResponse{
Message: &rpc.BurnBootloaderResponse_OutStream{
OutStream: data,
},
})
})
defer outStream.Close()
errStream := feedStreamTo(func(data []byte) {
syncSend.Send(&rpc.BurnBootloaderResponse{
Message: &rpc.BurnBootloaderResponse_ErrStream{
ErrStream: data,
},
})
})
defer errStream.Close()
logrus. logrus.
WithField("fqbn", req.GetFqbn()). WithField("fqbn", req.GetFqbn()).
WithField("port", req.GetPort()). WithField("port", req.GetPort()).
...@@ -34,7 +68,7 @@ func BurnBootloader(ctx context.Context, req *rpc.BurnBootloaderRequest, outStre ...@@ -34,7 +68,7 @@ func BurnBootloader(ctx context.Context, req *rpc.BurnBootloaderRequest, outStre
pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance()) pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance())
if err != nil { if err != nil {
return nil, err return err
} }
defer release() defer release()
...@@ -54,7 +88,7 @@ func BurnBootloader(ctx context.Context, req *rpc.BurnBootloaderRequest, outStre ...@@ -54,7 +88,7 @@ func BurnBootloader(ctx context.Context, req *rpc.BurnBootloaderRequest, outStre
req.GetDryRun(), req.GetDryRun(),
map[string]string{}, // User fields map[string]string{}, // User fields
); err != nil { ); err != nil {
return nil, err return err
} }
return &rpc.BurnBootloaderResponse{}, nil return syncSend.Send(&rpc.BurnBootloaderResponse{})
} }
...@@ -78,7 +78,8 @@ func runBootloaderCommand(srv rpc.ArduinoCoreServiceServer) { ...@@ -78,7 +78,8 @@ func runBootloaderCommand(srv rpc.ArduinoCoreServiceServer) {
} }
stdOut, stdErr, res := feedback.OutputStreams() stdOut, stdErr, res := feedback.OutputStreams()
if _, err := commands.BurnBootloader(context.Background(), &rpc.BurnBootloaderRequest{ stream := commands.BurnBootloaderToServerStreams(ctx, stdOut, stdErr)
if err := srv.BurnBootloader(&rpc.BurnBootloaderRequest{
Instance: instance, Instance: instance,
Fqbn: fqbn.String(), Fqbn: fqbn.String(),
Port: discoveryPort, Port: discoveryPort,
...@@ -86,7 +87,7 @@ func runBootloaderCommand(srv rpc.ArduinoCoreServiceServer) { ...@@ -86,7 +87,7 @@ func runBootloaderCommand(srv rpc.ArduinoCoreServiceServer) {
Verify: verify, Verify: verify,
Programmer: programmer.String(instance, srv, fqbn.String()), Programmer: programmer.String(instance, srv, fqbn.String()),
DryRun: dryRun, DryRun: dryRun,
}, stdOut, stdErr); err != nil { }, stream); err != nil {
errcode := feedback.ErrGeneric errcode := feedback.ErrGeneric
if errors.Is(err, &cmderrors.ProgrammerRequiredForUploadError{}) { if errors.Is(err, &cmderrors.ProgrammerRequiredForUploadError{}) {
errcode = feedback.ErrMissingProgrammer errcode = feedback.ErrMissingProgrammer
......
...@@ -292,7 +292,8 @@ func runCompileCommand(cmd *cobra.Command, args []string, srv rpc.ArduinoCoreSer ...@@ -292,7 +292,8 @@ func runCompileCommand(cmd *cobra.Command, args []string, srv rpc.ArduinoCoreSer
UserFields: fields, UserFields: fields,
} }
if res, err := commands.Upload(context.Background(), uploadRequest, stdOut, stdErr); err != nil { stream, streamRes := commands.UploadToServerStreams(ctx, stdOut, stdErr)
if err := srv.Upload(uploadRequest, stream); err != nil {
errcode := feedback.ErrGeneric errcode := feedback.ErrGeneric
if errors.Is(err, &cmderrors.ProgrammerRequiredForUploadError{}) { if errors.Is(err, &cmderrors.ProgrammerRequiredForUploadError{}) {
errcode = feedback.ErrMissingProgrammer errcode = feedback.ErrMissingProgrammer
...@@ -302,7 +303,7 @@ func runCompileCommand(cmd *cobra.Command, args []string, srv rpc.ArduinoCoreSer ...@@ -302,7 +303,7 @@ func runCompileCommand(cmd *cobra.Command, args []string, srv rpc.ArduinoCoreSer
} }
feedback.Fatal(tr("Error during Upload: %v", err), errcode) feedback.Fatal(tr("Error during Upload: %v", err), errcode)
} else { } else {
uploadRes = res uploadRes = streamRes()
} }
} }
......
...@@ -199,7 +199,8 @@ func runUploadCommand(srv rpc.ArduinoCoreServiceServer, args []string, uploadFie ...@@ -199,7 +199,8 @@ func runUploadCommand(srv rpc.ArduinoCoreServiceServer, args []string, uploadFie
DryRun: dryRun, DryRun: dryRun,
UserFields: fields, UserFields: fields,
} }
if res, err := commands.Upload(context.Background(), req, stdOut, stdErr); err != nil { stream, streamResp := commands.UploadToServerStreams(ctx, stdOut, stdErr)
if err := srv.Upload(req, stream); err != nil {
errcode := feedback.ErrGeneric errcode := feedback.ErrGeneric
if errors.Is(err, &cmderrors.ProgrammerRequiredForUploadError{}) { if errors.Is(err, &cmderrors.ProgrammerRequiredForUploadError{}) {
errcode = feedback.ErrMissingProgrammer errcode = feedback.ErrMissingProgrammer
...@@ -213,7 +214,7 @@ func runUploadCommand(srv rpc.ArduinoCoreServiceServer, args []string, uploadFie ...@@ -213,7 +214,7 @@ func runUploadCommand(srv rpc.ArduinoCoreServiceServer, args []string, uploadFie
feedback.PrintResult(&uploadResult{ feedback.PrintResult(&uploadResult{
Stdout: io.Stdout, Stdout: io.Stdout,
Stderr: io.Stderr, Stderr: io.Stderr,
UpdatedUploadPort: result.NewPort(res.GetUpdatedUploadPort()), UpdatedUploadPort: result.NewPort(streamResp().GetUpdatedUploadPort()),
}) })
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment