diff --git a/solver/internal/pipe/pipe.go b/solver/internal/pipe/pipe.go index a1a857f39827c..ab55953936b96 100644 --- a/solver/internal/pipe/pipe.go +++ b/solver/internal/pipe/pipe.go @@ -197,3 +197,34 @@ func (pr *receiver) Cancel() { func (pr *receiver) Status() Status { return pr.status } + +type erroredPipe struct { + req interface{} + err error +} + +func NewErroredPipe(req interface{}, err error) Receiver { + return &erroredPipe{ + req: req, + err: err, + } +} + +func (p *erroredPipe) Request() interface{} { + return p.req +} + +func (p *erroredPipe) Receive() bool { + return true +} + +func (p *erroredPipe) Status() Status { + return Status{ + Canceled: false, + Completed: true, + Err: p.err, + Value: nil, + } +} + +func (p *erroredPipe) Cancel() {} diff --git a/solver/scheduler.go b/solver/scheduler.go index 8de4d694e4b4c..1ceb0374421ff 100644 --- a/solver/scheduler.go +++ b/solver/scheduler.go @@ -2,6 +2,7 @@ package solver import ( "context" + "fmt" "os" "sync" @@ -351,7 +352,7 @@ type pipeFactory struct { func (pf *pipeFactory) NewInputRequest(ee Edge, req *edgeRequest) pipe.Receiver { target := pf.s.ef.getEdge(ee) if target == nil { - panic("failed to get edge") // TODO: return errored pipe + return pipe.NewErroredPipe(req, fmt.Errorf("failed to get edge")) } p := pf.s.newPipe(target, pf.e, pipe.Request{Payload: req}) if debugScheduler {