diff --git a/solver/internal/pipe/pipe.go b/solver/internal/pipe/pipe.go index ab55953936b96..a1a857f39827c 100644 --- a/solver/internal/pipe/pipe.go +++ b/solver/internal/pipe/pipe.go @@ -197,34 +197,3 @@ func (pr *receiver) Cancel() { func (pr *receiver) Status() Status { return pr.status } - -type erroredPipe struct { - req interface{} - err error -} - -func NewErroredPipe(req interface{}, err error) Receiver { - return &erroredPipe{ - req: req, - err: err, - } -} - -func (p *erroredPipe) Request() interface{} { - return p.req -} - -func (p *erroredPipe) Receive() bool { - return true -} - -func (p *erroredPipe) Status() Status { - return Status{ - Canceled: false, - Completed: true, - Err: p.err, - Value: nil, - } -} - -func (p *erroredPipe) Cancel() {} diff --git a/solver/scheduler.go b/solver/scheduler.go index 1ceb0374421ff..2ec63dad52806 100644 --- a/solver/scheduler.go +++ b/solver/scheduler.go @@ -2,7 +2,6 @@ package solver import ( "context" - "fmt" "os" "sync" @@ -352,7 +351,9 @@ type pipeFactory struct { func (pf *pipeFactory) NewInputRequest(ee Edge, req *edgeRequest) pipe.Receiver { target := pf.s.ef.getEdge(ee) if target == nil { - return pipe.NewErroredPipe(req, fmt.Errorf("failed to get edge")) + return pf.NewFuncRequest(func(_ context.Context) (interface{}, error) { + return req, errors.Errorf("failed to get edge") + }) } p := pf.s.newPipe(target, pf.e, pipe.Request{Payload: req}) if debugScheduler {