Skip to content

Commit

Permalink
prefer pf.NewFuncRequest() approach
Browse files Browse the repository at this point in the history
  • Loading branch information
maxlaverse committed Sep 29, 2021
1 parent 44b6e29 commit cbf6639
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 33 deletions.
31 changes: 0 additions & 31 deletions solver/internal/pipe/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,34 +197,3 @@ func (pr *receiver) Cancel() {
func (pr *receiver) Status() Status {
return pr.status
}

type erroredPipe struct {
req interface{}
err error
}

func NewErroredPipe(req interface{}, err error) Receiver {
return &erroredPipe{
req: req,
err: err,
}
}

func (p *erroredPipe) Request() interface{} {
return p.req
}

func (p *erroredPipe) Receive() bool {
return true
}

func (p *erroredPipe) Status() Status {
return Status{
Canceled: false,
Completed: true,
Err: p.err,
Value: nil,
}
}

func (p *erroredPipe) Cancel() {}
5 changes: 3 additions & 2 deletions solver/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package solver

import (
"context"
"fmt"
"os"
"sync"

Expand Down Expand Up @@ -352,7 +351,9 @@ type pipeFactory struct {
func (pf *pipeFactory) NewInputRequest(ee Edge, req *edgeRequest) pipe.Receiver {
target := pf.s.ef.getEdge(ee)
if target == nil {
return pipe.NewErroredPipe(req, fmt.Errorf("failed to get edge"))
return pf.NewFuncRequest(func(_ context.Context) (interface{}, error) {
return req, errors.Errorf("failed to get edge")
})
}
p := pf.s.newPipe(target, pf.e, pipe.Request{Payload: req})
if debugScheduler {
Expand Down

0 comments on commit cbf6639

Please sign in to comment.