Skip to content

Commit

Permalink
Moved FeedStreamTo and ConsumeStreamFrom into deamon package and mde …
Browse files Browse the repository at this point in the history
…them private
  • Loading branch information
cmaglie committed Aug 31, 2022
1 parent 2f59e6a commit 405a810
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 19 deletions.
17 changes: 8 additions & 9 deletions commands/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"io"

"github.com/arduino/arduino-cli/arduino"
"github.com/arduino/arduino-cli/arduino/utils"
"github.com/arduino/arduino-cli/commands"
"github.com/arduino/arduino-cli/commands/board"
"github.com/arduino/arduino-cli/commands/compile"
Expand Down Expand Up @@ -259,8 +258,8 @@ func (s *ArduinoCoreServerImpl) LoadSketch(ctx context.Context, req *rpc.LoadSke

// Compile FIXMEDOC
func (s *ArduinoCoreServerImpl) Compile(req *rpc.CompileRequest, stream rpc.ArduinoCoreService_CompileServer) error {
outStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{OutStream: data}) })
errStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{ErrStream: data}) })
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{OutStream: data}) })
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{ErrStream: data}) })
compileResp, compileErr := compile.Compile(
stream.Context(), req, outStream, errStream,
func(p *rpc.TaskProgress) { stream.Send(&rpc.CompileResponse{Progress: p}) },
Expand Down Expand Up @@ -344,8 +343,8 @@ func (s *ArduinoCoreServerImpl) PlatformList(ctx context.Context, req *rpc.Platf

// Upload FIXMEDOC
func (s *ArduinoCoreServerImpl) Upload(req *rpc.UploadRequest, stream rpc.ArduinoCoreService_UploadServer) error {
outStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{OutStream: data}) })
errStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{ErrStream: data}) })
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{OutStream: data}) })
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{ErrStream: data}) })
resp, err := upload.Upload(stream.Context(), req, outStream, errStream)
outStream.Close()
errStream.Close()
Expand All @@ -357,8 +356,8 @@ func (s *ArduinoCoreServerImpl) Upload(req *rpc.UploadRequest, stream rpc.Arduin

// UploadUsingProgrammer FIXMEDOC
func (s *ArduinoCoreServerImpl) UploadUsingProgrammer(req *rpc.UploadUsingProgrammerRequest, stream rpc.ArduinoCoreService_UploadUsingProgrammerServer) error {
outStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{OutStream: data}) })
errStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{ErrStream: data}) })
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{OutStream: data}) })
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{ErrStream: data}) })
resp, err := upload.UsingProgrammer(stream.Context(), req, outStream, errStream)
outStream.Close()
errStream.Close()
Expand All @@ -376,8 +375,8 @@ func (s *ArduinoCoreServerImpl) SupportedUserFields(ctx context.Context, req *rp

// BurnBootloader FIXMEDOC
func (s *ArduinoCoreServerImpl) BurnBootloader(req *rpc.BurnBootloaderRequest, stream rpc.ArduinoCoreService_BurnBootloaderServer) error {
outStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{OutStream: data}) })
errStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{ErrStream: data}) })
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{OutStream: data}) })
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{ErrStream: data}) })
resp, err := upload.BurnBootloader(stream.Context(), req, outStream, errStream)
outStream.Close()
errStream.Close()
Expand Down
5 changes: 2 additions & 3 deletions commands/daemon/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"context"
"os"

"github.com/arduino/arduino-cli/arduino/utils"
cmd "github.com/arduino/arduino-cli/commands/debug"
dbg "github.com/arduino/arduino-cli/rpc/cc/arduino/cli/debug/v1"
"github.com/pkg/errors"
Expand Down Expand Up @@ -50,9 +49,9 @@ func (s *DebugService) Debug(stream dbg.DebugService_DebugServer) error {
// Launch debug recipe attaching stdin and out to grpc streaming
signalChan := make(chan os.Signal)
defer close(signalChan)
outStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&dbg.DebugResponse{Data: data}) })
outStream := feedStreamTo(func(data []byte) { stream.Send(&dbg.DebugResponse{Data: data}) })
resp, debugErr := cmd.Debug(stream.Context(), req,
utils.ConsumeStreamFrom(func() ([]byte, error) {
consumeStreamFrom(func() ([]byte, error) {
command, err := stream.Recv()
if command.GetSendInterrupt() {
signalChan <- os.Interrupt
Expand Down
14 changes: 7 additions & 7 deletions arduino/utils/stream.go → commands/daemon/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// Arduino software without disclosing the source code of your own applications.
// To purchase a commercial license, send an email to [email protected].

package utils
package daemon

import (
"io"
Expand All @@ -38,12 +38,12 @@ func (w *implWriteCloser) Close() error {
return w.close()
}

// FeedStreamTo creates a pipe to pass data to the writer function.
// FeedStreamTo returns the io.WriteCloser side of the pipe, on which the user can write data.
// feedStreamTo creates a pipe to pass data to the writer function.
// feedStreamTo returns the io.WriteCloser side of the pipe, on which the user can write data.
// The user must call Close() on the returned io.WriteCloser to release all the resources.
// If needed, the context can be used to detect when all the data has been processed after
// closing the writer.
func FeedStreamTo(writer func(data []byte)) io.WriteCloser {
func feedStreamTo(writer func(data []byte)) io.WriteCloser {
r, w := nio.Pipe(buffer.New(32 * 1024))
var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -77,9 +77,9 @@ func FeedStreamTo(writer func(data []byte)) io.WriteCloser {
}
}

// ConsumeStreamFrom creates a pipe to consume data from the reader function.
// ConsumeStreamFrom returns the io.Reader side of the pipe, which the user can use to consume the data
func ConsumeStreamFrom(reader func() ([]byte, error)) io.Reader {
// consumeStreamFrom creates a pipe to consume data from the reader function.
// consumeStreamFrom returns the io.Reader side of the pipe, which the user can use to consume the data
func consumeStreamFrom(reader func() ([]byte, error)) io.Reader {
r, w := io.Pipe()
go func() {
for {
Expand Down
6 changes: 6 additions & 0 deletions docs/UPGRADING.md
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,12 @@ must be changed to:
eventsChan, closeWatcher, err := board.Watch(req)
```

### Removed `utils.FeedStreamTo` and `utils.ConsumeStreamFrom`

`github.com/arduino/arduino-cli/arduino/utils.FeedStreamTo` and
`github.com/arduino/arduino-cli/arduino/utils.ConsumeStreamFrom` are now private. They are mainly used internally for
gRPC stream handling and are not suitable to be public API.

## 0.26.0

### `github.com/arduino/arduino-cli/commands.DownloadToolRelease`, and `InstallToolRelease` functions have been removed
Expand Down

0 comments on commit 405a810

Please sign in to comment.