Skip to content

Commit

Permalink
Simplified gRPC streams helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
cmaglie committed Aug 30, 2022
1 parent 05d1446 commit 2f59e6a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 23 deletions.
34 changes: 29 additions & 5 deletions arduino/utils/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,39 @@
package utils

import (
"context"
"io"
"sync"
"time"

"github.com/djherbis/buffer"
"github.com/djherbis/nio/v3"
)

// implWriteCloser is an helper struct to implement an anonymous io.WriteCloser
type implWriteCloser struct {
write func(buff []byte) (int, error)
close func() error
}

func (w *implWriteCloser) Write(buff []byte) (int, error) {
return w.write(buff)
}

func (w *implWriteCloser) Close() error {
return w.close()
}

// FeedStreamTo creates a pipe to pass data to the writer function.
// FeedStreamTo returns the io.WriteCloser side of the pipe, on which the user can write data.
// The user must call Close() on the returned io.WriteCloser to release all the resources.
// If needed, the context can be used to detect when all the data has been processed after
// closing the writer.
func FeedStreamTo(writer func(data []byte)) (io.WriteCloser, context.Context) {
ctx, cancel := context.WithCancel(context.Background())
func FeedStreamTo(writer func(data []byte)) io.WriteCloser {
r, w := nio.Pipe(buffer.New(32 * 1024))
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer cancel()
defer wg.Done()
data := make([]byte, 16384)
for {
if n, err := r.Read(data); err == nil {
Expand All @@ -50,7 +65,16 @@ func FeedStreamTo(writer func(data []byte)) (io.WriteCloser, context.Context) {
}
}
}()
return w, ctx
return &implWriteCloser{
write: w.Write,
close: func() error {
if err := w.Close(); err != nil {
return err
}
wg.Wait()
return nil
},
}
}

// ConsumeStreamFrom creates a pipe to consume data from the reader function.
Expand Down
24 changes: 8 additions & 16 deletions commands/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,16 +259,14 @@ func (s *ArduinoCoreServerImpl) LoadSketch(ctx context.Context, req *rpc.LoadSke

// Compile FIXMEDOC
func (s *ArduinoCoreServerImpl) Compile(req *rpc.CompileRequest, stream rpc.ArduinoCoreService_CompileServer) error {
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{OutStream: data}) })
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{ErrStream: data}) })
outStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{OutStream: data}) })
errStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{ErrStream: data}) })
compileResp, compileErr := compile.Compile(
stream.Context(), req, outStream, errStream,
func(p *rpc.TaskProgress) { stream.Send(&rpc.CompileResponse{Progress: p}) },
false) // Set debug to false
outStream.Close()
errStream.Close()
<-outCtx.Done()
<-errCtx.Done()
var compileRespSendErr error
if compileResp != nil {
compileRespSendErr = stream.Send(compileResp)
Expand Down Expand Up @@ -346,31 +344,27 @@ func (s *ArduinoCoreServerImpl) PlatformList(ctx context.Context, req *rpc.Platf

// Upload FIXMEDOC
func (s *ArduinoCoreServerImpl) Upload(req *rpc.UploadRequest, stream rpc.ArduinoCoreService_UploadServer) error {
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{OutStream: data}) })
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{ErrStream: data}) })
outStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{OutStream: data}) })
errStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{ErrStream: data}) })
resp, err := upload.Upload(stream.Context(), req, outStream, errStream)
outStream.Close()
errStream.Close()
if err != nil {
return convertErrorToRPCStatus(err)
}
<-outCtx.Done()
<-errCtx.Done()
return stream.Send(resp)
}

// UploadUsingProgrammer FIXMEDOC
func (s *ArduinoCoreServerImpl) UploadUsingProgrammer(req *rpc.UploadUsingProgrammerRequest, stream rpc.ArduinoCoreService_UploadUsingProgrammerServer) error {
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{OutStream: data}) })
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{ErrStream: data}) })
outStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{OutStream: data}) })
errStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{ErrStream: data}) })
resp, err := upload.UsingProgrammer(stream.Context(), req, outStream, errStream)
outStream.Close()
errStream.Close()
if err != nil {
return convertErrorToRPCStatus(err)
}
<-outCtx.Done()
<-errCtx.Done()
return stream.Send(resp)
}

Expand All @@ -382,16 +376,14 @@ func (s *ArduinoCoreServerImpl) SupportedUserFields(ctx context.Context, req *rp

// BurnBootloader FIXMEDOC
func (s *ArduinoCoreServerImpl) BurnBootloader(req *rpc.BurnBootloaderRequest, stream rpc.ArduinoCoreService_BurnBootloaderServer) error {
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{OutStream: data}) })
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{ErrStream: data}) })
outStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{OutStream: data}) })
errStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{ErrStream: data}) })
resp, err := upload.BurnBootloader(stream.Context(), req, outStream, errStream)
outStream.Close()
errStream.Close()
if err != nil {
return convertErrorToRPCStatus(err)
}
<-outCtx.Done()
<-errCtx.Done()
return stream.Send(resp)
}

Expand Down
3 changes: 1 addition & 2 deletions commands/daemon/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *DebugService) Debug(stream dbg.DebugService_DebugServer) error {
// Launch debug recipe attaching stdin and out to grpc streaming
signalChan := make(chan os.Signal)
defer close(signalChan)
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&dbg.DebugResponse{Data: data}) })
outStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&dbg.DebugResponse{Data: data}) })
resp, debugErr := cmd.Debug(stream.Context(), req,
utils.ConsumeStreamFrom(func() ([]byte, error) {
command, err := stream.Recv()
Expand All @@ -65,7 +65,6 @@ func (s *DebugService) Debug(stream dbg.DebugService_DebugServer) error {
if debugErr != nil {
return debugErr
}
<-outCtx.Done()
return stream.Send(resp)
}

Expand Down

0 comments on commit 2f59e6a

Please sign in to comment.