diff --git a/pkg/apb/provision_or_update.go b/pkg/apb/provision_or_update.go index a1d014c0c4..b8bbae55dc 100644 --- a/pkg/apb/provision_or_update.go +++ b/pkg/apb/provision_or_update.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" - "github.com/apex/log" "github.com/openshift/ansible-service-broker/pkg/clients" "github.com/openshift/ansible-service-broker/pkg/metrics" "github.com/openshift/ansible-service-broker/pkg/runtime" diff --git a/pkg/apb/types.go b/pkg/apb/types.go index ca487871f3..db6d9acacb 100644 --- a/pkg/apb/types.go +++ b/pkg/apb/types.go @@ -340,7 +340,7 @@ type Deprovisioner func(si *ServiceInstance, statusUpdate chan<- JobState) (stri type Updater func(si *ServiceInstance, statusUpdate chan<- JobState) (string, *ExtractedCredentials, error) // Binder defines a function that knows how to perform a binding -type Binder func(si *ServiceInstance, params *Parameters) (string, *ExtractedCredentials, error) +type Binder func(si *ServiceInstance, params *Parameters, statusUpdate chan<- JobState) (string, *ExtractedCredentials, error) // UnBinder defines a function that knows how to perform a unbinding -type UnBinder func(si *ServiceInstance, params *Parameters) error +type UnBinder func(si *ServiceInstance, params *Parameters, statusUpdate chan<- JobState) error diff --git a/pkg/apb/unbind.go b/pkg/apb/unbind.go index 711337b6e6..16661dacaf 100644 --- a/pkg/apb/unbind.go +++ b/pkg/apb/unbind.go @@ -24,7 +24,7 @@ import ( ) // Unbind - runs the abp with the unbind action. -func Unbind(instance *ServiceInstance, parameters *Parameters) error { +func Unbind(instance *ServiceInstance, parameters *Parameters, stateUpdates chan<- JobState) error { log.Notice("============================================================") log.Notice(" UNBINDING ") log.Notice("============================================================") diff --git a/pkg/broker/binding_job.go b/pkg/broker/binding_job.go index 2a49b6d193..c830708d77 100644 --- a/pkg/broker/binding_job.go +++ b/pkg/broker/binding_job.go @@ -47,7 +47,6 @@ func NewBindingJob(serviceInstance *apb.ServiceInstance, bindingUUID uuid.UUID, // Run - run the binding job. func (p *BindingJob) Run(token string, msgBuffer chan<- JobMsg) { metrics.BindingJobStarted() - defer metrics.BindingJobFinished() jobMsg := JobMsg{ InstanceUUID: p.serviceInstance.ID.String(), JobToken: token, @@ -59,12 +58,36 @@ func (p *BindingJob) Run(token string, msgBuffer chan<- JobMsg) { Token: token, }, } - log.Debug("bindjob: binding job started, calling apb.Bind") + // send starting state + msgBuffer <- jobMsg + stateUpdates := make(chan apb.JobState) + // variables set by the bind action + var ( + err error + podName string + extCreds *apb.ExtractedCredentials + ) + // run the bind async and block on reading the status updates + go func() { + defer func() { + metrics.BindingJobFinished() + close(stateUpdates) + }() + log.Debug("bindjob: binding job started, calling apb.Bind") + + podName, extCreds, err = p.bind(p.serviceInstance, p.params, stateUpdates) - podName, extCreds, err := p.bind(p.serviceInstance, p.params) + log.Debug("bindjob: returned from apb.Bind") + }() - log.Debug("bindjob: returned from apb.Bind") + //read our status updates and send on updated JobMsgs for the subscriber to persist + for su := range stateUpdates { + su.Token = token + su.Method = apb.JobMethodDeprovision + msgBuffer <- JobMsg{InstanceUUID: p.serviceInstance.ID.String(), JobToken: token, State: su, PodName: su.Podname} + } + // status channel closed our job is complete lets check the err if err != nil { log.Errorf("bindjob::Binding error occurred.\n%s", err.Error()) jobMsg.State.State = apb.StateFailed @@ -73,7 +96,6 @@ func (p *BindingJob) Run(token string, msgBuffer chan<- JobMsg) { if err == apb.ErrorPodPullErr { errMsg = err.Error() } - // send error message // can't have an error type in a struct you want marshalled // https://github.com/golang/go/issues/5161 @@ -82,7 +104,7 @@ func (p *BindingJob) Run(token string, msgBuffer chan<- JobMsg) { return } - // send creds + // send creds if available and state success log.Debug("bindjob: looks like we're done, sending credentials") if nil != extCreds { jobMsg.ExtractedCredentials = *extCreds diff --git a/pkg/broker/binding_job_test.go b/pkg/broker/binding_job_test.go index 16a54cb27a..30cdca3d9c 100644 --- a/pkg/broker/binding_job_test.go +++ b/pkg/broker/binding_job_test.go @@ -25,27 +25,25 @@ func TestBindingJob_Run(t *testing.T) { Name string Binder apb.Binder BindParams *apb.Parameters - Validate func(msg broker.JobMsg) error + Validate func(msg []broker.JobMsg) error }{ { Name: "expect a success msg with extracted credentials", - Binder: func(si *apb.ServiceInstance, params *apb.Parameters) (string, *apb.ExtractedCredentials, error) { + Binder: func(si *apb.ServiceInstance, params *apb.Parameters, status chan<- apb.JobState) (string, *apb.ExtractedCredentials, error) { return "podName", &apb.ExtractedCredentials{Credentials: map[string]interface{}{ "user": "test", "pass": "test", }}, nil }, - Validate: func(msg broker.JobMsg) error { - if msg.State.State != apb.StateSucceeded { - return fmt.Errorf("expected the state to be %v but got %v", apb.StateSucceeded, msg.State.State) + Validate: func(msgs []broker.JobMsg) error { + if err := commonJobMsgValidation(apb.StateSucceeded, apb.JobMethodBind, msgs); err != nil { + return err } - if msg.State.Method != apb.JobMethodBind { - return fmt.Errorf("expected job method to be %v but it was %v", apb.JobMethodBind, msg.State.Method) - } - if msg.PodName == "" { + lastMsg := msgs[len(msgs)-1] + if lastMsg.PodName == "" { return fmt.Errorf("expected the podName to be set but it was empty") } - credentials := msg.ExtractedCredentials.Credentials + credentials := lastMsg.ExtractedCredentials.Credentials if _, ok := credentials["user"]; !ok { return fmt.Errorf("expected a user key in the credentials but it was missing") @@ -58,42 +56,38 @@ func TestBindingJob_Run(t *testing.T) { }, { Name: "expect failure state and generic error when unknown error type", - Binder: func(si *apb.ServiceInstance, params *apb.Parameters) (string, *apb.ExtractedCredentials, error) { + Binder: func(si *apb.ServiceInstance, params *apb.Parameters, status chan<- apb.JobState) (string, *apb.ExtractedCredentials, error) { return "", nil, fmt.Errorf("should not see") }, - Validate: func(msg broker.JobMsg) error { - if msg.State.State != apb.StateFailed { - return fmt.Errorf("expected the Job to be in state %v but was in %v ", apb.StateFailed, msg.State.State) - } - if msg.State.Method != apb.JobMethodBind { - return fmt.Errorf("expected job method to be %v but it was %v", apb.JobMethodBind, msg.State.Method) + Validate: func(msgs []broker.JobMsg) error { + if err := commonJobMsgValidation(apb.StateFailed, apb.JobMethodBind, msgs); err != nil { + return err } - if msg.State.Error == "" { + lastMsg := msgs[len(msgs)-1] + if lastMsg.State.Error == "" { return fmt.Errorf("expected an error in the job state but got none") } - if msg.State.Error == "should not see" { - return fmt.Errorf("expected not to see the error msg %s it should have been replaced with a generic error ", msg.State.Error) + if lastMsg.State.Error == "should not see" { + return fmt.Errorf("expected not to see the error msg %s it should have been replaced with a generic error ", lastMsg.State.Error) } return nil }, }, { Name: "expect failure state and full error when known error type", - Binder: func(si *apb.ServiceInstance, params *apb.Parameters) (string, *apb.ExtractedCredentials, error) { + Binder: func(si *apb.ServiceInstance, params *apb.Parameters, status chan<- apb.JobState) (string, *apb.ExtractedCredentials, error) { return "", nil, apb.ErrorPodPullErr }, - Validate: func(msg broker.JobMsg) error { - if msg.State.State != apb.StateFailed { - return fmt.Errorf("expected the Job to be in state %v but was in %v ", apb.StateFailed, msg.State.State) - } - if msg.State.Method != apb.JobMethodBind { - return fmt.Errorf("expected job method to be %v but it was %v", apb.JobMethodBind, msg.State.Method) + Validate: func(msgs []broker.JobMsg) error { + if err := commonJobMsgValidation(apb.StateFailed, apb.JobMethodBind, msgs); err != nil { + return err } - if msg.State.Error == "" { + lastMsg := msgs[len(msgs)-1] + if lastMsg.State.Error == "" { return fmt.Errorf("expected an error in the job state but got none") } - if msg.State.Error != apb.ErrorPodPullErr.Error() { - return fmt.Errorf("expected to see the error msg %s but got %s ", apb.ErrorPodPullErr, msg.State.Error) + if lastMsg.State.Error != apb.ErrorPodPullErr.Error() { + return fmt.Errorf("expected to see the error msg %s but got %s ", apb.ErrorPodPullErr, lastMsg.State.Error) } return nil }, @@ -104,18 +98,16 @@ func TestBindingJob_Run(t *testing.T) { t.Run(tc.Name, func(t *testing.T) { bindingJob := broker.NewBindingJob(serviceInstance, bindingID, tc.BindParams, tc.Binder) receiver := make(chan broker.JobMsg) - timedOut := false time.AfterFunc(1*time.Second, func() { close(receiver) - timedOut = true }) go bindingJob.Run("", receiver) - msg := <-receiver - if timedOut { - t.Fatal("timed out waiting for a msg from the Job") + var msgs []broker.JobMsg + for msg := range receiver { + msgs = append(msgs, msg) } - if err := tc.Validate(msg); err != nil { + if err := tc.Validate(msgs); err != nil { t.Fatal("failed to validate the jobmsg ", err) } diff --git a/pkg/broker/broker.go b/pkg/broker/broker.go index cd63987680..0a50984e4d 100644 --- a/pkg/broker/broker.go +++ b/pkg/broker/broker.go @@ -642,14 +642,6 @@ func (a AnsibleBroker) Provision(instanceUUID uuid.UUID, req *ProvisionRequest, log.Error("Failed to start new job for async provision\n%s", err.Error()) return nil, err } - - // HACK: there might be a delay between the first time the state in etcd - // is set and the job was already started. But I need the token. - a.dao.SetState(instanceUUID.String(), apb.JobState{ - Token: token, - State: apb.StateInProgress, - Method: apb.JobMethodProvision, - }) } else { log.Info("reverting to synchronous provisioning in progress") if err := a.engine.StartNewSyncJob(token, pjob, ProvisionTopic); err != nil { @@ -710,14 +702,6 @@ func (a AnsibleBroker) Deprovision( log.Error("Failed to start new job for async deprovision\n%s", err.Error()) return nil, err } - - // HACK: there might be a delay between the first time the state in etcd - // is set and the job was already started. But I need the token. - a.dao.SetState(instance.ID.String(), apb.JobState{ - Token: token, - State: apb.StateInProgress, - Method: apb.JobMethodDeprovision, - }) return &DeprovisionResponse{Operation: token}, nil } @@ -727,11 +711,6 @@ func (a AnsibleBroker) Deprovision( return nil, err } } - - err = cleanupDeprovision(instance.ID.String(), a.dao) - if err != nil { - return nil, err - } return &DeprovisionResponse{}, nil } @@ -900,48 +879,34 @@ func (a AnsibleBroker) Bind(instance apb.ServiceInstance, bindingUUID uuid.UUID, // 'LaunchApbOnBind' of the broker config, due to lack of async support of // bind in Open Service Broker API Currently, the 'launchapbonbind' is set // to false in the 'config' ConfigMap - var bindExtCreds *apb.ExtractedCredentials + metrics.ActionStarted("bind") - var token string + var ( + bindExtCreds *apb.ExtractedCredentials + token = a.engine.Token() + bindingJob = NewBindingJob(&instance, bindingUUID, ¶ms, apb.Bind) + ) if async && a.brokerConfig.LaunchApbOnBind { // asynchronous mode, requires that the launch apb config // entry is on, and that async comes in from the catalog log.Info("ASYNC binding in progress") - bindjob := NewBindingJob(&instance, bindingUUID, ¶ms, apb.Bind) - - token, err = a.engine.StartNewJob("", bindjob, BindingTopic) + token, err = a.engine.StartNewAsyncJob("", bindingJob, BindingTopic) if err != nil { log.Error("Failed to start new job for async binding\n%s", err.Error()) return nil, err } - - if err := a.dao.SetState(instance.ID.String(), apb.JobState{ - Token: token, - State: apb.StateInProgress, - Method: apb.JobMethodBind, - }); err != nil { - log.Errorf("failed to set initial jobstate for %v, %v", token, err.Error()) - } } else if a.brokerConfig.LaunchApbOnBind { // we are synchronous mode log.Info("Broker configured to run APB bind") - _, bindExtCreds, err = apb.Bind(&instance, ¶ms, nil) - - if err != nil { + if err := a.engine.StartNewSyncJob(token, bindingJob, BindingTopic); err != nil { return nil, err } + //TODO are we only setting the bindingUUID if sync? instance.AddBinding(bindingUUID) if err := a.dao.SetServiceInstance(instance.ID.String(), &instance); err != nil { return nil, err } - if bindExtCreds != nil { - err = a.dao.SetExtractedCredentials(bindingUUID.String(), bindExtCreds) - if err != nil { - log.Errorf("Could not persist extracted credentials - %v", err) - return nil, err - } - } } else { log.Warning("Broker configured to *NOT* launch and run APB bind") } @@ -1022,27 +987,21 @@ func (a AnsibleBroker) Unbind( } metrics.ActionStarted("unbind") - var token string - var jerr error + var ( + token = a.engine.Token() + jerr error + unbindJob = NewUnbindingJob(&serviceInstance, &bindInstance, ¶ms, apb.Unbind, skipApbExecution) + ) if async && a.brokerConfig.LaunchApbOnBind { // asynchronous mode, required that the launch apb config // entry is on, and that async comes in from the catalog log.Info("ASYNC unbinding in progress") - unbindjob := NewUnbindingJob(&serviceInstance, &bindInstance, ¶ms, apb.Unbind, skipApbExecution) - token, jerr = a.engine.StartNewJob("", unbindjob, UnbindingTopic) + + token, jerr = a.engine.StartNewAsyncJob("", unbindJob, UnbindingTopic) if jerr != nil { log.Error("Failed to start new job for async unbind\n%s", jerr.Error()) return nil, jerr } - - if err := a.dao.SetState(serviceInstance.ID.String(), apb.JobState{ - Token: token, - State: apb.StateInProgress, - Method: apb.JobMethodUnbind, - }); err != nil { - log.Errorf("failed to set initial jobstate for %v, %v", token, err.Error()) - } - } else if a.brokerConfig.LaunchApbOnBind { // only launch apb if we are always launching the APB. if skipApbExecution { @@ -1050,7 +1009,9 @@ func (a AnsibleBroker) Unbind( err = nil } else { log.Debug("Launching apb for unbind in blocking mode") - err = apb.Unbind(&serviceInstance, ¶ms, nil) + if err := a.engine.StartNewSyncJob(token, unbindJob, UnbindingTopic); err != nil { + return nil, err + } } if err != nil { return nil, err @@ -1059,6 +1020,7 @@ func (a AnsibleBroker) Unbind( log.Warning("Broker configured to *NOT* launch and run APB unbind") } + //TODO should these not be handled in the subscriber if the subscriber is handling state if bindExtCreds != nil { err = a.dao.DeleteExtractedCredentials(bindInstance.ID.String()) if err != nil { @@ -1272,9 +1234,6 @@ func (a AnsibleBroker) Update(instanceUUID uuid.UUID, req *UpdateRequest, async log.Error("Failed to start new job for async update\n%s", err.Error()) return nil, err } - // HACK: there might be a delay between the first time the state in etcd - // is set and the job was already started. But I need the token. - a.dao.SetState(instanceUUID.String(), apb.JobState{Token: token, State: apb.StateInProgress, Method: apb.JobMethodUpdate}) } else { log.Info("reverting to synchronous update in progress") if err := a.engine.StartNewSyncJob(token, ujob, UpdateTopic); err != nil { diff --git a/pkg/broker/deprovision_job.go b/pkg/broker/deprovision_job.go index 395dfdc01c..2154f5e1fd 100644 --- a/pkg/broker/deprovision_job.go +++ b/pkg/broker/deprovision_job.go @@ -42,8 +42,6 @@ func NewDeprovisionJob(serviceInstance *apb.ServiceInstance, // Run - will run the deprovision job. func (p *DeprovisionJob) Run(token string, msgBuffer chan<- JobMsg) { metrics.DeprovisionJobStarted() - defer metrics.DeprovisionJobFinished() - stateUpdates := make(chan apb.JobState) jobMsg := JobMsg{ InstanceUUID: p.serviceInstance.ID.String(), JobToken: token, @@ -54,6 +52,7 @@ func (p *DeprovisionJob) Run(token string, msgBuffer chan<- JobMsg) { Token: token, }, } + stateUpdates := make(chan apb.JobState) var ( errMsg = "Error occurred during deprovision. Please contact administrator if it persists." @@ -61,7 +60,6 @@ func (p *DeprovisionJob) Run(token string, msgBuffer chan<- JobMsg) { jobErr error ) - metrics.DeprovisionJobStarted() go func() { defer func() { close(stateUpdates) diff --git a/pkg/broker/deprovision_subscriber.go b/pkg/broker/deprovision_subscriber.go index 05c8f47383..db6c3b5ca9 100644 --- a/pkg/broker/deprovision_subscriber.go +++ b/pkg/broker/deprovision_subscriber.go @@ -17,7 +17,6 @@ package broker import ( - "github.com/apex/log" "github.com/openshift/ansible-service-broker/pkg/apb" ) diff --git a/pkg/broker/unbinding_job.go b/pkg/broker/unbinding_job.go index 82d4618176..5c2aacbd43 100644 --- a/pkg/broker/unbinding_job.go +++ b/pkg/broker/unbinding_job.go @@ -48,7 +48,6 @@ func NewUnbindingJob(serviceInstance *apb.ServiceInstance, bindInstance *apb.Bin // Run - run the binding job. func (p *UnbindingJob) Run(token string, msgBuffer chan<- JobMsg) { metrics.UnbindingJobStarted() - defer metrics.UnbindingJobFinished() jobMsg := JobMsg{ InstanceUUID: p.serviceInstance.ID.String(), JobToken: token, @@ -60,6 +59,9 @@ func (p *UnbindingJob) Run(token string, msgBuffer chan<- JobMsg) { Token: token, }, } + // set initial state + msgBuffer <- jobMsg + stateUpdates := make(chan apb.JobState) log.Debugf("unbindjob: unbinding job (%v) started, calling apb.Unbind", token) @@ -70,11 +72,23 @@ func (p *UnbindingJob) Run(token string, msgBuffer chan<- JobMsg) { msgBuffer <- jobMsg return } + var err error - err := p.unbind(p.serviceInstance, p.params) - - log.Debug("unbindjob: returned from apb.Unbind") - + go func() { + defer func() { + metrics.UnbindingJobFinished() + close(stateUpdates) + }() + err = p.unbind(p.serviceInstance, p.params, stateUpdates) + log.Debug("unbindjob: returned from apb.Unbind") + }() + //read our status updates and send on updated JobMsgs for the subscriber to persist + for su := range stateUpdates { + su.Token = token + su.Method = apb.JobMethodDeprovision + msgBuffer <- JobMsg{InstanceUUID: p.serviceInstance.ID.String(), JobToken: token, State: su, PodName: su.Podname} + } + // status channel has closed so our Job has ended check for any err if err != nil { errMsg := "Error occurred during binding. Please contact administrator if it persists." log.Errorf("unbindjob::Unbinding error occurred.\n%s", err.Error()) diff --git a/pkg/broker/unbinding_job_test.go b/pkg/broker/unbinding_job_test.go index a8bf4f1346..0667b61cf3 100644 --- a/pkg/broker/unbinding_job_test.go +++ b/pkg/broker/unbinding_job_test.go @@ -26,77 +26,61 @@ func TestUnBindingJob_Run(t *testing.T) { UnBinder apb.UnBinder UnBindParams *apb.Parameters SkipExecution bool - Validate func(msg broker.JobMsg) error + Validate func(msgs []broker.JobMsg) error }{ { Name: "expect a success msg", - UnBinder: func(si *apb.ServiceInstance, params *apb.Parameters) error { + UnBinder: func(si *apb.ServiceInstance, params *apb.Parameters, state chan<- apb.JobState) error { return nil }, - Validate: func(msg broker.JobMsg) error { - if msg.State.State != apb.StateSucceeded { - return fmt.Errorf("expected the state to be %v but got %v", apb.StateSucceeded, msg.State.State) - } - if msg.State.Method != apb.JobMethodUnbind { - return fmt.Errorf("expected job method to be %v but it was %v", apb.JobMethodUnbind, msg.State.Method) - } - return nil + Validate: func(msgs []broker.JobMsg) error { + return commonJobMsgValidation(apb.StateSucceeded, apb.JobMethodUnbind, msgs) }, }, { Name: "expect a success msg when skipping apb execution", SkipExecution: true, - UnBinder: func(si *apb.ServiceInstance, params *apb.Parameters) error { + UnBinder: func(si *apb.ServiceInstance, params *apb.Parameters, state chan<- apb.JobState) error { return nil }, - Validate: func(msg broker.JobMsg) error { - if msg.State.State != apb.StateSucceeded { - return fmt.Errorf("expected the state to be %v but got %v", apb.StateSucceeded, msg.State.State) - } - if msg.State.Method != apb.JobMethodUnbind { - return fmt.Errorf("expected job method to be %v but it was %v", apb.JobMethodUnbind, msg.State.Method) - } - return nil + Validate: func(msgs []broker.JobMsg) error { + return commonJobMsgValidation(apb.StateSucceeded, apb.JobMethodUnbind, msgs) }, }, { Name: "expect failure state and generic error when unknown error type", - UnBinder: func(si *apb.ServiceInstance, params *apb.Parameters) error { + UnBinder: func(si *apb.ServiceInstance, params *apb.Parameters, state chan<- apb.JobState) error { return fmt.Errorf("should not see") }, - Validate: func(msg broker.JobMsg) error { - if msg.State.State != apb.StateFailed { - return fmt.Errorf("expected the Job to be in state %v but was in %v ", apb.StateFailed, msg.State.State) - } - if msg.State.Method != apb.JobMethodUnbind { - return fmt.Errorf("expected job method to be %v but it was %v", apb.JobMethodUnbind, msg.State.Method) + Validate: func(msgs []broker.JobMsg) error { + if err := commonJobMsgValidation(apb.StateFailed, apb.JobMethodUnbind, msgs); err != nil { + return err } - if msg.State.Error == "" { + lastMsg := msgs[len(msgs)-1] + if lastMsg.State.Error == "" { return fmt.Errorf("expected an error in the job state but got none") } - if msg.State.Error == "should not see" { - return fmt.Errorf("expected not to see the error msg %s it should have been replaced with a generic error ", msg.State.Error) + if lastMsg.State.Error == "should not see" { + return fmt.Errorf("expected not to see the error msg %s it should have been replaced with a generic error ", lastMsg.State.Error) } return nil }, }, { Name: "expect failure state and full error when known error type", - UnBinder: func(si *apb.ServiceInstance, params *apb.Parameters) error { + UnBinder: func(si *apb.ServiceInstance, params *apb.Parameters, state chan<- apb.JobState) error { return apb.ErrorPodPullErr }, - Validate: func(msg broker.JobMsg) error { - if msg.State.State != apb.StateFailed { - return fmt.Errorf("expected the Job to be in state %v but was in %v ", apb.StateFailed, msg.State.State) - } - if msg.State.Method != apb.JobMethodUnbind { - return fmt.Errorf("expected job method to be %v but it was %v", apb.JobMethodUnbind, msg.State.Method) + Validate: func(msgs []broker.JobMsg) error { + if err := commonJobMsgValidation(apb.StateFailed, apb.JobMethodUnbind, msgs); err != nil { + return err } - if msg.State.Error == "" { + lastMsg := msgs[len(msgs)-1] + if lastMsg.State.Error == "" { return fmt.Errorf("expected an error in the job state but got none") } - if msg.State.Error != apb.ErrorPodPullErr.Error() { - return fmt.Errorf("expected to see the error msg %s but got %s ", apb.ErrorPodPullErr, msg.State.Error) + if lastMsg.State.Error != apb.ErrorPodPullErr.Error() { + return fmt.Errorf("expected to see the error msg %s but got %s ", apb.ErrorPodPullErr, lastMsg.State.Error) } return nil }, @@ -107,18 +91,16 @@ func TestUnBindingJob_Run(t *testing.T) { t.Run(tc.Name, func(t *testing.T) { unbindJob := broker.NewUnbindingJob(serviceInstance, bindingInst, tc.UnBindParams, tc.UnBinder, tc.SkipExecution) receiver := make(chan broker.JobMsg) - timedOut := false + time.AfterFunc(1*time.Second, func() { close(receiver) - timedOut = true }) go unbindJob.Run("", receiver) - - msg := <-receiver - if timedOut { - t.Fatal("timed out waiting for a msg from the Job") + var msgs []broker.JobMsg + for msg := range receiver { + msgs = append(msgs, msg) } - if err := tc.Validate(msg); err != nil { + if err := tc.Validate(msgs); err != nil { t.Fatal("failed to validate the jobmsg ", err) } diff --git a/pkg/broker/update_subscriber.go b/pkg/broker/update_subscriber.go index c832bc24ff..3e4e59e018 100644 --- a/pkg/broker/update_subscriber.go +++ b/pkg/broker/update_subscriber.go @@ -17,7 +17,6 @@ package broker import ( - "github.com/apex/log" "github.com/openshift/ansible-service-broker/pkg/apb" )